Files
AIRegulation-DocAnalysis/backend/app/infrastructure/storage/minio_binary_store.py

48 lines
1.6 KiB
Python
Raw Normal View History

"""Implement infrastructure support for minio binary store."""
from __future__ import annotations
from app.domain.documents import DocumentBinaryStore
from app.services.storage.minio_client import MinIOClient
# Keep adapter behavior explicit so integration details remain easy to audit.
class MinioDocumentBinaryStore(DocumentBinaryStore):
"""Provide the Minio Document Binary Store store implementation."""
def __init__(self) -> None:
"""Initialize the Minio Document Binary Store instance."""
self.client = MinIOClient()
self.client.connect()
self.client.ensure_bucket()
def save(
self,
*,
object_name: str,
data: bytes,
content_type: str,
metadata: dict[str, str] | None = None,
) -> None:
"""Handle save for the Minio Document Binary Store instance."""
success = self.client.upload_bytes(
data=data,
object_name=object_name,
content_type=content_type,
metadata=metadata,
)
if not success:
raise RuntimeError("MinIO 保存失败")
def read(self, object_name: str) -> bytes:
"""Handle read for the Minio Document Binary Store instance."""
data = self.client.get_object_data(object_name)
if data is None:
raise FileNotFoundError(f"对象不存在: {object_name}")
return data
def delete(self, object_name: str) -> None:
"""Handle delete for the Minio Document Binary Store instance."""
if not self.client.delete_object(object_name):
raise FileNotFoundError(f"对象删除失败: {object_name}")