"""文档解析工作流 - 异步处理""" import asyncio import uuid from datetime import datetime from typing import List import io from app.core.config import settings from app.services.minio import minio_service from app.services.database import db_service, DocStatus from app.services.tasks import set_task_status, get_task_status from app.services.document import DocumentService from app.utils.chunking import TextChunker from app.utils.logger import logger def generate_doc_id() -> str: """生成文档ID""" return f"doc-{uuid.uuid4().hex[:12]}" def generate_chunk_id(doc_id: str, index: int) -> str: """生成块ID""" return f"{doc_id}-chunk-{index}" async def run_parse_workflow(task_id: str, doc_id: str): """ 执行文档解析工作流 处理步骤: 1. 获取文件 - 从 MinIO 下载文件 2. 解析文档 - 提取文本内容 3. 文本分块 - 按条款或固定大小分块 4. 保存结果 - 存储分块数据 Args: task_id: 任务ID doc_id: 文档ID """ chunker = TextChunker() doc_service = DocumentService(settings.data_raw_dir, settings.data_parsed_dir) try: # Step 1: 获取文件 set_task_status(task_id, { "status": "running", "step": "fetching", "progress": 10, "message": "正在从存储获取文件...", "started_at": datetime.now(), }) db_service.update_document_status(doc_id, DocStatus.parsing.value) doc = db_service.get_document(doc_id) if not doc: raise ValueError(f"Document {doc_id} not found") # 从 MinIO 获取文件 file_data = minio_service.get_file(doc.minio_path) # 保存到本地临时目录(用于解析) temp_path = f"{settings.data_raw_dir}/{doc_id}_{doc.filename}" with open(temp_path, "wb") as f: f.write(file_data) await asyncio.sleep(0.5) # 模拟延迟 # Step 2: 解析文档 set_task_status(task_id, { "status": "running", "step": "parsing", "progress": 30, "message": "正在解析文档内容...", }) text = doc_service.parse_document(temp_path) if not text: raise ValueError("Document parsing returned empty content") # 保存解析后的文本 parsed_path = doc_service.save_parsed_text(doc_id, text) await asyncio.sleep(0.5) # Step 3: 文本分块 set_task_status(task_id, { "status": "running", "step": "chunking", "progress": 50, "message": "正在进行文本分块...", }) # 尝试按条款分块,如果不是法规格式则按大小分块 chunks = chunker.chunk_by_clause(text) if len(chunks) == 0: chunks = chunker.chunk_by_size(text) await asyncio.sleep(0.5) # Step 4: 保存分块结果 set_task_status(task_id, { "status": "running", "step": "saving", "progress": 80, "message": f"正在保存 {len(chunks)} 个文本块...", }) # TODO: 将分块存储到数据库或向量库 # 这里先统计数量 chunk_count = len(chunks) await asyncio.sleep(0.5) # Step 5: 完成 set_task_status(task_id, { "status": "completed", "step": "done", "progress": 100, "message": f"解析完成,共生成 {chunk_count} 个文本块", "completed_at": datetime.now(), "result": { "doc_id": doc_id, "chunks": chunk_count, "parsed_path": parsed_path, } }) db_service.update_document_status( doc_id, DocStatus.parsed.value, chunks=chunk_count, ) logger.info(f"Parse workflow completed for doc {doc_id}: {chunk_count} chunks") except Exception as e: logger.error(f"Parse workflow failed for doc {doc_id}: {e}") set_task_status(task_id, { "status": "failed", "step": "error", "progress": 0, "message": str(e), "completed_at": datetime.now(), }) db_service.update_document_status( doc_id, DocStatus.failed.value, error_message=str(e), ) async def run_embedding_workflow(task_id: str, doc_id: str): """ 执行向量化工作流 处理步骤: 1. 获取分块数据 2. 生成向量嵌入 3. 存入向量数据库 Args: task_id: 任务ID doc_id: 文档ID """ try: # Step 1: 获取分块 set_task_status(task_id, { "status": "running", "step": "fetching_chunks", "progress": 10, "message": "正在获取文本分块...", "started_at": datetime.now(), }) db_service.update_document_status(doc_id, DocStatus.embedding.value) doc = db_service.get_document(doc_id) if not doc: raise ValueError(f"Document {doc_id} not found") await asyncio.sleep(0.5) # Step 2: 生成嵌入 set_task_status(task_id, { "status": "running", "step": "embedding", "progress": 40, "message": "正在生成向量嵌入...", }) # TODO: 调用 Embedding 服务生成向量 # 这里先模拟处理 vector_count = doc.chunks await asyncio.sleep(1) # Step 3: 存入向量库 set_task_status(task_id, { "status": "running", "step": "storing", "progress": 70, "message": "正在存入向量数据库...", }) # TODO: 存入 Milvus await asyncio.sleep(0.5) # Step 4: 完成 set_task_status(task_id, { "status": "completed", "step": "done", "progress": 100, "message": f"向量化完成,共处理 {vector_count} 个向量", "completed_at": datetime.now(), "result": { "doc_id": doc_id, "vectors": vector_count, } }) db_service.update_document_status( doc_id, DocStatus.indexed.value, vectors=vector_count, ) logger.info(f"Embedding workflow completed for doc {doc_id}") except Exception as e: logger.error(f"Embedding workflow failed for doc {doc_id}: {e}") set_task_status(task_id, { "status": "failed", "step": "error", "progress": 0, "message": str(e), "completed_at": datetime.now(), }) db_service.update_document_status( doc_id, DocStatus.failed.value, error_message=str(e), )