274 lines
8.5 KiB
Python
274 lines
8.5 KiB
Python
"""Provide service-layer logic for minio client."""
|
||
|
||
from minio import Minio
|
||
from minio.error import S3Error
|
||
from typing import Optional, Dict, Any
|
||
from loguru import logger
|
||
from io import BytesIO
|
||
import os
|
||
|
||
from app.config.settings import settings
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
|
||
|
||
|
||
class MinIOClient:
    """Convenience wrapper around ``minio.Minio`` bound to a single bucket.

    Connection parameters that are not passed explicitly are read from
    ``settings``. Operations connect lazily on first use and report storage
    failures through their return value (``False``, ``None`` or ``[]``) after
    logging the underlying ``S3Error``, rather than raising it.
    """

    # File extension -> MIME type table consulted by ``_get_content_type``.
    _CONTENT_TYPES = {
        '.pdf': 'application/pdf',
        '.doc': 'application/msword',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.txt': 'text/plain',
        '.json': 'application/json',
        '.xml': 'application/xml',
    }

    def __init__(
        self,
        endpoint: Optional[str] = None,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        bucket: Optional[str] = None,
        secure: Optional[bool] = None,
    ):
        """Store the connection parameters; no client is created yet.

        Args:
            endpoint: MinIO endpoint; falls back to ``settings.minio_endpoint``.
            access_key: Access key; falls back to ``settings.minio_access_key``.
            secret_key: Secret key; falls back to ``settings.minio_secret_key``.
            bucket: Bucket name; falls back to ``settings.minio_bucket``.
            secure: Value forwarded as ``secure=`` to ``Minio``. ``None`` means
                "use ``settings.minio_secure``"; an explicit ``False`` is
                honoured.
        """
        self.endpoint = endpoint or settings.minio_endpoint
        self.access_key = access_key or settings.minio_access_key
        self.secret_key = secret_key or settings.minio_secret_key
        self.bucket = bucket or settings.minio_bucket
        # ``secure or settings.minio_secure`` would discard an explicit False,
        # so only fall back to the setting when the argument was omitted.
        self.secure = settings.minio_secure if secure is None else secure

        self.client: Optional[Minio] = None
        self.connected = False

        logger.info(f"MinIO客户端配置: {self.endpoint}, bucket={self.bucket}")

    def connect(self) -> bool:
        """Create the underlying ``Minio`` client.

        Returns:
            ``True`` when the client object was created, ``False`` when its
            construction raised (the error is logged).
        """
        try:
            self.client = Minio(
                self.endpoint,
                access_key=self.access_key,
                secret_key=self.secret_key,
                secure=self.secure,
            )
            self.connected = True
            logger.success(f"MinIO连接成功: {self.endpoint}")
            return True
        except Exception as e:
            logger.error(f"MinIO连接失败: {e}")
            self.connected = False
            return False

    def _ensure_connected(self, create_bucket: bool = False) -> bool:
        """Connect on demand and report whether a usable client exists.

        Args:
            create_bucket: When a new connection is made, also call
                ``ensure_bucket()``. Its result is not treated as fatal; it
                logs its own failures.

        Returns:
            ``True`` if ``self.client`` can be used, ``False`` if connecting
            failed.
        """
        if self.connected and self.client is not None:
            return True
        if not self.connect():
            return False
        if create_bucket:
            self.ensure_bucket()
        return True

    @staticmethod
    def _sanitize_metadata(
        metadata: Optional[Dict[str, Any]]
    ) -> Optional[Dict[str, str]]:
        """Convert metadata values to ASCII-only strings.

        Values are stringified with ``str()``; any value whose text is not
        pure ASCII is replaced by an empty string.

        Returns:
            The sanitised mapping, or ``None`` when ``metadata`` is empty or
            ``None``.
        """
        if not metadata:
            return None

        safe_metadata = {}
        for key, value in metadata.items():
            text = value if isinstance(value, str) else str(value)
            try:
                text.encode('ascii')
            except UnicodeEncodeError:
                text = ""
            safe_metadata[key] = text
        return safe_metadata

    def ensure_bucket(self) -> bool:
        """Create the configured bucket if it does not exist yet.

        Returns:
            ``True`` if the bucket exists or was created; ``False`` when not
            connected or when the storage call raised ``S3Error``.
        """
        if not self.connected or self.client is None:
            logger.warning("未连接MinIO,请先调用connect()")
            return False

        try:
            if not self.client.bucket_exists(self.bucket):
                self.client.make_bucket(self.bucket)
                logger.success(f"创建存储桶: {self.bucket}")
            else:
                logger.info(f"存储桶已存在: {self.bucket}")
            return True
        except S3Error as e:
            logger.error(f"存储桶操作失败: {e}")
            return False

    def upload_file(
        self,
        file_path: str,
        object_name: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Upload a local file to the bucket under ``object_name``.

        The content type is derived from the file extension, and metadata is
        sanitised the same way as in ``upload_bytes``.

        Returns:
            ``True`` on success; ``False`` if connecting failed or the upload
            raised ``S3Error``.

        Raises:
            OSError: If ``file_path`` cannot be stat'ed or opened.
        """
        if not self._ensure_connected(create_bucket=True):
            return False

        try:
            file_size = os.stat(file_path).st_size
            content_type = self._get_content_type(file_path)

            with open(file_path, 'rb') as f:
                self.client.put_object(
                    self.bucket,
                    object_name,
                    f,
                    file_size,
                    content_type=content_type,
                    metadata=self._sanitize_metadata(metadata),
                )

            logger.success(f"文件上传成功: {object_name}, 大小={file_size}")
            return True

        except S3Error as e:
            logger.error(f"文件上传失败: {e}")
            return False

    def upload_bytes(
        self,
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Upload an in-memory byte string to the bucket under ``object_name``.

        Returns:
            ``True`` on success; ``False`` if connecting failed or the upload
            raised ``S3Error``.
        """
        if not self._ensure_connected(create_bucket=True):
            return False

        try:
            self.client.put_object(
                self.bucket,
                object_name,
                BytesIO(data),
                len(data),
                content_type=content_type,
                metadata=self._sanitize_metadata(metadata),
            )

            logger.success(f"数据上传成功: {object_name}, 大小={len(data)}")
            return True

        except S3Error as e:
            logger.error(f"数据上传失败: {e}")
            return False

    def download_file(
        self,
        object_name: str,
        file_path: str,
    ) -> bool:
        """Download ``object_name`` from the bucket into ``file_path``.

        Returns:
            ``True`` on success; ``False`` if connecting failed or the
            download raised ``S3Error``.
        """
        if not self._ensure_connected():
            return False

        try:
            self.client.fget_object(
                self.bucket,
                object_name,
                file_path,
            )
            logger.success(f"文件下载成功: {object_name} -> {file_path}")
            return True

        except S3Error as e:
            logger.error(f"文件下载失败: {e}")
            return False

    def get_object_url(
        self,
        object_name: str,
        expires: int = 3600,
    ) -> Optional[str]:
        """Return a presigned GET URL for ``object_name``.

        Args:
            object_name: Key of the object inside the bucket.
            expires: Lifetime of the URL in seconds. A ``datetime.timedelta``
                is accepted as well.

        Returns:
            The URL, or ``None`` if connecting failed or the call raised
            ``S3Error``.
        """
        # Imported locally so the module-level import block stays untouched.
        from datetime import timedelta

        if not self._ensure_connected():
            return None

        # The client library expects a timedelta, not a number of seconds.
        if isinstance(expires, timedelta):
            lifetime = expires
        else:
            lifetime = timedelta(seconds=expires)

        try:
            return self.client.presigned_get_object(
                self.bucket,
                object_name,
                expires=lifetime,
            )

        except S3Error as e:
            logger.error(f"获取URL失败: {e}")
            return None

    def get_object_data(self, object_name: str) -> Optional[bytes]:
        """Read the full content of ``object_name`` into memory.

        Returns:
            The object's bytes, or ``None`` if connecting failed or the call
            raised ``S3Error``.
        """
        if not self._ensure_connected():
            return None

        response = None
        try:
            response = self.client.get_object(self.bucket, object_name)
            return response.read()

        except S3Error as e:
            logger.error(f"获取对象数据失败: {e}")
            return None

        finally:
            # Release the HTTP response even when read() fails midway.
            if response is not None:
                response.close()
                response.release_conn()

    def delete_object(self, object_name: str) -> bool:
        """Remove ``object_name`` from the bucket.

        Returns:
            ``True`` on success; ``False`` if connecting failed or the call
            raised ``S3Error``.
        """
        if not self._ensure_connected():
            return False

        try:
            self.client.remove_object(self.bucket, object_name)
            logger.info(f"对象删除成功: {object_name}")
            return True

        except S3Error as e:
            logger.error(f"对象删除失败: {e}")
            return False

    def list_objects(self, prefix: str = "") -> list:
        """Return the names of the objects in the bucket matching ``prefix``.

        Returns:
            A list of object names; empty if connecting failed or the listing
            raised ``S3Error``.
        """
        if not self._ensure_connected():
            return []

        try:
            objects = self.client.list_objects(self.bucket, prefix=prefix)
            # Consume the result inside the try so errors raised while
            # iterating are handled here too.
            return [obj.object_name for obj in objects]

        except S3Error as e:
            logger.error(f"列出对象失败: {e}")
            return []

    def object_exists(self, object_name: str) -> bool:
        """Report whether ``object_name`` can be stat'ed in the bucket.

        Returns:
            ``True`` if ``stat_object`` succeeds; ``False`` if connecting
            failed or the call raised ``S3Error``. Errors whose code is not a
            plain "not found" are logged, because in that case ``False`` does
            not prove the object is absent.
        """
        if not self._ensure_connected():
            return False

        try:
            self.client.stat_object(self.bucket, object_name)
            return True

        except S3Error as e:
            if getattr(e, "code", None) not in ("NoSuchKey", "NoSuchObject"):
                logger.warning(f"检查对象是否存在失败: {e}")
            return False

    def _get_content_type(self, file_path: str) -> str:
        """Map the extension of ``file_path`` to a MIME type.

        The lookup is case-insensitive; unknown extensions yield
        ``application/octet-stream``.
        """
        ext = os.path.splitext(file_path)[1].lower()
        return self._CONTENT_TYPES.get(ext, 'application/octet-stream')

    def close(self):
        """Mark the client as disconnected and drop the client object."""
        self.client = None
        self.connected = False
        logger.info("MinIO客户端已关闭")
|
||
|
||
|
||
def create_minio_client() -> MinIOClient:
    """Build a ``MinIOClient`` with default arguments, connect it and ensure its bucket.

    The results of ``connect()`` and ``ensure_bucket()`` are not inspected;
    the client is returned in whatever state those calls leave it.
    """
    minio_client = MinIOClient()
    # Run both setup steps in order, ignoring their return values.
    for setup_step in (minio_client.connect, minio_client.ensure_bucket):
        setup_step()
    return minio_client
|