Files
AIRegulation-DocAnalysis/docs/architecture/document-core-processing-flow.md

23 KiB
Raw Blame History

核心文档处理主链路说明

本文件说明当前默认生产链路中的核心文档处理流程,也就是:

  • AliyunDocumentParser
  • AliyunVectorChunkBuilder
  • OpenAICompatibleEmbeddingProvider
  • MilvusVectorIndex

目标是回答四个核心问题:

  1. ParsedDocument 为什么是多层结构
  2. 这些结构分别保存到哪里
  3. 哪一步才真正做了向量化
  4. Milvus 里最后到底存的是什么

数据库表设计、关系模型、DDL 和 PostgreSQL 职责边界已经单独整理到 document-processing-database-design.md。本文件保留流程视角,只在必要处给出与存储分工相关的摘要,不再作为数据库设计 authority。

1. 主链路总览

当前默认实现由 DocumentCommandService.upload_and_process() 统一编排。它不是“parse 完直接进向量库”,而是先生成结构化解析产物,再把其中适合检索的一层送去 embedding 和 Milvus。

sequenceDiagram
    participant API as API / Service
    participant MinIO as BinaryStore
    participant Parser as AliyunDocumentParser
    participant PG as DocumentRepository / ParseArtifactStore
    participant Embed as EmbeddingProvider
    participant Milvus as VectorIndex

    API->>MinIO: 保存原始文件
    API->>Parser: parse(file_path, doc_id, doc_name)
    Parser-->>API: ParsedDocument
    API->>MinIO: 保存 layouts/structure_nodes/semantic_blocks/vector_chunks JSON
    API->>PG: 更新 documents.status=parsed
    API->>PG: 保存 structure_nodes / semantic_blocks
    API->>API: chunk_builder.build(parsed_document)
    API->>Embed: embed_texts([chunk.embedding_text])
    Embed-->>API: vectors
    API->>Milvus: upsert(chunks, vectors)
    API->>PG: 更新 documents.status=indexed

主链路编排代码在 services.py

def upload_and_process(
    self,
    *,
    doc_id: str | None = None,
    file_name: str,
    content: bytes,
    content_type: str,
    doc_name: str | None,
    regulation_type: str,
    version: str,
    generate_summary: bool,
) -> DocumentProcessResult:
    doc_id = doc_id or str(uuid.uuid4())[:8]
    final_doc_name = doc_name or file_name
    object_name = f"{doc_id}/{file_name}"

    self.document_repository.create(document)
    self.binary_store.save(object_name=object_name, data=content, content_type=content_type, metadata={"doc_id": doc_id})
    self.document_repository.update_status(doc_id, DocumentStatus.STORED)

    parsed_document = self.parser.parse(file_path=temp_path, doc_id=doc_id, doc_name=final_doc_name)
    artifact_keys = self._save_parse_artifacts(doc_id=doc_id, parsed_document=parsed_document)
    self.document_repository.update_status(doc_id, DocumentStatus.PARSED, parser_name=parsed_document.parser_name, metadata={...})

    if self.parse_artifact_store:
        self.parse_artifact_store.save(doc_id, parsed_document.structure_nodes, parsed_document.semantic_blocks)

    chunks = self.chunk_builder.build(parsed_document=parsed_document, regulation_type=regulation_type, version=version)
    vectors = self.embedding_provider.embed_texts([chunk.embedding_text for chunk in chunks])
    inserted = self.vector_index.upsert(chunks, vectors)

    self.document_repository.update_status(doc_id, DocumentStatus.INDEXED, chunk_count=len(chunks), index_name=health.get("collection_name", ""), metadata={...})

默认绑定关系在 bootstrap.py

def get_parser():
    if settings.parser_backend == "aliyun":
        return AliyunDocumentParser()
    return LocalDocumentParser()


def get_chunk_builder():
    if settings.chunk_backend == "aliyun":
        return AliyunVectorChunkBuilder()
    return LocalRegulationChunkBuilder(...)


def get_embedding_provider() -> OpenAICompatibleEmbeddingProvider:
    return OpenAICompatibleEmbeddingProvider()


def get_vector_index() -> VectorIndex:
    return LazyVectorIndex(_build_vector_index)

也就是说,当前默认主链路是:

  • parser: AliyunDocumentParser
  • chunk builder: AliyunVectorChunkBuilder
  • embedding provider: OpenAICompatibleEmbeddingProvider
  • vector index: MilvusVectorIndex

2. ParsedDocument 为什么是三层结构

ParsedDocument 不是最终入库格式,而是 parser 输出给后续处理步骤的统一中间结构。它把“结构理解”和“向量检索准备”拆成了三层。

定义在 models.py

@dataclass
class ParsedDocument:
    doc_id: str
    doc_name: str
    structure_nodes: list[dict[str, Any]]
    semantic_blocks: list[dict[str, Any]]
    vector_chunks: list[dict[str, Any]]
    parser_name: str
    raw_text: str = ""
    raw_layouts: list[dict[str, Any]] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

这三层的职责不同:

  • structure_nodes

    • 标题层级骨架
    • 描述“文档有哪些章、节、条”
    • 用于保留结构,不直接做 embedding
  • semantic_blocks

    • 语义块层
    • 把正文、表格、图片说明整理成连续的语义单元
    • 是从原始 layout 到检索 chunk 之间的中间层
  • vector_chunks

    • 检索和向量化层
    • 已经是适合送给 embedding 模型的 chunk 视图
    • 后续 ChunkBuilder 基本就是把这层映射成统一 Chunk

2.1 这三层是怎么从 parser 结果生成的

AliyunDocumentParser.parse() 先通过网关拿到阿里云返回的 layouts,再把 layouts 转成三层结构。

代码在 aliyun_document_parser.py

def parse(self, *, file_path: str, doc_id: str, doc_name: str) -> ParsedDocument:
    payload = self.gateway.parse_document(file_path=file_path)
    layouts = payload.layouts
    structure_nodes = build_structure_nodes(layouts)
    semantic_blocks = build_semantic_blocks(layouts)
    vector_chunks = build_vector_chunks(
        semantic_blocks,
        doc_id=doc_id,
        doc_title=doc_name,
        max_chars=MAX_CHARS,
        overlap_chars=OVERLAP_CHARS,
    )
    raw_text = "\n\n".join(
        block.get("text", "")
        for block in semantic_blocks
        if block.get("text")
    )
    return ParsedDocument(
        doc_id=doc_id,
        doc_name=doc_name,
        structure_nodes=structure_nodes,
        semantic_blocks=semantic_blocks,
        vector_chunks=vector_chunks,
        parser_name=self.parser_name,
        raw_text=raw_text,
        raw_layouts=layouts,
        metadata={...},
    )

也就是说:

  • parser 原始输出是 layouts
  • 当前系统真正消费的是 ParsedDocument
  • ParsedDocument 是由 normalizer 从 layouts 规整出来的

2.2 第一层:structure_nodes

这一层只保留标题和层级。

代码在 aliyun_layout_normalizer.py

def build_structure_nodes(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    nodes: list[dict[str, Any]] = []
    for layout in layouts:
        if not is_title(layout):
            continue
        text = get_text(layout)
        if not text or text in TOC_TITLES:
            continue
        nodes.append(
            {
                "unique_id": layout.get("uniqueId"),
                "page": get_page(layout),
                "index": layout.get("index", 0),
                "level": layout.get("level", 0),
                "title": text,
                "type": layout.get("type"),
                "sub_type": layout.get("subType"),
            }
        )
    return nodes

示例:

[
  {
    "unique_id": "l-title-001",
    "page": 2,
    "index": 11,
    "level": 1,
    "title": "1 范围",
    "type": "title",
    "sub_type": "para_title"
  },
  {
    "unique_id": "l-title-002",
    "page": 3,
    "index": 18,
    "level": 2,
    "title": "1.1 适用对象",
    "type": "title",
    "sub_type": "para_title"
  }
]

这层的意义是“保留目录树”,不是直接拿来检索。

2.3 第二层:semantic_blocks

这一层会把连续正文合并成一个语义块,也会单独处理表格和图片说明。

代码在 aliyun_layout_normalizer.py

def build_semantic_blocks(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    semantic_blocks: list[dict[str, Any]] = []
    section_stack: list[dict[str, Any]] = []
    pending_text_blocks: list[dict[str, Any]] = []
    block_id = 1

    for layout in layouts:
        text = get_text(layout)
        page = get_page(layout)

        if is_title(layout):
            block_id = flush_text_block(pending_text_blocks, semantic_blocks, block_id)
            pending_text_blocks = []
            section_stack = update_section_path(section_stack, layout)
            continue

        section_path = section_path_titles(section_stack)
        section_title = section_path[-1] if section_path else "未分类"
        section_level = len(section_path)

        if is_table(layout):
            ...
            semantic_blocks.append(
                {
                    "semantic_id": f"semantic-{block_id}",
                    "block_type": "table",
                    "page_start": page,
                    "page_end": page,
                    "section_path": section_path,
                    "section_level": section_level,
                    "section_title": section_title,
                    "source_ids": [layout.get("uniqueId")],
                    "text": table_text,
                }
            )
            continue

        if is_text(layout) and text:
            pending_text_blocks.append(
                {
                    "page": page,
                    "text": text,
                    "unique_id": layout.get("uniqueId"),
                    "section_path": section_path,
                    "section_level": section_level,
                    "section_title": section_title,
                }
            )

正文合并后会形成类似这样的语义块:

[
  {
    "semantic_id": "semantic-1",
    "block_type": "section_text",
    "page_start": 2,
    "page_end": 2,
    "section_path": ["1 范围", "1.1 适用对象"],
    "section_level": 2,
    "section_title": "1.1 适用对象",
    "source_ids": ["l-text-001", "l-text-002"],
    "text": "本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。"
  }
]

3. 这些结构分别保存到哪里

3.1 原始文件和中间 artifacts 先落 MinIO

当前链路在上传阶段会先把原始文件保存到对象存储;解析完成后,又会把结构化中间产物保存为 JSON。

保存 artifacts 的代码在 services.py

def _save_parse_artifacts(self, *, doc_id: str, parsed_document: ParsedDocument) -> dict[str, str]:
    prefix = f"{parsed_document.metadata.get('artifact_prefix', 'artifacts').strip('/')}/{doc_id}"
    artifact_payloads = {
        "layouts": parsed_document.raw_layouts,
        "structure_nodes": parsed_document.structure_nodes,
        "semantic_blocks": parsed_document.semantic_blocks,
        "vector_chunks": parsed_document.vector_chunks,
    }
    artifact_keys: dict[str, str] = {}
    for name, payload in artifact_payloads.items():
        object_name = f"{prefix}/{name}.json"
        self.binary_store.save(
            object_name=object_name,
            data=json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8"),
            content_type="application/json",
            metadata={"doc_id": doc_id, "artifact_type": name},
        )
        artifact_keys[name] = object_name
    return artifact_keys

DocumentBinaryStore 的当前默认实现是 MinioDocumentBinaryStore,也就是:

  • 原始上传文件进 MinIO
  • layouts.json 进 MinIO
  • structure_nodes.json 进 MinIO
  • semantic_blocks.json 进 MinIO
  • vector_chunks.json 进 MinIO

3.2 PostgreSQL 在流程中的职责摘要

当前流程中PostgreSQL 承担的是“文档元数据 + 结构化快照”的职责,而不是向量或大对象存储:

  • documents 保存当前文档主记录、状态、统计和索引信息
  • structure_nodes 保存当前最新解析快照的目录结构
  • semantic_blocks 保存当前最新解析快照的语义块结构

更完整的 PostgreSQL 设计,包括:

  • documents
  • document_processing_runs
  • document_status_history
  • document_artifacts
  • structure_nodes
  • semantic_blocks

document-processing-database-design.md

3.3 存储分工一览

数据层 保存位置 是否直接用于 embedding 是否最终进入 Milvus 主要用途
原始文件 MinIO 保留原始上传文档
raw_layouts MinIO layouts.json 保留 parser 原始返回
structure_nodes MinIO + PostgreSQL 目录树、层级结构
semantic_blocks MinIO + PostgreSQL 间接 语义单元、中间层
vector_chunks MinIO 间接 embedding 前的检索块
Chunk 内存态 + Milvus 统一向量入库模型
documents 元数据 PostgreSQL 处理状态、统计、索引信息

4. 哪一步才真正“变成向量”

这是整个流程最关键的点。

结论先说清楚:

  • parse 不做向量化
  • 保存 artifacts 不做向量化
  • ChunkBuilder.build() 也不做向量化
  • 只有 EmbeddingProvider.embed_texts() 才真正调用 embedding 模型
  • 只有 VectorIndex.upsert() 才真正把向量写入向量库

4.1 vector_chunks 先被映射成统一 Chunk

AliyunVectorChunkBuilder 并不做 embedding它只负责把 ParsedDocument.vector_chunks 转成领域层统一 Chunk 模型。

代码在 vector_chunk_builder.py

def build(
    self,
    *,
    parsed_document: ParsedDocument,
    regulation_type: str,
    version: str,
) -> list[Chunk]:
    chunks: list[Chunk] = []
    for index, item in enumerate(parsed_document.vector_chunks):
        content = item.get("content") or item.get("text") or ""
        embedding_text = item.get("embedding_text") or content
        if not embedding_text.strip():
            continue
        section_path = item.get("section_path") or []
        section_title = item.get("section_title") or (section_path[-1] if section_path else "")
        page_number = item.get("page_start") or item.get("page") or 0
        chunk_id = item.get("chunk_id") or f"{parsed_document.doc_id}-chunk-{index}"
        metadata = {k: v for k, v in item.items() if k not in {"content", "embedding_text"}}
        chunks.append(
            Chunk(
                chunk_id=str(chunk_id),
                doc_id=parsed_document.doc_id,
                doc_name=parsed_document.doc_name,
                content=content,
                embedding_text=embedding_text,
                section_title=section_title,
                section_path=section_path,
                page_number=int(page_number or 0),
                regulation_type=regulation_type,
                version=version,
                semantic_id=item.get("semantic_id", ""),
                block_type=item.get("block_type", ""),
                metadata=metadata,
            )
        )
    return chunks

Chunk 的定义在 models.py

@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    doc_name: str
    content: str
    embedding_text: str
    section_title: str = ""
    section_path: list[str] = field(default_factory=list)
    page_number: int = 0
    regulation_type: str = ""
    version: str = ""
    semantic_id: str = ""
    block_type: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

一个 Chunk 的典型样子如下:

{
  "chunk_id": "doc-001-chunk-1",
  "doc_id": "doc-001",
  "doc_name": "动力电池安全规范",
  "content": "本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。",
  "embedding_text": "标准:动力电池安全规范\n章节1 范围 > 1.1 适用对象\n\n本标准适用于道路车辆动力电池系统的安全要求。企业应建立一致的测试和验证方法。",
  "section_title": "1.1 适用对象",
  "section_path": ["1 范围", "1.1 适用对象"],
  "page_number": 2,
  "regulation_type": "GB",
  "version": "2025",
  "semantic_id": "semantic-1",
  "block_type": "section_text",
  "metadata": {
    "chunk_index": 1,
    "piece_index": 1,
    "source_ids": ["l-text-001", "l-text-002"]
  }
}

这里最关键的是要区分两个字段:

  • content

    • 用于检索命中后的展示内容
    • 更接近用户最终看到的正文片段
  • embedding_text

    • 用于送给 embedding 模型
    • content 多了“标准名 + 章节路径”的上下文

所以“向量化输入”不是纯正文,而是增强后的上下文文本。

4.2 真正调用 embedding API 的地方

真正把文本变成向量的是 OpenAICompatibleEmbeddingProvider.embed_texts()

代码在 openai_compatible_embedding_provider.py

def embed_texts(self, texts: list[str]) -> list[list[float]]:
    if not texts:
        return []

也就是说,只有在这一步:

  • 输入:list[str]embedding_text
  • 输出:list[list[float]] 的 dense vectors

前面的 parse、normalizer、chunk builder 都只是准备文本,没有任何向量值产生。

4.3 真正把向量写进 Milvus 的地方

向量值生成之后,MilvusVectorIndex.upsert() 才会把 Chunk + vector 写入向量库。

代码在 milvus_vector_index.py

def upsert(self, chunks: list[Chunk], vectors: list[list[float]]) -> int:
    if len(chunks) != len(vectors):
        raise ValueError("chunks 与 vectors 数量不一致")
    data = []
    now = int(time.time())
    for chunk, vector in zip(chunks, vectors):
        data.append(
            {
                "id": chunk.chunk_id,
                "doc_id": chunk.doc_id,
                "doc_name": chunk.doc_name,
                "content": chunk.content[:65535],
                "embedding": vector,
                "section_title": chunk.section_title[:512],
                "section_path": json.dumps(chunk.section_path, ensure_ascii=False)[:4096],
                "page_number": chunk.page_number,
                "regulation_type": chunk.regulation_type[:128],
                "version": chunk.version[:64],
                "semantic_id": chunk.semantic_id[:128],
                "block_type": chunk.block_type[:64],
                "metadata_json": json.dumps(chunk.metadata, ensure_ascii=False)[:65535],
                "created_at": now,
            }
        )
    self.collection.insert(data)
    self.collection.flush()
    return len(data)

也就是说Milvus 最终存进去的是:

  • 主键:chunk_id
  • 文档维度字段:doc_iddoc_name
  • 检索展示字段:content
  • 向量字段:embedding
  • 过滤/回溯字段:section_titlesection_pathpage_numberregulation_typeversionsemantic_idblock_type
  • 附加元数据:metadata_json

5. Milvus 里最后到底存的是什么

5.1 Collection schema

当前 MilvusVectorIndex 初始化 collection 时定义的 schema 在 milvus_vector_index.py

schema = CollectionSchema(
    fields=[
        FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=128, is_primary=True, auto_id=False),
        FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="doc_name", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=settings.embedding_dim),
        FieldSchema(name="section_title", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="section_path", dtype=DataType.VARCHAR, max_length=4096),
        FieldSchema(name="page_number", dtype=DataType.INT64),
        FieldSchema(name="regulation_type", dtype=DataType.VARCHAR, max_length=128),
        FieldSchema(name="version", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="semantic_id", dtype=DataType.VARCHAR, max_length=128),
        FieldSchema(name="block_type", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="metadata_json", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="created_at", dtype=DataType.INT64),
    ],
    description="Dense-only regulations index",
    enable_dynamic_field=False,
)

这说明 Milvus 存的不是“只有 embedding 的极简向量表”,而是:

  • 一个 dense vector
  • 一组检索时要返回或过滤的结构化字段

但要注意:这并不意味着 Milvus 是业务主记录库。它仍然主要服务于检索,而不是替代 PostgreSQL 的文档管理职责。

5.2 list_documents() 为什么会先看 Milvus

文档列表查询在 services.py 中实现,它会:

  1. 从 Milvus 查询当前真的存在向量的文档
  2. 从文档元数据仓储加载文档记录
  3. 以 Milvus 为索引状态真相源进行 merge

原因不是“Milvus 替代 PostgreSQL”而是

  • indexed 这个状态最终是否真实成立,要看 Milvus 里有没有对应 chunk
  • 但下载、删除、重试、文件定位、错误信息仍然要依赖文档元数据仓储

所以:

  • Milvus 是“索引真相源”
  • PostgreSQL/JSON 是“文档元数据真相源”

这两者职责不同,不能互相替代。