Files
AIRegulation-Demo-Test-Backend/app/workflows/document_workflow.py

252 lines
6.9 KiB
Python
Raw Permalink Normal View History

"""文档解析工作流 - 异步处理"""
import asyncio
import io
import os
import uuid
from datetime import datetime
from typing import List

from app.core.config import settings
from app.services.minio import minio_service
from app.services.database import db_service, DocStatus
from app.services.tasks import set_task_status, get_task_status
from app.services.document import DocumentService
from app.utils.chunking import TextChunker
from app.utils.logger import logger
def generate_doc_id() -> str:
    """Generate a document ID.

    Returns:
        A string of the form ``doc-<12 hex chars>``, where the hex part is the
        first 12 characters of a random UUID4.
    """
    return f"doc-{uuid.uuid4().hex[:12]}"
def generate_chunk_id(doc_id: str, index: int) -> str:
    """Generate a chunk ID.

    Args:
        doc_id: ID of the document the chunk belongs to.
        index: Index of the chunk within the document.

    Returns:
        A string of the form ``<doc_id>-chunk-<index>``.
    """
    return f"{doc_id}-chunk-{index}"
def _record_parse_failure(task_id: str, doc_id: str, message: str) -> None:
    """Mark the parse task and its document as failed without raising."""
    # Each store is updated independently so that a failure in one of them
    # does not prevent the other from recording the error.
    try:
        set_task_status(task_id, {
            "status": "failed",
            "step": "error",
            "progress": 0,
            "message": message,
            "completed_at": datetime.now(),
        })
    except Exception:
        logger.exception(f"Could not record failed status for task {task_id}")
    try:
        db_service.update_document_status(
            doc_id,
            DocStatus.failed.value,
            error_message=message,
        )
    except Exception:
        logger.exception(f"Could not record failed status for doc {doc_id}")


async def run_parse_workflow(task_id: str, doc_id: str) -> None:
    """Run the document parsing workflow for one document.

    Steps:
        1. Fetch - download the file from MinIO into the raw-data directory.
        2. Parse - extract the text content and save the parsed text.
        3. Chunk - split by clause, falling back to fixed-size chunks.
        4. Save  - currently only counts the chunks (persistence is a TODO).

    Progress is reported through ``set_task_status``. The document status is
    moved to ``parsing`` and then to ``parsed``; on any exception the task and
    the document are marked ``failed`` and the exception is not re-raised.

    Args:
        task_id: Task ID under which progress is reported.
        doc_id: ID of the document to parse.
    """
    chunker = TextChunker()
    doc_service = DocumentService(settings.data_raw_dir, settings.data_parsed_dir)

    try:
        # Step 1: fetch the file
        set_task_status(task_id, {
            "status": "running",
            "step": "fetching",
            "progress": 10,
            "message": "正在从存储获取文件...",
            "started_at": datetime.now(),
        })

        # Look the document up before touching its status, so a missing
        # document is reported as "not found" rather than failing in the
        # status update.
        doc = db_service.get_document(doc_id)
        if not doc:
            raise ValueError(f"Document {doc_id} not found")
        db_service.update_document_status(doc_id, DocStatus.parsing.value)

        # NOTE(review): get_file, the file write and parse_document are plain
        # synchronous calls inside a coroutine - presumably they block the
        # event loop while they run; confirm whether that is acceptable.
        file_data = minio_service.get_file(doc.minio_path)

        # Save a local copy for the parser. Only the base name of the stored
        # filename is used, so a name containing directory components
        # (e.g. "../x") cannot escape the raw-data directory.
        safe_name = os.path.basename((doc.filename or "").replace("\\", "/")) or "upload"
        os.makedirs(settings.data_raw_dir, exist_ok=True)
        temp_path = os.path.join(settings.data_raw_dir, f"{doc_id}_{safe_name}")
        with open(temp_path, "wb") as f:
            f.write(file_data)

        await asyncio.sleep(0.5)  # simulated delay

        # Step 2: parse the document
        set_task_status(task_id, {
            "status": "running",
            "step": "parsing",
            "progress": 30,
            "message": "正在解析文档内容...",
        })

        text = doc_service.parse_document(temp_path)
        if not text:
            raise ValueError("Document parsing returned empty content")

        # Save the parsed text
        parsed_path = doc_service.save_parsed_text(doc_id, text)

        await asyncio.sleep(0.5)

        # Step 3: chunk the text
        set_task_status(task_id, {
            "status": "running",
            "step": "chunking",
            "progress": 50,
            "message": "正在进行文本分块...",
        })

        # Try clause-based chunking first; if it yields nothing (the text is
        # not in regulation format), fall back to fixed-size chunking.
        chunks = chunker.chunk_by_clause(text)
        if not chunks:
            chunks = chunker.chunk_by_size(text)

        await asyncio.sleep(0.5)

        # Step 4: save the chunking result
        set_task_status(task_id, {
            "status": "running",
            "step": "saving",
            "progress": 80,
            "message": f"正在保存 {len(chunks)} 个文本块...",
        })

        # TODO: store the chunks in the database or vector store.
        # For now only the count is recorded.
        chunk_count = len(chunks)

        await asyncio.sleep(0.5)

        # Step 5: done
        # Persist the document state first, so a client that sees the task as
        # "completed" never reads a document still marked as parsing.
        db_service.update_document_status(
            doc_id,
            DocStatus.parsed.value,
            chunks=chunk_count,
        )
        set_task_status(task_id, {
            "status": "completed",
            "step": "done",
            "progress": 100,
            "message": f"解析完成,共生成 {chunk_count} 个文本块",
            "completed_at": datetime.now(),
            "result": {
                "doc_id": doc_id,
                "chunks": chunk_count,
                "parsed_path": parsed_path,
            }
        })

        logger.info(f"Parse workflow completed for doc {doc_id}: {chunk_count} chunks")

    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Parse workflow failed for doc {doc_id}: {e}")
        _record_parse_failure(task_id, doc_id, str(e))
def _record_embedding_failure(task_id: str, doc_id: str, message: str) -> None:
    """Mark the embedding task and its document as failed without raising."""
    # Each store is updated independently so that a failure in one of them
    # does not prevent the other from recording the error.
    try:
        set_task_status(task_id, {
            "status": "failed",
            "step": "error",
            "progress": 0,
            "message": message,
            "completed_at": datetime.now(),
        })
    except Exception:
        logger.exception(f"Could not record failed status for task {task_id}")
    try:
        db_service.update_document_status(
            doc_id,
            DocStatus.failed.value,
            error_message=message,
        )
    except Exception:
        logger.exception(f"Could not record failed status for doc {doc_id}")


async def run_embedding_workflow(task_id: str, doc_id: str) -> None:
    """Run the embedding (vectorization) workflow for one document.

    Steps:
        1. Fetch the chunk data (currently only the document record is read).
        2. Generate vector embeddings (TODO - simulated for now).
        3. Store them in the vector database (TODO - simulated for now).

    Progress is reported through ``set_task_status``. The document status is
    moved to ``embedding`` and then to ``indexed``; on any exception the task
    and the document are marked ``failed`` and the exception is not re-raised.

    Args:
        task_id: Task ID under which progress is reported.
        doc_id: ID of the document to embed.
    """
    try:
        # Step 1: fetch the chunks
        set_task_status(task_id, {
            "status": "running",
            "step": "fetching_chunks",
            "progress": 10,
            "message": "正在获取文本分块...",
            "started_at": datetime.now(),
        })

        # Look the document up before touching its status, so a missing
        # document is reported as "not found" rather than failing in the
        # status update.
        doc = db_service.get_document(doc_id)
        if not doc:
            raise ValueError(f"Document {doc_id} not found")
        db_service.update_document_status(doc_id, DocStatus.embedding.value)

        await asyncio.sleep(0.5)

        # Step 2: generate embeddings
        set_task_status(task_id, {
            "status": "running",
            "step": "embedding",
            "progress": 40,
            "message": "正在生成向量嵌入...",
        })

        # TODO: call the embedding service to generate vectors.
        # Simulated for now: one vector per recorded chunk. A document with
        # no chunk count yet is treated as having zero chunks, so the
        # message and the stored value are never "None".
        vector_count = doc.chunks or 0

        await asyncio.sleep(1)

        # Step 3: store in the vector database
        set_task_status(task_id, {
            "status": "running",
            "step": "storing",
            "progress": 70,
            "message": "正在存入向量数据库...",
        })

        # TODO: store in Milvus

        await asyncio.sleep(0.5)

        # Step 4: done
        # Persist the document state first, so a client that sees the task as
        # "completed" never reads a document still marked as embedding.
        db_service.update_document_status(
            doc_id,
            DocStatus.indexed.value,
            vectors=vector_count,
        )
        set_task_status(task_id, {
            "status": "completed",
            "step": "done",
            "progress": 100,
            "message": f"向量化完成,共处理 {vector_count} 个向量",
            "completed_at": datetime.now(),
            "result": {
                "doc_id": doc_id,
                "vectors": vector_count,
            }
        })

        logger.info(f"Embedding workflow completed for doc {doc_id}")

    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Embedding workflow failed for doc {doc_id}: {e}")
        _record_embedding_failure(task_id, doc_id, str(e))