# Core Document Processing Pipeline

This document describes the core document processing flow in the current default production pipeline, namely:

- `AliyunDocumentParser`
- `AliyunVectorChunkBuilder`
- `OpenAICompatibleEmbeddingProvider`
- `MilvusVectorIndex`

It aims to answer four core questions:

1. Why `ParsedDocument` is a multi-layer structure
2. Where each of these structures is stored
3. Which step actually performs vectorization
4. What exactly ends up stored in Milvus

Database table design, the relational model, DDL, and PostgreSQL responsibility boundaries are now covered separately in [document-processing-database-design.md](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/docs/architecture/document-processing-database-design.md:1). This document keeps the process view and summarizes the storage split only where necessary; it is no longer the authority on database design.

## 1. Pipeline overview

The current default implementation is orchestrated by `DocumentCommandService.upload_and_process()`. It does not go straight from parsing into the vector store. It first produces structured parse artifacts, then sends only the retrieval-oriented layer to embedding and Milvus.

```mermaid
sequenceDiagram
    participant API as API / Service
    participant MinIO as BinaryStore
    participant Parser as AliyunDocumentParser
    participant PG as DocumentRepository / ParseArtifactStore
    participant Embed as EmbeddingProvider
    participant Milvus as VectorIndex
    API->>MinIO: Save the original file
    API->>Parser: parse(file_path, doc_id, doc_name)
    Parser-->>API: ParsedDocument
    API->>MinIO: Save layouts/structure_nodes/semantic_blocks/vector_chunks JSON
    API->>PG: Update documents.status=parsed
    API->>PG: Save structure_nodes / semantic_blocks
    API->>API: chunk_builder.build(parsed_document)
    API->>Embed: embed_texts([chunk.embedding_text])
    Embed-->>API: vectors
    API->>Milvus: upsert(chunks, vectors)
    API->>PG: Update documents.status=indexed
```

The orchestration code is in [services.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/application/documents/services.py:83):

```python
# Abridged excerpt: `document`, `temp_path`, and `health` are defined in code not shown here.
def upload_and_process(
    self,
    *,
    doc_id: str | None = None,
    file_name: str,
    content: bytes,
    content_type: str,
    doc_name: str | None,
    regulation_type: str,
    version: str,
    generate_summary: bool,
) -> DocumentProcessResult:
    doc_id = doc_id or str(uuid.uuid4())[:8]
    final_doc_name = doc_name or file_name
    object_name = f"{doc_id}/{file_name}"

    # 1. Register the document and store the original file.
    self.document_repository.create(document)
    self.binary_store.save(object_name=object_name, data=content, content_type=content_type, metadata={"doc_id": doc_id})
    self.document_repository.update_status(doc_id, DocumentStatus.STORED)

    # 2. Parse, then persist the parse artifacts.
    parsed_document = self.parser.parse(file_path=temp_path, doc_id=doc_id, doc_name=final_doc_name)
    artifact_keys = self._save_parse_artifacts(doc_id=doc_id, parsed_document=parsed_document)
    self.document_repository.update_status(doc_id, DocumentStatus.PARSED, parser_name=parsed_document.parser_name, metadata={...})
    if self.parse_artifact_store:
        self.parse_artifact_store.save(doc_id, parsed_document.structure_nodes, parsed_document.semantic_blocks)

    # 3. Build chunks, embed them, and write them to the vector index.
    chunks = self.chunk_builder.build(parsed_document=parsed_document, regulation_type=regulation_type, version=version)
    vectors = self.embedding_provider.embed_texts([chunk.embedding_text for chunk in chunks])
    inserted = self.vector_index.upsert(chunks, vectors)
    self.document_repository.update_status(doc_id, DocumentStatus.INDEXED, chunk_count=len(chunks), index_name=health.get("collection_name", ""), metadata={...})
```

The default bindings are in [bootstrap.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/shared/bootstrap.py:157):

```python
def get_parser():
    if settings.parser_backend == "aliyun":
        return AliyunDocumentParser()
    return LocalDocumentParser()

def get_chunk_builder():
    if settings.chunk_backend == "aliyun":
        return AliyunVectorChunkBuilder()
    return LocalRegulationChunkBuilder(...)

def get_embedding_provider() -> OpenAICompatibleEmbeddingProvider:
    return OpenAICompatibleEmbeddingProvider()

def get_vector_index() -> VectorIndex:
    # The index is constructed lazily, on first use.
    return LazyVectorIndex(_build_vector_index)
```

In other words, the current default pipeline is:

- parser: `AliyunDocumentParser`
- chunk builder: `AliyunVectorChunkBuilder`
- embedding provider: `OpenAICompatibleEmbeddingProvider`
- vector index: `MilvusVectorIndex` (constructed lazily through `LazyVectorIndex`)

## 2. Why `ParsedDocument` has three layers

`ParsedDocument` is not the final storage format. It is the unified intermediate structure that the parser hands to the later processing steps, and it separates "structure understanding" from "vector retrieval preparation" across three layers.

It is defined in [models.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/domain/documents/models.py:49):

```python
@dataclass
class ParsedDocument:
    doc_id: str
    doc_name: str
    structure_nodes: list[dict[str, Any]]
    semantic_blocks: list[dict[str, Any]]
    vector_chunks: list[dict[str, Any]]
    parser_name: str
    raw_text: str = ""
    raw_layouts: list[dict[str, Any]] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
```

The three layers have different responsibilities:

- `structure_nodes`
  - The heading-hierarchy skeleton
  - Describes which chapters, sections, and clauses the document contains
  - Preserves structure; it is not embedded directly
- `semantic_blocks`
  - The semantic block layer
  - Organizes body text, tables, and figure captions into contiguous semantic units
  - The intermediate layer between the raw layouts and the retrieval chunks
- `vector_chunks`
  - The retrieval and vectorization layer
  - Already a chunk view suitable for sending to the embedding model
  - The downstream `ChunkBuilder` essentially maps this layer onto the unified `Chunk` model

### 2.1 How the three layers are generated from the parser result

`AliyunDocumentParser.parse()` first obtains the `layouts` returned by Aliyun through the gateway, then converts those `layouts` into the three layers.

The code is in [aliyun_document_parser.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/aliyun_document_parser.py:28):

```python
def parse(self, *, file_path: str, doc_id: str, doc_name: str) -> ParsedDocument:
    payload = self.gateway.parse_document(file_path=file_path)
    layouts = payload.layouts

    # Each layer is derived from the raw layouts (vector_chunks via semantic_blocks).
    structure_nodes = build_structure_nodes(layouts)
    semantic_blocks = build_semantic_blocks(layouts)
    vector_chunks = build_vector_chunks(
        semantic_blocks,
        doc_id=doc_id,
        doc_title=doc_name,
        max_chars=MAX_CHARS,
        overlap_chars=OVERLAP_CHARS,
    )
    raw_text = "\n\n".join(
        block.get("text", "") for block in semantic_blocks if block.get("text")
    )
    return ParsedDocument(
        doc_id=doc_id,
        doc_name=doc_name,
        structure_nodes=structure_nodes,
        semantic_blocks=semantic_blocks,
        vector_chunks=vector_chunks,
        parser_name=self.parser_name,
        raw_text=raw_text,
        raw_layouts=layouts,
        metadata={...},
    )
```

In other words:

- The parser's raw output is `layouts`
- What the system actually consumes is `ParsedDocument`
- `ParsedDocument` is produced by the normalizer from `layouts`
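The body of `build_vector_chunks` is not shown in this document. As a rough illustration only, the sketch below assumes it splits each semantic block's text into sliding windows of at most `max_chars` characters that share `overlap_chars` characters with the previous window, and that it prefixes each piece with a document/section header like the one in the sample `Chunk` in section 4.1. The names `split_with_overlap` and `sketch_vector_chunks` are hypothetical, and the real normalizer may split on sentence boundaries or handle tables differently.

```python
from typing import Any


def split_with_overlap(text: str, max_chars: int, overlap_chars: int) -> list[str]:
    """Split text into windows of at most max_chars; consecutive windows share overlap_chars."""
    if max_chars <= 0 or not 0 <= overlap_chars < max_chars:
        raise ValueError("need max_chars > 0 and 0 <= overlap_chars < max_chars")
    pieces: list[str] = []
    start = 0
    while start < len(text):
        pieces.append(text[start:start + max_chars])
        if start + max_chars >= len(text):
            break  # the last window already reaches the end of the text
        start += max_chars - overlap_chars
    return pieces


def sketch_vector_chunks(
    semantic_blocks: list[dict[str, Any]],
    *,
    doc_id: str,
    doc_title: str,
    max_chars: int,
    overlap_chars: int,
) -> list[dict[str, Any]]:
    """Hypothetical stand-in for build_vector_chunks; not the project's implementation."""
    chunks: list[dict[str, Any]] = []
    for block in semantic_blocks:
        section_path = block.get("section_path", [])
        path_text = " > ".join(section_path)
        # Assumed header format, copied from the sample Chunk ("标准" = standard, "章节" = section).
        header = f"标准:{doc_title}\n章节:{path_text}\n\n"
        pieces = split_with_overlap(block.get("text", ""), max_chars, overlap_chars)
        for piece_index, piece in enumerate(pieces, start=1):
            chunk_index = len(chunks) + 1
            chunks.append(
                {
                    "chunk_id": f"{doc_id}-chunk-{chunk_index}",
                    "semantic_id": block.get("semantic_id", ""),
                    "block_type": block.get("block_type", ""),
                    "section_path": section_path,
                    "chunk_index": chunk_index,
                    "piece_index": piece_index,
                    "content": piece,
                    "embedding_text": header + piece,
                }
            )
    return chunks
```

The overlap means a sentence that straddles a window boundary still appears whole in at least one piece, at the cost of some duplicated text across neighbouring chunks.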
### 2.2 Layer 1: `structure_nodes`

This layer keeps only headings and their hierarchy.

The code is in [aliyun_layout_normalizer.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/aliyun_layout_normalizer.py:85):

```python
def build_structure_nodes(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    nodes: list[dict[str, Any]] = []
    for layout in layouts:
        if not is_title(layout):
            continue
        text = get_text(layout)
        # Skip empty headings and table-of-contents headings.
        if not text or text in TOC_TITLES:
            continue
        nodes.append(
            {
                "unique_id": layout.get("uniqueId"),
                "page": get_page(layout),
                "index": layout.get("index", 0),
                "level": layout.get("level", 0),
                "title": text,
                "type": layout.get("type"),
                "sub_type": layout.get("subType"),
            }
        )
    return nodes
```

Example:

```json
[
  {
    "unique_id": "l-title-001",
    "page": 2,
    "index": 11,
    "level": 1,
    "title": "1 范围",
    "type": "title",
    "sub_type": "para_title"
  },
  {
    "unique_id": "l-title-002",
    "page": 3,
    "index": 18,
    "level": 2,
    "title": "1.1 适用对象",
    "type": "title",
    "sub_type": "para_title"
  }
]
```

The purpose of this layer is to preserve the table-of-contents tree. It is not used for retrieval directly.

### 2.3 Layer 2: `semantic_blocks`

This layer merges consecutive body text into a single semantic block, and handles tables and figure captions separately.

The code is in [aliyun_layout_normalizer.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/aliyun_layout_normalizer.py:163):

```python
def build_semantic_blocks(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    semantic_blocks: list[dict[str, Any]] = []
    section_stack: list[dict[str, Any]] = []
    pending_text_blocks: list[dict[str, Any]] = []
    block_id = 1

    for layout in layouts:
        text = get_text(layout)
        page = get_page(layout)

        # A heading closes the pending text block and updates the section path.
        if is_title(layout):
            block_id = flush_text_block(pending_text_blocks, semantic_blocks, block_id)
            pending_text_blocks = []
            section_stack = update_section_path(section_stack, layout)
            continue

        section_path = section_path_titles(section_stack)
        section_title = section_path[-1] if section_path else "未分类"  # "uncategorized"
        section_level = len(section_path)

        # Tables become their own semantic block.
        if is_table(layout):
            ...
            semantic_blocks.append(
                {
                    "semantic_id": f"semantic-{block_id}",
                    "block_type": "table",
                    "page_start": page,
                    "page_end": page,
                    "section_path": section_path,
                    "section_level": section_level,
                    "section_title": section_title,
                    "source_ids": [layout.get("uniqueId")],
                    "text": table_text,
                }
            )
            continue

        # Body text is buffered until the next heading.
        if is_text(layout) and text:
            pending_text_blocks.append(
                {
                    "page": page,
                    "text": text,
                    "unique_id": layout.get("uniqueId"),
                    "section_path": section_path,
                    "section_level": section_level,
                    "section_title": section_title,
                }
            )
    # ... remainder of the function is omitted in this excerpt
```

After merging, body text forms semantic blocks like this:

```json
[
  {
    "semantic_id": "semantic-1",
    "block_type": "section_text",
    "page_start": 2,
    "page_end": 2,
    "section_path": ["1 范围", "1.1 适用对象"],
    "section_level": 2,
    "section_title": "1.1 适用对象",
    "source_ids": ["l-text-001", "l-text-002"],
    "text": "本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。"
  }
]
```

## 3. Where each structure is stored

### 3.1 The original file and intermediate artifacts go to MinIO first

During upload, the pipeline first saves the original file to object storage. After parsing completes, it also saves the structured intermediate products as JSON.

The code that saves artifacts is in [services.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/application/documents/services.py:62):

```python
def _save_parse_artifacts(self, *, doc_id: str, parsed_document: ParsedDocument) -> dict[str, str]:
    prefix = f"{parsed_document.metadata.get('artifact_prefix', 'artifacts').strip('/')}/{doc_id}"
    artifact_payloads = {
        "layouts": parsed_document.raw_layouts,
        "structure_nodes": parsed_document.structure_nodes,
        "semantic_blocks": parsed_document.semantic_blocks,
        "vector_chunks": parsed_document.vector_chunks,
    }
    artifact_keys: dict[str, str] = {}
    # One JSON object per artifact, stored under <prefix>/<name>.json.
    for name, payload in artifact_payloads.items():
        object_name = f"{prefix}/{name}.json"
        self.binary_store.save(
            object_name=object_name,
            data=json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8"),
            content_type="application/json",
            metadata={"doc_id": doc_id, "artifact_type": name},
        )
        artifact_keys[name] = object_name
    return artifact_keys
```

The current default implementation of `DocumentBinaryStore` is `MinioDocumentBinaryStore`, which means:

- The original uploaded file goes to MinIO
- `layouts.json` goes to MinIO
- `structure_nodes.json` goes to MinIO
- `semantic_blocks.json` goes to MinIO
- `vector_chunks.json` goes to MinIO

### 3.2 Summary of PostgreSQL's role in the flow

In the current flow, PostgreSQL is responsible for document metadata plus structured snapshots. It does not store vectors or large objects:

- `documents` holds the current document master record, status, statistics, and index information
- `structure_nodes` holds the table-of-contents structure of the latest parse snapshot
- `semantic_blocks` holds the semantic block structure of the latest parse snapshot

The fuller PostgreSQL design, which covers:

- `documents`
- `document_processing_runs`
- `document_status_history`
- `document_artifacts`
- `structure_nodes`
- `semantic_blocks`

is described in [document-processing-database-design.md](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/docs/architecture/document-processing-database-design.md:1).

### 3.3 Storage split at a glance

| Data layer | Stored in | Used directly for embedding | Ends up in Milvus | Main purpose |
| --- | --- | --- | --- | --- |
| Original file | MinIO | No | No | Keep the original uploaded document |
| `raw_layouts` | MinIO `layouts.json` | No | No | Keep the parser's raw response |
| `structure_nodes` | MinIO + PostgreSQL | No | No | Table-of-contents tree, hierarchy |
| `semantic_blocks` | MinIO + PostgreSQL | No | Indirectly | Semantic units, intermediate layer |
| `vector_chunks` | MinIO | Yes | Indirectly | Retrieval chunks before embedding |
| `Chunk` | In memory + Milvus | Yes | Yes | Unified model for vector ingestion |
| `documents` metadata | PostgreSQL | No | No | Processing status, statistics, index information |

## 4. Which step actually produces vectors

This is the most important point in the whole flow.

The conclusions up front:

- Parsing does not vectorize
- Saving artifacts does not vectorize
- `ChunkBuilder.build()` does not vectorize either
- Only `EmbeddingProvider.embed_texts()` actually calls the embedding model
- Only `VectorIndex.upsert()` actually writes vectors into the vector store

### 4.1 `vector_chunks` are first mapped onto the unified `Chunk`

`AliyunVectorChunkBuilder` does no embedding. Its only job is to convert `ParsedDocument.vector_chunks` into the domain layer's unified `Chunk` model.

The code is in [vector_chunk_builder.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/vector_chunk_builder.py:12):

```python
def build(
    self,
    *,
    parsed_document: ParsedDocument,
    regulation_type: str,
    version: str,
) -> list[Chunk]:
    chunks: list[Chunk] = []
    for index, item in enumerate(parsed_document.vector_chunks):
        content = item.get("content") or item.get("text") or ""
        embedding_text = item.get("embedding_text") or content
        # Skip chunks that have nothing to embed.
        if not embedding_text.strip():
            continue
        section_path = item.get("section_path") or []
        section_title = item.get("section_title") or (section_path[-1] if section_path else "")
        page_number = item.get("page_start") or item.get("page") or 0
        chunk_id = item.get("chunk_id") or f"{parsed_document.doc_id}-chunk-{index}"
        # Everything except the two text fields is carried along as metadata.
        metadata = {k: v for k, v in item.items() if k not in {"content", "embedding_text"}}
        chunks.append(
            Chunk(
                chunk_id=str(chunk_id),
                doc_id=parsed_document.doc_id,
                doc_name=parsed_document.doc_name,
                content=content,
                embedding_text=embedding_text,
                section_title=section_title,
                section_path=section_path,
                page_number=int(page_number or 0),
                regulation_type=regulation_type,
                version=version,
                semantic_id=item.get("semantic_id", ""),
                block_type=item.get("block_type", ""),
                metadata=metadata,
            )
        )
    return chunks
```

`Chunk` is defined in [models.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/domain/documents/models.py:63):

```python
@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    doc_name: str
    content: str
    embedding_text: str
    section_title: str = ""
    section_path: list[str] = field(default_factory=list)
    page_number: int = 0
    regulation_type: str = ""
    version: str = ""
    semantic_id: str = ""
    block_type: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
```

A typical `Chunk` looks like this:

```json
{
  "chunk_id": "doc-001-chunk-1",
  "doc_id": "doc-001",
  "doc_name": "动力电池安全规范",
  "content": "本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。",
  "embedding_text": "标准:动力电池安全规范\n章节:1 范围 > 1.1 适用对象\n\n本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。",
  "section_title": "1.1 适用对象",
  "section_path": ["1 范围", "1.1 适用对象"],
  "page_number": 2,
  "regulation_type": "GB",
  "version": "2025",
  "semantic_id": "semantic-1",
  "block_type": "section_text",
  "metadata": {
    "chunk_index": 1,
    "piece_index": 1,
    "source_ids": ["l-text-001", "l-text-002"]
  }
}
```

The key point is the difference between two fields:

- `content`
  - The text shown when a retrieval hit is displayed
  - Closer to the body fragment the user ultimately sees
- `embedding_text`
  - The text sent to the embedding model
  - Adds the standard name and section path as context on top of `content`

So the input to vectorization is not the bare body text. It is the body text enriched with that context.

### 4.2 Where the embedding API is actually called

The step that turns text into vectors is `OpenAICompatibleEmbeddingProvider.embed_texts()`.

The code is in [openai_compatible_embedding_provider.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/embedding/openai_compatible_embedding_provider.py:64):

```python
def embed_texts(self, texts: list[str]) -> list[list[float]]:
    # Nothing to embed: return early without calling the API.
    if not texts:
        return []
    ...  # the rest of the method is not shown in this excerpt
```

In other words, only at this step:

- Input: a `list[str]` of `embedding_text` values
- Output: a `list[list[float]]` of dense vectors

Parsing, normalization, and chunk building before it only prepare text. None of them produces a vector value.

### 4.3 Where vectors are actually written into Milvus

Once the vectors exist, `MilvusVectorIndex.upsert()` writes each `Chunk` together with its vector into the vector store.

The code is in [milvus_vector_index.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/vectorstore/milvus_vector_index.py:69):

```python
def upsert(self, chunks: list[Chunk], vectors: list[list[float]]) -> int:
    if len(chunks) != len(vectors):
        raise ValueError("chunks 与 vectors 数量不一致")  # chunk and vector counts differ
    data = []
    now = int(time.time())
    # One row per chunk; string fields are sliced to the schema's max_length values.
    for chunk, vector in zip(chunks, vectors):
        data.append(
            {
                "id": chunk.chunk_id,
                "doc_id": chunk.doc_id,
                "doc_name": chunk.doc_name,
                "content": chunk.content[:65535],
                "embedding": vector,
                "section_title": chunk.section_title[:512],
                "section_path": json.dumps(chunk.section_path, ensure_ascii=False)[:4096],
                "page_number": chunk.page_number,
                "regulation_type": chunk.regulation_type[:128],
                "version": chunk.version[:64],
                "semantic_id": chunk.semantic_id[:128],
                "block_type": chunk.block_type[:64],
                "metadata_json": json.dumps(chunk.metadata, ensure_ascii=False)[:65535],
                "created_at": now,
            }
        )
    self.collection.insert(data)
    self.collection.flush()
    return len(data)
```

So what Milvus ultimately stores is:

- Primary key: `id`, holding the `chunk_id`
- Document-level fields: `doc_id`, `doc_name`
- Display field for retrieval results: `content`
- Vector field: `embedding`
- Filtering and traceability fields: `section_title`, `section_path`, `page_number`, `regulation_type`, `version`, `semantic_id`, `block_type`
- Additional metadata: `metadata_json`

## 5. What exactly ends up stored in Milvus

### 5.1 Collection schema

The schema that `MilvusVectorIndex` defines when it initializes the collection is in [milvus_vector_index.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/vectorstore/milvus_vector_index.py:37):

```python
schema = CollectionSchema(
    fields=[
        FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=128, is_primary=True, auto_id=False),
        FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="doc_name", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=settings.embedding_dim),
        FieldSchema(name="section_title", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="section_path", dtype=DataType.VARCHAR, max_length=4096),
        FieldSchema(name="page_number", dtype=DataType.INT64),
        FieldSchema(name="regulation_type", dtype=DataType.VARCHAR, max_length=128),
        FieldSchema(name="version", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="semantic_id", dtype=DataType.VARCHAR, max_length=128),
        FieldSchema(name="block_type", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="metadata_json", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="created_at", dtype=DataType.INT64),
    ],
    description="Dense-only regulations index",
    enable_dynamic_field=False,
)
```

This shows that Milvus does not hold a minimal, embedding-only vector table. Each row contains:

- One dense vector
- A set of structured fields to return or filter on at retrieval time

Note, however, that this does not make Milvus the system of record for business data. It still mainly serves retrieval and does not replace PostgreSQL's document management role.

### 5.2 Why `list_documents()` looks at Milvus first

The document list query is implemented in [services.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/application/documents/services.py:271). It:

1. Queries Milvus for the documents that actually have vectors
2. Loads the document records from the document metadata repository
3. Merges the two, treating Milvus as the source of truth for index status

The reason is not that Milvus replaces PostgreSQL. Rather:

- Whether the `indexed` status really holds depends on whether Milvus contains the corresponding chunks
- Download, deletion, retry, file location, and error information still rely on the document metadata repository

So:

- Milvus is the source of truth for index state
- PostgreSQL/JSON is the source of truth for document metadata

The two have different responsibilities and cannot substitute for each other.
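
The merge in `list_documents()` is described above but its code is not shown. A minimal sketch of the idea follows, assuming the repository yields simple records and that the Milvus query has already been reduced to a `doc_id -> chunk count` mapping. `DocumentRecord`, `merge_document_views`, the status strings, and the rule of downgrading a stale `indexed` record to `parsed` are all hypothetical; the actual merge rules in `services.py` may differ.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DocumentRecord:
    """Hypothetical, simplified view of a row from the document metadata repository."""
    doc_id: str
    doc_name: str
    status: str
    chunk_count: int = 0


def merge_document_views(
    records: list[DocumentRecord],
    indexed_counts: dict[str, int],
) -> list[DocumentRecord]:
    """Combine metadata records with index facts; the index decides whether 'indexed' holds."""
    merged: list[DocumentRecord] = []
    for record in records:
        count = indexed_counts.get(record.doc_id, 0)
        if count > 0:
            # Vectors exist, so the document is indexed regardless of the stored status.
            merged.append(replace(record, status="indexed", chunk_count=count))
        elif record.status == "indexed":
            # Metadata claims 'indexed' but the index has no chunks: assumed downgrade.
            merged.append(replace(record, status="parsed", chunk_count=0))
        else:
            merged.append(record)
    # Vectors whose doc_id has no metadata record are ignored in this sketch.
    return merged


records = [
    DocumentRecord("a", "Doc A", "indexed"),
    DocumentRecord("b", "Doc B", "indexed"),
    DocumentRecord("c", "Doc C", "parsed"),
]
for item in merge_document_views(records, {"a": 3}):
    print(item.doc_id, item.status, item.chunk_count)
```

Everything other than `status` and `chunk_count` still comes from the metadata record, which matches the split of responsibilities above: the index answers "is it searchable", the repository answers everything else.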