"""Implement infrastructure support for minio binary store.""" from __future__ import annotations from app.domain.documents import DocumentBinaryStore from app.services.storage.minio_client import MinIOClient # Keep adapter behavior explicit so integration details remain easy to audit. class MinioDocumentBinaryStore(DocumentBinaryStore): """Provide the Minio Document Binary Store store implementation.""" def __init__(self) -> None: """Initialize the Minio Document Binary Store instance.""" self.client = MinIOClient() self.client.connect() self.client.ensure_bucket() def save( self, *, object_name: str, data: bytes, content_type: str, metadata: dict[str, str] | None = None, ) -> None: """Handle save for the Minio Document Binary Store instance.""" success = self.client.upload_bytes( data=data, object_name=object_name, content_type=content_type, metadata=metadata, ) if not success: raise RuntimeError("MinIO 保存失败") def read(self, object_name: str) -> bytes: """Handle read for the Minio Document Binary Store instance.""" data = self.client.get_object_data(object_name) if data is None: raise FileNotFoundError(f"对象不存在: {object_name}") return data def delete(self, object_name: str) -> None: """Handle delete for the Minio Document Binary Store instance.""" if not self.client.delete_object(object_name): raise FileNotFoundError(f"对象删除失败: {object_name}")