Files
AIRegulation-DocAnalysis/docs/architecture/document-processing-database-design.md

17 KiB
Raw Permalink Blame History

文档处理链路数据库设计

1. Purpose

本文档定义当前文档处理主链路的 PostgreSQL 数据库设计,覆盖上传、解析、索引、状态查询、重试、删除这条核心链路,以及围绕该链路的常用运维与审计需求。

本文档的目标不是替代 document-core-processing-flow.md 的流程说明,而是补齐关系型存储的 authority使后续从 JSON 元数据切换到 PostgreSQL 时有清晰、稳定、可实施的数据库设计基线。

1.1 Scope And Design Target

本文档只覆盖以下范围:

  • 文档主记录
  • 文档处理运行记录
  • 文档状态历史
  • 解析产物引用
  • 当前最新结构化解析快照

本文档不覆盖以下范围:

  • Agent 会话
  • 反馈和人工审核
  • 合规分析任务
  • Milvus collection schema 的详细实现

设计原则采用 Compat First

  • 保持与当前 DocumentRepository / ParseArtifactStore 主流程兼容
  • 新增关系表以补足运维与审计能力
  • 不为了理想化模型而反推大规模接口重写

2. Storage Responsibilities

当前系统采用三类存储,各自职责必须清晰分离:

存储 保存内容 是否业务主记录 说明
MinIO 原始文件、layouts.jsonstructure_nodes.jsonsemantic_blocks.jsonvector_chunks.json 负责大对象与产物归档,不承担关系查询
Milvus chunk 级向量和检索辅助字段 负责向量检索,不承担文档生命周期管理
PostgreSQL 文档元数据、处理状态、结构化快照、处理历史、artifact 引用 负责文档管理、运维可观测性和关系查询

约束说明:

  • PostgreSQL 不保存 embedding 向量。
  • PostgreSQL 不新增 vector_chunks 内容表。
  • Milvus 可以保存 doc_iddoc_nameregulation_typeversion 等检索辅助字段,但不是业务真相源。
  • 文档下载、删除、重试仍以 PostgreSQL 中的文档主记录为入口。

3. Design Overview

3.1 Entity Responsibilities

数据库采用“当前态主记录 + 当前快照 + 历史过程”的分层模型:

  • documents
    • 当前文档主记录
    • 保存供管理、下载、重试、删除直接使用的元数据和当前状态
  • document_processing_runs
    • 每次上传或重试对应一次处理运行
    • 保存运行级统计、阶段时间点和失败信息
  • document_status_history
    • 追加式状态事件流
    • 保存每次状态变更的上下文
  • document_artifacts
    • 保存 MinIO artifact 的引用信息
    • 不保存 artifact 内容本体
  • structure_nodes
    • 当前最新解析快照中的目录结构
  • semantic_blocks
    • 当前最新解析快照中的语义块结构

3.2 Current Snapshot Vs Historical Records

本设计显式区分两类数据:

  • 当前快照
    • documents
    • structure_nodes
    • semantic_blocks
  • 历史过程
    • document_processing_runs
    • document_status_history
    • document_artifacts

其中:

  • structure_nodessemantic_blocks 只保存“最新一次成功解析后”的当前快照
  • 历史版本回溯依赖 document_processing_runsdocument_artifacts 和 MinIO 中对应 run 的 artifact 文件

4. Table Design

4.1 documents

用途:

  • 作为文档生命周期的主记录表
  • 为下载、删除、重试、管理列表、状态查询提供当前态真相

字段设计:

CREATE TABLE IF NOT EXISTS documents (
    doc_id              VARCHAR(128) PRIMARY KEY,
    doc_name            VARCHAR(512)  NOT NULL DEFAULT '',
    file_name           VARCHAR(512)  NOT NULL DEFAULT '',
    object_name         VARCHAR(1024) NOT NULL DEFAULT '',
    content_type        VARCHAR(128)  NOT NULL DEFAULT '',
    size_bytes          BIGINT        NOT NULL DEFAULT 0,
    status              VARCHAR(32)   NOT NULL DEFAULT 'pending',
    regulation_type     VARCHAR(128)  NOT NULL DEFAULT '',
    version             VARCHAR(64)   NOT NULL DEFAULT '',
    summary             TEXT          NOT NULL DEFAULT '',
    summary_latency_ms  INTEGER       NOT NULL DEFAULT 0,
    chunk_count         INTEGER       NOT NULL DEFAULT 0,
    parser_name         VARCHAR(128)  NOT NULL DEFAULT '',
    index_name          VARCHAR(128)  NOT NULL DEFAULT '',
    error_message       TEXT          NOT NULL DEFAULT '',
    metadata            JSONB         NOT NULL DEFAULT '{}'::jsonb,
    created_at          TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    updated_at          TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    CONSTRAINT chk_documents_status
        CHECK (status IN ('pending', 'stored', 'parsed', 'indexed', 'failed'))
);

CREATE INDEX IF NOT EXISTS idx_documents_status_updated_at
    ON documents(status, updated_at DESC);

CREATE INDEX IF NOT EXISTS idx_documents_regulation_version
    ON documents(regulation_type, version);

CREATE INDEX IF NOT EXISTS idx_documents_updated_at
    ON documents(updated_at DESC);

字段说明:

  • object_name
    • 原始上传文件在 MinIO 中的对象路径
    • 当前实现依赖该字段完成下载、重试和删除v1 不拆分为独立文件表
  • status
    • 当前文档处理状态
    • 仅表示当前态,不承担历史审计职责
  • metadata
    • 保存轻量、变动频率较高、暂不值得列式建模的附加信息
    • 典型内容包括 parse_task_idprocessing_stageartifact_keys、统计计数等

4.2 document_processing_runs

用途:

  • 记录一次上传或一次重试的完整处理运行
  • 用于解释“这份文档本次处理为什么成功或失败”

字段设计:

CREATE TABLE IF NOT EXISTS document_processing_runs (
    run_id                 BIGSERIAL PRIMARY KEY,
    doc_id                 VARCHAR(128) NOT NULL,
    trigger_type           VARCHAR(16)  NOT NULL,
    run_status             VARCHAR(16)  NOT NULL,
    parser_backend         VARCHAR(64)  NOT NULL DEFAULT '',
    chunk_backend          VARCHAR(64)  NOT NULL DEFAULT '',
    embedding_model        VARCHAR(128) NOT NULL DEFAULT '',
    index_name             VARCHAR(128) NOT NULL DEFAULT '',
    started_at             TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    stored_at              TIMESTAMPTZ,
    parsed_at              TIMESTAMPTZ,
    indexed_at             TIMESTAMPTZ,
    finished_at            TIMESTAMPTZ,
    layout_count           INTEGER      NOT NULL DEFAULT 0,
    structure_node_count   INTEGER      NOT NULL DEFAULT 0,
    semantic_block_count   INTEGER      NOT NULL DEFAULT 0,
    vector_chunk_count     INTEGER      NOT NULL DEFAULT 0,
    chunk_count            INTEGER      NOT NULL DEFAULT 0,
    failure_stage          VARCHAR(32)  NOT NULL DEFAULT '',
    error_message          TEXT         NOT NULL DEFAULT '',
    metadata               JSONB        NOT NULL DEFAULT '{}'::jsonb,
    CONSTRAINT fk_runs_document
        FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE,
    CONSTRAINT chk_runs_trigger_type
        CHECK (trigger_type IN ('upload', 'retry')),
    CONSTRAINT chk_runs_status
        CHECK (run_status IN ('running', 'succeeded', 'failed'))
);

CREATE INDEX IF NOT EXISTS idx_runs_doc_started_at
    ON document_processing_runs(doc_id, started_at DESC);

CREATE INDEX IF NOT EXISTS idx_runs_status_started_at
    ON document_processing_runs(run_status, started_at DESC);

字段说明:

  • trigger_type
    • 标识该次处理由首次上传还是 retry 触发
  • run_status
    • 只表示该次运行的最终结果
  • failure_stage
    • 建议取值与应用层关键阶段一致,例如 storeparseartifact_persistembedindex
  • metadata
    • 保存运行级附加上下文例如配置快照、后端实现名、provider 返回信息摘要

4.3 document_status_history

用途:

  • 保存状态变化事件流
  • 用于排障、审计和运行轨迹分析

字段设计:

CREATE TABLE IF NOT EXISTS document_status_history (
    event_id         BIGSERIAL PRIMARY KEY,
    doc_id           VARCHAR(128) NOT NULL,
    run_id           BIGINT,
    from_status      VARCHAR(32)  NOT NULL DEFAULT '',
    to_status        VARCHAR(32)  NOT NULL,
    stage            VARCHAR(32)  NOT NULL DEFAULT '',
    message          TEXT         NOT NULL DEFAULT '',
    metadata         JSONB        NOT NULL DEFAULT '{}'::jsonb,
    occurred_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_status_document
        FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE,
    CONSTRAINT fk_status_run
        FOREIGN KEY (run_id) REFERENCES document_processing_runs(run_id) ON DELETE CASCADE,
    CONSTRAINT chk_status_history_to_status
        CHECK (to_status IN ('pending', 'stored', 'parsed', 'indexed', 'failed'))
);

CREATE INDEX IF NOT EXISTS idx_status_history_doc_occurred_at
    ON document_status_history(doc_id, occurred_at DESC);

CREATE INDEX IF NOT EXISTS idx_status_history_run_occurred_at
    ON document_status_history(run_id, occurred_at DESC);

字段说明:

  • from_status 可以为空字符串
    • 用于首个事件,例如文档创建时进入 pending
  • stage
    • 用于记录状态推进对应的业务阶段
  • message
    • 用于记录面向排障的人类可读说明

4.4 document_artifacts

用途:

  • 保存解析产物在 MinIO 中的位置与基本属性
  • 支持后续定位某次 run 的 artifacts而不扫描对象存储

字段设计:

CREATE TABLE IF NOT EXISTS document_artifacts (
    artifact_id       BIGSERIAL PRIMARY KEY,
    doc_id            VARCHAR(128)  NOT NULL,
    run_id            BIGINT,
    artifact_type     VARCHAR(32)   NOT NULL,
    object_name       VARCHAR(1024) NOT NULL,
    content_type      VARCHAR(128)  NOT NULL DEFAULT 'application/json',
    byte_size         BIGINT        NOT NULL DEFAULT 0,
    checksum          VARCHAR(128)  NOT NULL DEFAULT '',
    metadata          JSONB         NOT NULL DEFAULT '{}'::jsonb,
    created_at        TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_artifacts_document
        FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE,
    CONSTRAINT fk_artifacts_run
        FOREIGN KEY (run_id) REFERENCES document_processing_runs(run_id) ON DELETE CASCADE,
    CONSTRAINT chk_artifact_type
        CHECK (artifact_type IN ('layouts', 'structure_nodes', 'semantic_blocks', 'vector_chunks'))
);

CREATE INDEX IF NOT EXISTS idx_artifacts_doc_created_at
    ON document_artifacts(doc_id, created_at DESC);

CREATE INDEX IF NOT EXISTS idx_artifacts_run_type
    ON document_artifacts(run_id, artifact_type);

CREATE UNIQUE INDEX IF NOT EXISTS uq_artifacts_run_type_object
    ON document_artifacts(run_id, artifact_type, object_name);

字段说明:

  • 该表只记录 artifact 引用,不记录原始文件
  • 原始文件仍由 documents.object_name 表达,这是为了保持当前下载和重试逻辑兼容

4.5 structure_nodes

用途:

  • 保存当前最新解析快照中的标题层级结构
  • 供目录树查询、结构化浏览、调试和审计使用

字段设计:

CREATE TABLE IF NOT EXISTS structure_nodes (
    id             BIGSERIAL PRIMARY KEY,
    doc_id         VARCHAR(128) NOT NULL,
    unique_id      VARCHAR(128),
    page           INTEGER      NOT NULL DEFAULT 0,
    idx            INTEGER      NOT NULL DEFAULT 0,
    level          INTEGER      NOT NULL DEFAULT 0,
    title          TEXT         NOT NULL DEFAULT '',
    type           VARCHAR(64),
    sub_type       VARCHAR(64),
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_structure_nodes_document
        FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_structure_nodes_doc_idx
    ON structure_nodes(doc_id, idx);

CREATE INDEX IF NOT EXISTS idx_structure_nodes_doc_level
    ON structure_nodes(doc_id, level);

设计约束:

  • 该表表示当前快照,不做多版本建模
  • 新一轮成功解析会覆盖同一 doc_id 的旧快照

4.6 semantic_blocks

用途:

  • 保存当前最新解析快照中的语义块
  • 供结构回溯、调试和后续关系型查询使用

字段设计:

CREATE TABLE IF NOT EXISTS semantic_blocks (
    id              BIGSERIAL PRIMARY KEY,
    doc_id          VARCHAR(128) NOT NULL,
    semantic_id     VARCHAR(128) NOT NULL,
    block_type      VARCHAR(64)  NOT NULL DEFAULT '',
    page_start      INTEGER      NOT NULL DEFAULT 0,
    page_end        INTEGER      NOT NULL DEFAULT 0,
    section_path    JSONB        NOT NULL DEFAULT '[]'::jsonb,
    section_level   INTEGER      NOT NULL DEFAULT 0,
    section_title   VARCHAR(512) NOT NULL DEFAULT '',
    source_ids      JSONB        NOT NULL DEFAULT '[]'::jsonb,
    text            TEXT         NOT NULL DEFAULT '',
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_semantic_blocks_document
        FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE,
    CONSTRAINT uq_semantic_blocks_doc_semantic
        UNIQUE (doc_id, semantic_id)
);

CREATE INDEX IF NOT EXISTS idx_semantic_blocks_doc_id
    ON semantic_blocks(doc_id);

CREATE INDEX IF NOT EXISTS idx_semantic_blocks_doc_section_title
    ON semantic_blocks(doc_id, section_title);

CREATE INDEX IF NOT EXISTS idx_semantic_blocks_doc_block_type
    ON semantic_blocks(doc_id, block_type);

设计约束:

  • 该表表示当前快照,不保存历史版本
  • 历史回溯应通过 run 对应的 artifact 文件完成

5. Relationship Model

实体关系如下:

erDiagram
    documents ||--o{ document_processing_runs : has
    documents ||--o{ document_status_history : has
    documents ||--o{ document_artifacts : has
    documents ||--o{ structure_nodes : has
    documents ||--o{ semantic_blocks : has
    document_processing_runs ||--o{ document_status_history : emits
    document_processing_runs ||--o{ document_artifacts : produces

关系语义:

  • documents 是聚合根
  • document_processing_runs 记录一次完整处理尝试
  • document_status_history 记录状态推进轨迹
  • document_artifacts 记录 MinIO 中可回放的结构化产物
  • structure_nodes / semantic_blocks 代表“当前版本”的关系型快照

6. Flow-To-Table Mapping

6.1 Upload

上传开始时:

  1. 创建 documents
  2. 创建一条 document_processing_runs
  3. 写入一条 document_status_historyto_status='pending'

6.2 Store Original File

原始文件写入 MinIO 成功后:

  1. 更新 documents.status='stored'
  2. 更新当前 run 的 stored_at
  3. 追加 document_status_history

6.3 Parse And Persist Artifacts

解析成功后:

  1. 更新当前 run 的 parsed_at
  2. 更新 run 的 layout_countstructure_node_countsemantic_block_countvector_chunk_count
  3. 更新 documents.status='parsed'
  4. 刷新 structure_nodes
  5. 刷新 semantic_blocks
  6. layoutsstructure_nodessemantic_blocksvector_chunks 写入 document_artifacts
  7. 追加 document_status_history

6.4 Embed And Index

向量化和入库成功后:

  1. 更新当前 run 的 indexed_atfinished_at
  2. 更新当前 run 的 run_status='succeeded'
  3. 更新 documents.status='indexed'
  4. 更新 documents.chunk_countindex_name
  5. 追加 document_status_history

6.5 Failure

任一阶段失败时:

  1. 更新当前 run 的 run_status='failed'
  2. 记录 failure_stageerror_message
  3. 更新 finished_at
  4. 更新 documents.status='failed'
  5. 更新 documents.error_message
  6. 追加 document_status_history

6.6 Retry

重试时:

  1. 保留现有 documents.doc_id
  2. 新建一条 document_processing_runs
  3. 为本次重试重新写入状态历史
  4. 本次重试成功后覆盖 structure_nodes / semantic_blocks 当前快照
  5. 历史 run 和 artifact 记录继续保留

6.7 Delete

删除文档时:

  1. 应用层先删除 MinIO 原始文件和 artifacts
  2. 应用层删除 Milvus 中按 doc_id 关联的向量
  3. 最后删除 documents
  4. 依赖外键 ON DELETE CASCADE 清理 run、status history、artifacts、structure nodes、semantic blocks

7. Alignment With Current Backend

7.1 Compatible Parts

当前代码已天然兼容以下设计:

  • documents
  • structure_nodes
  • semantic_blocks
  • 当前快照覆盖式更新
  • doc_id 作为跨 MinIO / Milvus / PostgreSQL 的统一关联键

7.2 Required Future Additions

若后续正式切到 PostgreSQL 默认元数据后端,应新增以下内部 store 或 repository

  • DocumentProcessingRunStore
  • DocumentStatusEventStore
  • DocumentArtifactStore

这些新增能力属于内部增强,不要求修改现有 HTTP API。

7.3 Migration Guidance

从当前 JSON 元数据切换到 PostgreSQL 时,建议按以下顺序进行:

  1. 迁移 documents.json 中已有文档主记录到 documents
  2. DOCUMENT_REPOSITORY_BACKEND 切换为 postgres
  3. 为新上传或重试的文档开始写入 run / status history / artifact records
  4. 历史文档若缺少 run 级数据,可允许为空,不阻塞切换

8. Non-Goals

以下能力不在本设计 v1 范围内:

  • 将 Milvus 替换为 PostgreSQL 向量能力
  • 在 PostgreSQL 中保存向量字段
  • vector_chunks 建独立关系表
  • structure_nodes / semantic_blocks 建历史版本仓库
  • 将原始文件抽象成独立 document_files

这些能力可能在后续重构时被讨论,但不应影响当前主链路切换和现有应用层兼容性。