Files
AIRegulation-DocAnalysis/docs/architecture/document-core-processing-flow.md

624 lines
23 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 核心文档处理主链路说明
本文件说明当前默认生产链路中的核心文档处理流程,也就是:
- `AliyunDocumentParser`
- `AliyunVectorChunkBuilder`
- `OpenAICompatibleEmbeddingProvider`
- `MilvusVectorIndex`
目标是回答四个核心问题:
1. `ParsedDocument` 为什么是多层结构
2. 这些结构分别保存到哪里
3. 哪一步才真正做了向量化
4. Milvus 里最后到底存的是什么
数据库表设计、关系模型、DDL 和 PostgreSQL 职责边界已经单独整理到 [document-processing-database-design.md](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/docs/architecture/document-processing-database-design.md:1)。本文件保留流程视角,只在必要处给出与存储分工相关的摘要,不再作为数据库设计 authority。
## 1. 主链路总览
当前默认实现由 `DocumentCommandService.upload_and_process()` 统一编排。它不是“parse 完直接进向量库”,而是先生成结构化解析产物,再把其中适合检索的一层送去 embedding 和 Milvus。
```mermaid
sequenceDiagram
participant API as API / Service
participant MinIO as BinaryStore
participant Parser as AliyunDocumentParser
participant PG as DocumentRepository / ParseArtifactStore
participant Embed as EmbeddingProvider
participant Milvus as VectorIndex
API->>MinIO: 保存原始文件
API->>Parser: parse(file_path, doc_id, doc_name)
Parser-->>API: ParsedDocument
API->>MinIO: 保存 layouts/structure_nodes/semantic_blocks/vector_chunks JSON
API->>PG: 更新 documents.status=parsed
API->>PG: 保存 structure_nodes / semantic_blocks
API->>API: chunk_builder.build(parsed_document)
API->>Embed: embed_texts([chunk.embedding_text])
Embed-->>API: vectors
API->>Milvus: upsert(chunks, vectors)
API->>PG: 更新 documents.status=indexed
```
主链路编排代码在 [services.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/application/documents/services.py:83)
```python
def upload_and_process(
self,
*,
doc_id: str | None = None,
file_name: str,
content: bytes,
content_type: str,
doc_name: str | None,
regulation_type: str,
version: str,
generate_summary: bool,
) -> DocumentProcessResult:
doc_id = doc_id or str(uuid.uuid4())[:8]
final_doc_name = doc_name or file_name
object_name = f"{doc_id}/{file_name}"
self.document_repository.create(document)
self.binary_store.save(object_name=object_name, data=content, content_type=content_type, metadata={"doc_id": doc_id})
self.document_repository.update_status(doc_id, DocumentStatus.STORED)
parsed_document = self.parser.parse(file_path=temp_path, doc_id=doc_id, doc_name=final_doc_name)
artifact_keys = self._save_parse_artifacts(doc_id=doc_id, parsed_document=parsed_document)
self.document_repository.update_status(doc_id, DocumentStatus.PARSED, parser_name=parsed_document.parser_name, metadata={...})
if self.parse_artifact_store:
self.parse_artifact_store.save(doc_id, parsed_document.structure_nodes, parsed_document.semantic_blocks)
chunks = self.chunk_builder.build(parsed_document=parsed_document, regulation_type=regulation_type, version=version)
vectors = self.embedding_provider.embed_texts([chunk.embedding_text for chunk in chunks])
inserted = self.vector_index.upsert(chunks, vectors)
self.document_repository.update_status(doc_id, DocumentStatus.INDEXED, chunk_count=len(chunks), index_name=health.get("collection_name", ""), metadata={...})
```
默认绑定关系在 [bootstrap.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/shared/bootstrap.py:157)
```python
def get_parser():
if settings.parser_backend == "aliyun":
return AliyunDocumentParser()
return LocalDocumentParser()
def get_chunk_builder():
if settings.chunk_backend == "aliyun":
return AliyunVectorChunkBuilder()
return LocalRegulationChunkBuilder(...)
def get_embedding_provider() -> OpenAICompatibleEmbeddingProvider:
return OpenAICompatibleEmbeddingProvider()
def get_vector_index() -> VectorIndex:
return LazyVectorIndex(_build_vector_index)
```
也就是说,当前默认主链路是:
- parser: `AliyunDocumentParser`
- chunk builder: `AliyunVectorChunkBuilder`
- embedding provider: `OpenAICompatibleEmbeddingProvider`
- vector index: `MilvusVectorIndex`
## 2. `ParsedDocument` 为什么是三层结构
`ParsedDocument` 不是最终入库格式,而是 parser 输出给后续处理步骤的统一中间结构。它把“结构理解”和“向量检索准备”拆成了三层。
定义在 [models.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/domain/documents/models.py:49)
```python
@dataclass
class ParsedDocument:
doc_id: str
doc_name: str
structure_nodes: list[dict[str, Any]]
semantic_blocks: list[dict[str, Any]]
vector_chunks: list[dict[str, Any]]
parser_name: str
raw_text: str = ""
raw_layouts: list[dict[str, Any]] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
```
这三层的职责不同:
- `structure_nodes`
- 标题层级骨架
- 描述“文档有哪些章、节、条”
- 用于保留结构,不直接做 embedding
- `semantic_blocks`
- 语义块层
- 把正文、表格、图片说明整理成连续的语义单元
- 是从原始 layout 到检索 chunk 之间的中间层
- `vector_chunks`
- 检索和向量化层
- 已经是适合送给 embedding 模型的 chunk 视图
- 后续 `ChunkBuilder` 基本就是把这层映射成统一 `Chunk`
### 2.1 这三层是怎么从 parser 结果生成的
`AliyunDocumentParser.parse()` 先通过网关拿到阿里云返回的 `layouts`,再把 `layouts` 转成三层结构。
代码在 [aliyun_document_parser.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/aliyun_document_parser.py:28)
```python
def parse(self, *, file_path: str, doc_id: str, doc_name: str) -> ParsedDocument:
payload = self.gateway.parse_document(file_path=file_path)
layouts = payload.layouts
structure_nodes = build_structure_nodes(layouts)
semantic_blocks = build_semantic_blocks(layouts)
vector_chunks = build_vector_chunks(
semantic_blocks,
doc_id=doc_id,
doc_title=doc_name,
max_chars=MAX_CHARS,
overlap_chars=OVERLAP_CHARS,
)
raw_text = "\n\n".join(
block.get("text", "")
for block in semantic_blocks
if block.get("text")
)
return ParsedDocument(
doc_id=doc_id,
doc_name=doc_name,
structure_nodes=structure_nodes,
semantic_blocks=semantic_blocks,
vector_chunks=vector_chunks,
parser_name=self.parser_name,
raw_text=raw_text,
raw_layouts=layouts,
metadata={...},
)
```
也就是说:
- parser 原始输出是 `layouts`
- 当前系统真正消费的是 `ParsedDocument`
- `ParsedDocument` 是由 normalizer 从 `layouts` 规整出来的
### 2.2 第一层:`structure_nodes`
这一层只保留标题和层级。
代码在 [aliyun_layout_normalizer.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/aliyun_layout_normalizer.py:85)
```python
def build_structure_nodes(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
nodes: list[dict[str, Any]] = []
for layout in layouts:
if not is_title(layout):
continue
text = get_text(layout)
if not text or text in TOC_TITLES:
continue
nodes.append(
{
"unique_id": layout.get("uniqueId"),
"page": get_page(layout),
"index": layout.get("index", 0),
"level": layout.get("level", 0),
"title": text,
"type": layout.get("type"),
"sub_type": layout.get("subType"),
}
)
return nodes
```
示例:
```json
[
{
"unique_id": "l-title-001",
"page": 2,
"index": 11,
"level": 1,
"title": "1 范围",
"type": "title",
"sub_type": "para_title"
},
{
"unique_id": "l-title-002",
"page": 3,
"index": 18,
"level": 2,
"title": "1.1 适用对象",
"type": "title",
"sub_type": "para_title"
}
]
```
这层的意义是“保留目录树”,不是直接拿来检索。
### 2.3 第二层:`semantic_blocks`
这一层会把连续正文合并成一个语义块,也会单独处理表格和图片说明。
代码在 [aliyun_layout_normalizer.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/aliyun_layout_normalizer.py:163)
```python
def build_semantic_blocks(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
semantic_blocks: list[dict[str, Any]] = []
section_stack: list[dict[str, Any]] = []
pending_text_blocks: list[dict[str, Any]] = []
block_id = 1
for layout in layouts:
text = get_text(layout)
page = get_page(layout)
if is_title(layout):
block_id = flush_text_block(pending_text_blocks, semantic_blocks, block_id)
pending_text_blocks = []
section_stack = update_section_path(section_stack, layout)
continue
section_path = section_path_titles(section_stack)
section_title = section_path[-1] if section_path else "未分类"
section_level = len(section_path)
if is_table(layout):
...
semantic_blocks.append(
{
"semantic_id": f"semantic-{block_id}",
"block_type": "table",
"page_start": page,
"page_end": page,
"section_path": section_path,
"section_level": section_level,
"section_title": section_title,
"source_ids": [layout.get("uniqueId")],
"text": table_text,
}
)
continue
if is_text(layout) and text:
pending_text_blocks.append(
{
"page": page,
"text": text,
"unique_id": layout.get("uniqueId"),
"section_path": section_path,
"section_level": section_level,
"section_title": section_title,
}
)
```
正文合并后会形成类似这样的语义块:
```json
[
{
"semantic_id": "semantic-1",
"block_type": "section_text",
"page_start": 2,
"page_end": 2,
"section_path": ["1 范围", "1.1 适用对象"],
"section_level": 2,
"section_title": "1.1 适用对象",
"source_ids": ["l-text-001", "l-text-002"],
"text": "本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。"
}
]
```
## 3. 这些结构分别保存到哪里
### 3.1 原始文件和中间 artifacts 先落 MinIO
当前链路在上传阶段会先把原始文件保存到对象存储;解析完成后,又会把结构化中间产物保存为 JSON。
保存 artifacts 的代码在 [services.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/application/documents/services.py:62)
```python
def _save_parse_artifacts(self, *, doc_id: str, parsed_document: ParsedDocument) -> dict[str, str]:
prefix = f"{parsed_document.metadata.get('artifact_prefix', 'artifacts').strip('/')}/{doc_id}"
artifact_payloads = {
"layouts": parsed_document.raw_layouts,
"structure_nodes": parsed_document.structure_nodes,
"semantic_blocks": parsed_document.semantic_blocks,
"vector_chunks": parsed_document.vector_chunks,
}
artifact_keys: dict[str, str] = {}
for name, payload in artifact_payloads.items():
object_name = f"{prefix}/{name}.json"
self.binary_store.save(
object_name=object_name,
data=json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8"),
content_type="application/json",
metadata={"doc_id": doc_id, "artifact_type": name},
)
artifact_keys[name] = object_name
return artifact_keys
```
`DocumentBinaryStore` 的当前默认实现是 `MinioDocumentBinaryStore`,也就是:
- 原始上传文件进 MinIO
- `layouts.json` 进 MinIO
- `structure_nodes.json` 进 MinIO
- `semantic_blocks.json` 进 MinIO
- `vector_chunks.json` 进 MinIO
### 3.2 PostgreSQL 在流程中的职责摘要
当前流程中PostgreSQL 承担的是“文档元数据 + 结构化快照”的职责,而不是向量或大对象存储:
- `documents` 保存当前文档主记录、状态、统计和索引信息
- `structure_nodes` 保存当前最新解析快照的目录结构
- `semantic_blocks` 保存当前最新解析快照的语义块结构
更完整的 PostgreSQL 设计,包括:
- `documents`
- `document_processing_runs`
- `document_status_history`
- `document_artifacts`
- `structure_nodes`
- `semantic_blocks`
见 [document-processing-database-design.md](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/docs/architecture/document-processing-database-design.md:1)。
### 3.3 存储分工一览
| 数据层 | 保存位置 | 是否直接用于 embedding | 是否最终进入 Milvus | 主要用途 |
| --- | --- | --- | --- | --- |
| 原始文件 | MinIO | 否 | 否 | 保留原始上传文档 |
| `raw_layouts` | MinIO `layouts.json` | 否 | 否 | 保留 parser 原始返回 |
| `structure_nodes` | MinIO + PostgreSQL | 否 | 否 | 目录树、层级结构 |
| `semantic_blocks` | MinIO + PostgreSQL | 否 | 间接 | 语义单元、中间层 |
| `vector_chunks` | MinIO | 是 | 间接 | embedding 前的检索块 |
| `Chunk` | 内存态 + Milvus | 是 | 是 | 统一向量入库模型 |
| `documents` 元数据 | PostgreSQL | 否 | 否 | 处理状态、统计、索引信息 |
## 4. 哪一步才真正“变成向量”
这是整个流程最关键的点。
结论先说清楚:
- parse 不做向量化
- 保存 artifacts 不做向量化
- `ChunkBuilder.build()` 也不做向量化
- 只有 `EmbeddingProvider.embed_texts()` 才真正调用 embedding 模型
- 只有 `VectorIndex.upsert()` 才真正把向量写入向量库
### 4.1 `vector_chunks` 先被映射成统一 `Chunk`
`AliyunVectorChunkBuilder` 并不做 embedding它只负责把 `ParsedDocument.vector_chunks` 转成领域层统一 `Chunk` 模型。
代码在 [vector_chunk_builder.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/parser/vector_chunk_builder.py:12)
```python
def build(
self,
*,
parsed_document: ParsedDocument,
regulation_type: str,
version: str,
) -> list[Chunk]:
chunks: list[Chunk] = []
for index, item in enumerate(parsed_document.vector_chunks):
content = item.get("content") or item.get("text") or ""
embedding_text = item.get("embedding_text") or content
if not embedding_text.strip():
continue
section_path = item.get("section_path") or []
section_title = item.get("section_title") or (section_path[-1] if section_path else "")
page_number = item.get("page_start") or item.get("page") or 0
chunk_id = item.get("chunk_id") or f"{parsed_document.doc_id}-chunk-{index}"
metadata = {k: v for k, v in item.items() if k not in {"content", "embedding_text"}}
chunks.append(
Chunk(
chunk_id=str(chunk_id),
doc_id=parsed_document.doc_id,
doc_name=parsed_document.doc_name,
content=content,
embedding_text=embedding_text,
section_title=section_title,
section_path=section_path,
page_number=int(page_number or 0),
regulation_type=regulation_type,
version=version,
semantic_id=item.get("semantic_id", ""),
block_type=item.get("block_type", ""),
metadata=metadata,
)
)
return chunks
```
`Chunk` 的定义在 [models.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/domain/documents/models.py:63)
```python
@dataclass
class Chunk:
chunk_id: str
doc_id: str
doc_name: str
content: str
embedding_text: str
section_title: str = ""
section_path: list[str] = field(default_factory=list)
page_number: int = 0
regulation_type: str = ""
version: str = ""
semantic_id: str = ""
block_type: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
```
一个 `Chunk` 的典型样子如下:
```json
{
"chunk_id": "doc-001-chunk-1",
"doc_id": "doc-001",
"doc_name": "动力电池安全规范",
"content": "本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。",
"embedding_text": "标准:动力电池安全规范\n章节1 范围 > 1.1 适用对象\n\n本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。",
"section_title": "1.1 适用对象",
"section_path": ["1 范围", "1.1 适用对象"],
"page_number": 2,
"regulation_type": "GB",
"version": "2025",
"semantic_id": "semantic-1",
"block_type": "section_text",
"metadata": {
"chunk_index": 1,
"piece_index": 1,
"source_ids": ["l-text-001", "l-text-002"]
}
}
```
这里最关键的是要区分两个字段:
- `content`
- 用于检索命中后的展示内容
- 更接近用户最终看到的正文片段
- `embedding_text`
- 用于送给 embedding 模型
-`content` 多了“标准名 + 章节路径”的上下文
所以“向量化输入”不是纯正文,而是增强后的上下文文本。
### 4.2 真正调用 embedding API 的地方
真正把文本变成向量的是 `OpenAICompatibleEmbeddingProvider.embed_texts()`
代码在 [openai_compatible_embedding_provider.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/embedding/openai_compatible_embedding_provider.py:64)
```python
def embed_texts(self, texts: list[str]) -> list[list[float]]:
if not texts:
return []
```
也就是说,只有在这一步:
- 输入:`list[str]``embedding_text`
- 输出:`list[list[float]]` 的 dense vectors
前面的 parse、normalizer、chunk builder 都只是准备文本,没有任何向量值产生。
### 4.3 真正把向量写进 Milvus 的地方
向量值生成之后,`MilvusVectorIndex.upsert()` 才会把 `Chunk + vector` 写入向量库。
代码在 [milvus_vector_index.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/vectorstore/milvus_vector_index.py:69)
```python
def upsert(self, chunks: list[Chunk], vectors: list[list[float]]) -> int:
if len(chunks) != len(vectors):
raise ValueError("chunks 与 vectors 数量不一致")
data = []
now = int(time.time())
for chunk, vector in zip(chunks, vectors):
data.append(
{
"id": chunk.chunk_id,
"doc_id": chunk.doc_id,
"doc_name": chunk.doc_name,
"content": chunk.content[:65535],
"embedding": vector,
"section_title": chunk.section_title[:512],
"section_path": json.dumps(chunk.section_path, ensure_ascii=False)[:4096],
"page_number": chunk.page_number,
"regulation_type": chunk.regulation_type[:128],
"version": chunk.version[:64],
"semantic_id": chunk.semantic_id[:128],
"block_type": chunk.block_type[:64],
"metadata_json": json.dumps(chunk.metadata, ensure_ascii=False)[:65535],
"created_at": now,
}
)
self.collection.insert(data)
self.collection.flush()
return len(data)
```
也就是说Milvus 最终存进去的是:
- 主键:`chunk_id`
- 文档维度字段:`doc_id``doc_name`
- 检索展示字段:`content`
- 向量字段:`embedding`
- 过滤/回溯字段:`section_title``section_path``page_number``regulation_type``version``semantic_id``block_type`
- 附加元数据:`metadata_json`
## 5. Milvus 里最后到底存的是什么
### 5.1 Collection schema
当前 `MilvusVectorIndex` 初始化 collection 时定义的 schema 在 [milvus_vector_index.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/infrastructure/vectorstore/milvus_vector_index.py:37)
```python
schema = CollectionSchema(
fields=[
FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=128, is_primary=True, auto_id=False),
FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="doc_name", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=settings.embedding_dim),
FieldSchema(name="section_title", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="section_path", dtype=DataType.VARCHAR, max_length=4096),
FieldSchema(name="page_number", dtype=DataType.INT64),
FieldSchema(name="regulation_type", dtype=DataType.VARCHAR, max_length=128),
FieldSchema(name="version", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="semantic_id", dtype=DataType.VARCHAR, max_length=128),
FieldSchema(name="block_type", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="metadata_json", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="created_at", dtype=DataType.INT64),
],
description="Dense-only regulations index",
enable_dynamic_field=False,
)
```
这说明 Milvus 存的不是“只有 embedding 的极简向量表”,而是:
- 一个 dense vector
- 一组检索时要返回或过滤的结构化字段
但要注意:这并不意味着 Milvus 是业务主记录库。它仍然主要服务于检索,而不是替代 PostgreSQL 的文档管理职责。
### 5.2 `list_documents()` 为什么会先看 Milvus
文档列表查询在 [services.py](/abs/path/C:/Users/A200477427/Developers/AIRegulation/AIRegulation-DocAnalysis/backend/app/application/documents/services.py:271) 中实现,它会:
1. 从 Milvus 查询当前真的存在向量的文档
2. 从文档元数据仓储加载文档记录
3. 以 Milvus 为索引状态真相源进行 merge
原因不是“Milvus 替代 PostgreSQL”而是
- `indexed` 这个状态最终是否真实成立,要看 Milvus 里有没有对应 chunk
- 但下载、删除、重试、文件定位、错误信息仍然要依赖文档元数据仓储
所以:
- Milvus 是“索引真相源”
- PostgreSQL/JSON 是“文档元数据真相源”
这两者职责不同,不能互相替代。