Files
AIRegulation-DocAnalysis/scripts/clear_all.py

195 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
清理脚本 - 清空Milvus向量数据库和MinIO对象存储中的所有文档数据
使用方法:
python scripts/clear_all.py # 清空所有数据
python scripts/clear_all.py --milvus # 仅清空Milvus
python scripts/clear_all.py --minio # 仅清空MinIO
python scripts/clear_all.py --dry-run # 仅查看数据统计,不删除
"""
import argparse
import sys
import os
# Make the project's backend/ package importable.
# PROJECT_ROOT is the parent of the directory that contains this script;
# its "backend" subdirectory is put first on sys.path so the `app.*`
# imports below resolve without installing the backend as a package.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(PROJECT_ROOT, "backend"))
from app.config.settings import settings
from loguru import logger
def clear_milvus(dry_run: bool = False) -> int:
    """Drop and recreate the configured Milvus collection, or only count it.

    Connects to ``settings.milvus_host``:``settings.milvus_port`` under the
    ``"default"`` alias and operates on ``settings.milvus_collection``.

    Args:
        dry_run: When true, only log and return the collection's entity
            count; nothing is dropped.

    Returns:
        The entity count in dry-run mode; ``0`` after a successful clear or
        when the collection does not exist; ``-1`` if any step failed (the
        error is logged rather than raised).
    """
    # Stays None if the pymilvus import itself fails, so the cleanup in
    # ``finally`` can tell whether there is anything to disconnect.
    connections = None
    try:
        from pymilvus import connections, Collection, utility

        logger.info(f"连接Milvus: {settings.milvus_host}:{settings.milvus_port}")
        connections.connect(
            alias="default",
            host=settings.milvus_host,
            port=settings.milvus_port,
        )

        collection_name = settings.milvus_collection
        if not utility.has_collection(collection_name):
            logger.info(f"Milvus collection '{collection_name}' 不存在")
            return 0

        if dry_run:
            # num_entities is read from collection statistics, so the
            # collection is deliberately NOT loaded into memory just to be
            # counted (loading is heavy and can fail without an index).
            count = Collection(collection_name).num_entities
            logger.info(f"[DRY-RUN] Milvus collection '{collection_name}' 包含 {count} 条记录")
            return count

        # Dropping the collection removes all of its data.
        logger.info(f"删除collection: {collection_name}")
        utility.drop_collection(collection_name)
        logger.success(f"Milvus collection '{collection_name}' 已删除")

        # Recreate an empty collection through the project's own client.
        logger.info("重新创建collection...")
        from app.services.storage.milvus_client import MilvusClient

        client = MilvusClient()
        client.connect()
        try:
            client.create_collection(recreate=True)
        finally:
            # Release the client's connection even if recreation fails.
            client.disconnect()
        logger.success("Milvus collection已重新创建")
        return 0
    except Exception as e:
        logger.error(f"清理Milvus失败: {e}")
        return -1
    finally:
        if connections is not None:
            try:
                connections.disconnect("default")
            except Exception as e:
                # Best-effort cleanup: never mask the function's result, but
                # do not swallow KeyboardInterrupt/SystemExit either.
                logger.debug(f"断开Milvus连接时出错: {e}")
def clear_minio(dry_run: bool = False) -> int:
    """Delete every object in the configured MinIO bucket, or only list them.

    Connects to ``settings.minio_endpoint`` with the configured credentials
    and operates on ``settings.minio_bucket``. The bucket itself is kept.

    Args:
        dry_run: When true, only log each object's name and size and return
            the object count; nothing is deleted.

    Returns:
        The object count in dry-run mode; the number of deleted objects
        after a fully successful clear; ``0`` when the bucket does not
        exist; ``-1`` if the operation failed or any object could not be
        deleted (errors are logged rather than raised).
    """
    try:
        from minio import Minio
        from minio.error import S3Error

        logger.info(f"连接MinIO: {settings.minio_endpoint}")
        client = Minio(
            settings.minio_endpoint,
            access_key=settings.minio_access_key,
            secret_key=settings.minio_secret_key,
            secure=settings.minio_secure,
        )

        bucket = settings.minio_bucket
        if not client.bucket_exists(bucket):
            logger.info(f"MinIO bucket '{bucket}' 不存在")
            return 0

        # recursive=True descends into "folder" prefixes. Without it only
        # top-level entries are listed and nested objects would survive.
        objects = list(client.list_objects(bucket, recursive=True))
        count = len(objects)

        if dry_run:
            logger.info(f"[DRY-RUN] MinIO bucket '{bucket}' 包含 {count} 个对象:")
            for obj in objects:
                logger.info(f" - {obj.object_name} ({obj.size} bytes)")
            return count

        logger.info(f"清空bucket '{bucket}' 中的 {count} 个对象...")
        deleted = 0
        for obj in objects:
            try:
                client.remove_object(bucket, obj.object_name)
            except S3Error as e:
                # Keep going so one bad object does not block the rest.
                logger.warning(f" 删除失败: {obj.object_name} - {e}")
            else:
                deleted += 1
                logger.info(f" 已删除: {obj.object_name}")

        failed = count - deleted
        if failed:
            # Report a failure instead of claiming the bucket is empty.
            logger.error(f"MinIO未完全清空: {failed} 个对象删除失败")
            return -1

        logger.success(f"MinIO已清空删除 {deleted} 个对象")
        return deleted
    except Exception as e:
        logger.error(f"清理MinIO失败: {e}")
        return -1
def main():
    """Parse CLI flags, run the selected cleanup steps and summarize them.

    ``--milvus`` / ``--minio`` restrict the run to one backend; with neither
    flag both are processed. ``--dry-run`` is forwarded to each step.

    Returns:
        ``0`` when every executed step returned a non-negative count,
        otherwise ``1``. Intended to be used as the process exit code.
    """
    parser = argparse.ArgumentParser(
        description="清空Milvus和MinIO中的所有文档数据"
    )
    # All three options are plain boolean switches.
    flag_specs = (
        ("--milvus", "仅清空Milvus向量数据库"),
        ("--minio", "仅清空MinIO对象存储"),
        ("--dry-run", "仅查看数据统计,不执行删除"),
    )
    for flag, help_text in flag_specs:
        parser.add_argument(flag, action="store_true", help=help_text)
    args = parser.parse_args()

    # Replace loguru's default sink with a compact stdout format.
    logger.remove()
    logger.add(sys.stdout, format="{time:HH:mm:ss} | {level} | {message}")

    banner = "=" * 50
    logger.info(banner)
    logger.info("文档数据清理脚本")
    logger.info(banner)

    if args.dry_run:
        logger.warning("[DRY-RUN 模式] 仅统计,不删除数据")

    # No backend flag given means "process both backends".
    run_everything = not (args.milvus or args.minio)
    results = {}

    if args.milvus or run_everything:
        logger.info("\n[1] 清理Milvus向量数据库")
        results["milvus"] = clear_milvus(dry_run=args.dry_run)

    if args.minio or run_everything:
        logger.info("\n[2] 清理MinIO对象存储")
        results["minio"] = clear_minio(dry_run=args.dry_run)

    # Per-backend summary; a negative count marks a failed step.
    logger.info("\n" + banner)
    logger.info("清理结果摘要:")
    ok_label = "统计完成" if args.dry_run else "已清空"
    for name, count in results.items():
        if count < 0:
            logger.error(f" {name}: 清理失败")
            continue
        logger.info(f" {name}: {ok_label} ({count} 条/个)")
    logger.info(banner)

    any_failed = any(count < 0 for count in results.values())
    if any_failed:
        logger.error("清理失败,请检查错误信息")
        return 1

    logger.success("清理完成!")
    return 0
# Script entry point: run the CLI and use its return value as the exit code.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)