Files
ash66 30c7bda389 Refactor document handling and update Milvus collection settings
- Removed multiple failed document entries from `documents.json`.
- Added a new document entry with updated metadata and changed the index name to `regulations_dense_1024_v2`.
- Updated architecture documentation to reflect changes in the Milvus collection name.
- Adjusted requirements by removing the sqlalchemy dependency.
- Modified test cases to align with new document structure and naming conventions.
- Introduced a new test file for Milvus vector index runtime recovery and error handling.
- Updated assertions in various test files to ensure compatibility with the new schema.
2026-05-26 20:21:31 +08:00

475 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
阿里云文档智能 API 解析 PDF输出三层结构 chunks
- structure_nodes: 目录树结构
- semantic_blocks: 语义块(章节文本、表格、图片)
- vector_chunks: 检索块(带 overlap 切分)
"""
import argparse
import json
import os
import re
import time
from pathlib import Path
from typing import Dict, List, Optional

from alibabacloud_docmind_api20220711 import models as docmind_models
from alibabacloud_docmind_api20220711.client import Client as DocmindClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
# ===================== Alibaba Cloud configuration =====================
# Credentials are read from the environment instead of being embedded in the
# source, so they never end up in version control. An unset variable yields an
# empty string.
# SECURITY: the key pair that used to be hard-coded here has been exposed and
# should be revoked and rotated.
ALIBABA_ACCESS_KEY_ID = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "")
ALIBABA_ACCESS_KEY_SECRET = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "")
# Service endpoint assigned to the client configuration.
ALIBABA_ENDPOINT = "docmind-api.cn-hangzhou.aliyuncs.com"
# ===================== Chunking parameters =====================
# Default maximum number of characters in one retrieval chunk (CLI default for --max-chars).
MAX_CHARS = 600
# Default number of characters shared by adjacent retrieval chunks (CLI default for --overlap-chars).
OVERLAP_CHARS = 80
# ===================== Layout type constants =====================
# Texts recognised as a table-of-contents heading.
TOC_TITLES = {"目次", "目录"}
# `subType` values that mark a layout as a title.
TITLE_SUBTYPES = {"doc_title", "para_title"}
# `subType` values accepted for body text; "none" is also the fallback when the key is absent.
TEXT_SUBTYPES = {"para", "none"}
# `type` values that mark a layout as figure content.
FIGURE_TYPES = {"figure", "figure_name", "figure_note"}
# `subType` values that mark a layout as figure content.
FIGURE_SUBTYPES = {"picture", "pic_title", "pic_caption"}
# ===================== Alibaba Cloud API client =====================
def init_client() -> DocmindClient:
    """Build a Document Mind client from the module-level credentials and endpoint."""
    client_config = open_api_models.Config(
        access_key_secret=ALIBABA_ACCESS_KEY_SECRET,
        access_key_id=ALIBABA_ACCESS_KEY_ID,
    )
    client_config.endpoint = ALIBABA_ENDPOINT
    docmind_client = DocmindClient(client_config)
    return docmind_client
def submit_job(client: DocmindClient, file_path: str) -> str:
    """Submit a document-parsing job for a local file.

    Args:
        client: Document Mind client used to send the request.
        file_path: Path of the local file to upload.

    Returns:
        The job id taken from ``response.body.data.id``.
    """
    source = Path(file_path)
    runtime = util_models.RuntimeOptions()
    # The context manager closes the file handle once the upload call has
    # returned (or raised); previously the handle was never closed.
    with source.open("rb") as stream:
        request = docmind_models.SubmitDocParserJobAdvanceRequest(
            file_url_object=stream,
            file_name=source.name,
            file_name_extension=source.suffix.lstrip("."),
            llm_enhancement=True,
            enhancement_mode="VLM",
        )
        response = client.submit_doc_parser_job_advance(request, runtime)
    return response.body.data.id
def query_status(client: DocmindClient, task_id: str) -> Dict:
    """Query the status of a parsing job.

    Returns the response's ``data`` converted with ``to_map()``, or ``None``
    when the response carries no data.
    """
    status_request = docmind_models.QueryDocParserStatusRequest(id=task_id)
    payload = client.query_doc_parser_status(status_request).body.data
    if not payload:
        return None
    return payload.to_map()
def wait_for_completion(
    client: DocmindClient,
    task_id: str,
    poll_interval: int = 5,
    timeout: Optional[float] = None,
) -> bool:
    """Poll a parsing job until it finishes.

    Args:
        client: Document Mind client used for the status queries.
        task_id: Id of the job to wait for.
        poll_interval: Seconds to sleep between two status queries.
        timeout: Optional upper bound, in seconds, on the total waiting time.
            ``None`` (the default) waits indefinitely, as before.

    Returns:
        ``True`` when the job reports success; ``False`` when no status data is
        returned, the job reports failure, or the timeout elapses.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status_data = query_status(client, task_id)
        if not status_data:
            return False
        # "Status" may be absent or null; coerce to a string before lower().
        status = str(status_data.get("Status") or "").lower()
        if status == "success":
            return True
        # NOTE(review): the status-query API documentation appears to spell the
        # failure state "Fail"; "failed" stays accepted for backward
        # compatibility. Matching only "failed" would poll forever on failure.
        if status in ("fail", "failed"):
            print(f"任务失败: {status_data}")
            return False
        if deadline is not None and time.monotonic() >= deadline:
            print(f"任务超时: {status_data}")
            return False
        print(f"任务状态: {status}, 等待中...")
        time.sleep(poll_interval)
def get_result(client: DocmindClient, task_id: str, layout_num: int = 0, layout_step_size: int = 50) -> Dict:
    """Fetch one batch of parsing results.

    ``layout_num`` and ``layout_step_size`` are forwarded unchanged to the
    result request. Returns the response's ``data``, or ``None`` when it is
    empty.
    """
    batch_request = docmind_models.GetDocParserResultRequest(
        id=task_id,
        layout_step_size=layout_step_size,
        layout_num=layout_num,
    )
    data = client.get_doc_parser_result(batch_request).body.data
    if data:
        return data
    return None
def collect_all_results(client: DocmindClient, task_id: str, layout_step_size: int = 50) -> List[Dict]:
    """Collect every layout of a finished job by fetching batch after batch.

    Fetching stops at the first empty result, the first batch without
    layouts, or the first batch shorter than ``layout_step_size``.
    """
    collected = []
    offset = 0
    more_expected = True
    while more_expected:
        batch_data = get_result(client, task_id, offset, layout_step_size)
        batch = batch_data.get("layouts", []) if batch_data else []
        if not batch:
            break
        collected.extend(batch)
        # The next request starts after the layouts received so far.
        offset += len(batch)
        more_expected = len(batch) >= layout_step_size
    return collected
# ===================== Text processing =====================
def normalize_text(text: str) -> str:
    """Normalize whitespace in a piece of extracted text.

    Carriage returns become newlines, non-breaking and ideographic spaces
    become plain spaces, runs of newlines collapse to one newline, runs of
    spaces/tabs collapse to one space, and the result is stripped.

    Args:
        text: Raw text; ``None`` or an empty string yields ``""``.

    Returns:
        The normalized text.
    """
    if not text:
        return ""
    text = text.replace("\r", "\n")
    # Written as escapes so the special characters cannot be silently turned
    # into ordinary spaces by editors or copy/paste, which would make the
    # replacement a no-op.
    text = text.replace("\u00a0", " ").replace("\u3000", " ")
    text = re.sub(r"\n+", "\n", text)
    text = re.sub(r"[ \t]+", " ", text)
    return text.strip()
def get_page(layout: Dict) -> int:
    """Return the layout's page: ``pageNum`` if present, else ``pageNumber``, else 0."""
    if "pageNum" in layout:
        return layout["pageNum"]
    return layout.get("pageNumber", 0)
def get_text(layout: Dict) -> str:
    """Return the normalized ``text`` of a layout, or its normalized ``markdownContent`` when the text is empty."""
    primary = normalize_text(layout.get("text", ""))
    return primary or normalize_text(layout.get("markdownContent", ""))
# ===================== Layout type predicates =====================
def is_title(layout: Dict) -> bool:
    """Tell whether a layout is a title, by ``type`` or by ``subType``."""
    if layout.get("type") == "title":
        return True
    return layout.get("subType") in TITLE_SUBTYPES
def is_text(layout: Dict) -> bool:
    """Tell whether a layout is body text: ``type`` is "text" and ``subType`` (default "none") is an accepted text subtype."""
    if layout.get("type") != "text":
        return False
    return layout.get("subType", "none") in TEXT_SUBTYPES
def is_figure(layout: Dict) -> bool:
    """Tell whether a layout is figure content, by ``type`` or by ``subType``."""
    if layout.get("type") in FIGURE_TYPES:
        return True
    return layout.get("subType") in FIGURE_SUBTYPES
def is_table(layout: Dict) -> bool:
    """Tell whether a layout's ``type`` is "table"."""
    layout_type = layout.get("type")
    return layout_type == "table"
def is_toc_layout(layout: Dict) -> bool:
    """Tell whether a layout belongs to the table of contents.

    A layout qualifies when its text is a table-of-contents heading, or when
    it is on page 1 and looks like a numbered entry followed by leader dots
    and a trailing page number.
    """
    content = get_text(layout)
    if content in TOC_TITLES:
        return True
    if get_page(layout) != 1:
        return False
    entry_match = re.match(r"^\d+(\.\d+)*\s+.+[.。…]{2,}\s*\d+$", content)
    return entry_match is not None
def extract_table_text(layout: Dict) -> str:
    """Flatten a table layout into text, one line per non-empty cell.

    Within a cell, the normalized texts of its nested layouts are joined with
    a single space; cells without any text are skipped.
    """
    lines = []
    for cell in layout.get("cells", []):
        fragments = [
            normalize_text(inner.get("text", ""))
            for inner in cell.get("layouts", [])
        ]
        cell_line = " ".join(fragment for fragment in fragments if fragment)
        if cell_line:
            lines.append(cell_line)
    return "\n".join(lines).strip()
# ===================== Structure layer: title tree =====================
def build_structure_nodes(layouts: List[Dict]) -> List[Dict]:
    """Build the structure layer: one node per title layout.

    Titles with empty text and table-of-contents headings are left out.
    """
    nodes = []
    for item in layouts:
        if is_title(item):
            heading = get_text(item)
            if heading and heading not in TOC_TITLES:
                nodes.append(
                    dict(
                        unique_id=item.get("uniqueId"),
                        page=get_page(item),
                        index=item.get("index", 0),
                        level=item.get("level", 0),
                        title=heading,
                        type=item.get("type"),
                        sub_type=item.get("subType"),
                    )
                )
    return nodes
# ===================== Semantic layer: section content =====================
def update_section_path(section_stack: List[Dict], layout: Dict) -> List[Dict]:
    """Push a title layout onto the section stack.

    Entries whose level is greater than or equal to the new title's level are
    popped first. The stack is modified in place and also returned.
    """
    new_level = layout.get("level", 0)
    entry = {
        "level": new_level,
        "title": get_text(layout),
        "page": get_page(layout),
        "unique_id": layout.get("uniqueId"),
    }
    # Drop siblings and deeper headings so the stack holds only ancestors.
    while section_stack and not section_stack[-1]["level"] < new_level:
        section_stack.pop()
    section_stack.append(entry)
    return section_stack
def section_path_titles(section_stack: List[Dict]) -> List[str]:
    """Return the titles of the stack entries, outermost first."""
    titles = []
    for entry in section_stack:
        titles.append(entry["title"])
    return titles
def flush_text_block(blocks: List[Dict], semantic_blocks: List[Dict], block_id: int) -> int:
    """Merge pending text entries into one ``section_text`` semantic block.

    The merged block is appended to ``semantic_blocks``. Section metadata comes
    from the first pending entry; the page range spans all entries.

    Returns:
        ``block_id + 1`` when a block was appended, otherwise ``block_id``
        unchanged (nothing pending, or only empty text).
    """
    combined = "\n".join(entry["text"] for entry in blocks if entry["text"]).strip()
    if not combined:
        return block_id
    first = blocks[0]
    pages = [entry["page"] for entry in blocks]
    source_ids = [entry["unique_id"] for entry in blocks if entry.get("unique_id")]
    semantic_blocks.append(
        {
            "semantic_id": f"semantic-{block_id}",
            "block_type": "section_text",
            "page_start": min(pages),
            "page_end": max(pages),
            "section_path": first["section_path"],
            "section_level": first["section_level"],
            "section_title": first["section_title"],
            "source_ids": source_ids,
            "text": combined,
        }
    )
    return block_id + 1
def build_semantic_blocks(layouts: List[Dict]) -> List[Dict]:
    """Build the semantic layer from the layouts, in document order.

    Consecutive body-text layouts are merged into ``section_text`` blocks; each
    table and each figure layout with text becomes its own block. Title layouts
    update the current section path and close the pending text block.
    Table-of-contents layouts are skipped, and so is everything that follows
    one on page 1.
    """
    results = []
    heading_stack = []
    buffered = []
    next_id = 1
    in_toc = False
    for item in layouts:
        item_text = get_text(item)
        item_page = get_page(item)
        if is_toc_layout(item):
            in_toc = True
            continue
        if in_toc:
            # Still on the table-of-contents page: drop the layout.
            if item_page == 1:
                continue
            in_toc = False
        if is_title(item):
            next_id = flush_text_block(buffered, results, next_id)
            buffered = []
            heading_stack = update_section_path(heading_stack, item)
            continue
        path = section_path_titles(heading_stack)
        section_info = {
            "section_path": path,
            "section_level": len(path),
            "section_title": path[-1] if path else "未分类",
        }
        # Tables are checked before figures, and both interrupt a text run.
        if is_table(item):
            standalone_kind = "table"
        elif is_figure(item):
            standalone_kind = "figure"
        else:
            standalone_kind = None
        if standalone_kind is not None:
            next_id = flush_text_block(buffered, results, next_id)
            buffered = []
            if standalone_kind == "table":
                body = extract_table_text(item)
            else:
                body = item_text
            if body:
                results.append(
                    {
                        "semantic_id": f"semantic-{next_id}",
                        "block_type": standalone_kind,
                        "page_start": item_page,
                        "page_end": item_page,
                        **section_info,
                        "source_ids": [item.get("uniqueId")],
                        "text": body,
                    }
                )
                next_id += 1
            continue
        if is_text(item) and item_text:
            buffered.append(
                {
                    "page": item_page,
                    "text": item_text,
                    "unique_id": item.get("uniqueId"),
                    **section_info,
                }
            )
    # Emit whatever text is still pending after the last layout.
    flush_text_block(buffered, results, next_id)
    return results
# ===================== Retrieval layer: vector chunks =====================
def split_text_with_overlap(text: str, max_chars: int, overlap_chars: int) -> List[str]:
    """Split text into pieces of at most ``max_chars`` characters.

    Consecutive pieces share ``overlap_chars`` characters. Text that already
    fits is returned as a single piece; empty text yields an empty list. Each
    piece is stripped and empty pieces are dropped.

    Args:
        text: Text to split; surrounding whitespace is stripped first.
        max_chars: Maximum length of one piece.
        overlap_chars: Characters repeated at the start of the next piece.
            Negative values are treated as 0.

    Returns:
        The list of pieces in order.

    Raises:
        ValueError: If the text must be split but ``max_chars`` is not
            positive or ``overlap_chars`` is not smaller than ``max_chars``.
            The window could then never advance, which used to loop forever.
    """
    text = text.strip()
    if len(text) <= max_chars:
        return [text] if text else []
    if not text:
        return []
    if max_chars <= 0:
        raise ValueError(f"max_chars must be positive, got {max_chars}")
    overlap = max(0, overlap_chars)
    if overlap >= max_chars:
        raise ValueError(
            f"overlap_chars ({overlap_chars}) must be smaller than max_chars ({max_chars})"
        )
    step = max_chars - overlap
    parts = []
    start = 0
    while True:
        end = start + max_chars
        parts.append(text[start:end].strip())
        if end >= len(text):
            break
        start += step
    return [part for part in parts if part]
def build_vector_chunks(
    semantic_blocks: List[Dict],
    doc_id: str,
    doc_title: str,
    max_chars: int,
    overlap_chars: int,
) -> List[Dict]:
    """Build the retrieval layer by splitting every semantic block into chunks.

    Each chunk copies its block's metadata and gets an ``embedding_text``: the
    piece prefixed with the document title and, if the block has one, its
    section path. ``chunk_index`` counts from 1 across the whole document;
    ``piece_index`` counts from 1 within a block.
    """
    chunks = []
    for block in semantic_blocks:
        pieces = split_text_with_overlap(block["text"], max_chars, overlap_chars)
        for piece_no, piece in enumerate(pieces, start=1):
            path = block["section_path"]
            if not path:
                prefix = f"标准:{doc_title}\n\n"
            else:
                joined_path = " > ".join(path)
                prefix = f"标准:{doc_title}\n章节:{joined_path}\n\n"
            # Chunk numbers are global, so derive them from the output length.
            position = len(chunks) + 1
            chunks.append(
                {
                    "doc_id": doc_id,
                    "doc_title": doc_title,
                    "chunk_id": f"chunk-{position}",
                    "chunk_index": position,
                    "semantic_id": block["semantic_id"],
                    "chunk_type": block["block_type"],
                    "piece_index": piece_no,
                    "page_start": block["page_start"],
                    "page_end": block["page_end"],
                    "section_path": path,
                    "section_level": block["section_level"],
                    "section_title": block["section_title"],
                    "source_ids": block["source_ids"],
                    "text": piece,
                    "embedding_text": prefix + piece,
                }
            )
    return chunks
# ===================== Main conversion function =====================
def convert_layouts(
    layouts: List[Dict],
    doc_id: str,
    doc_title: str,
    max_chars: int,
    overlap_chars: int,
) -> Dict:
    """Convert raw layouts into the three-layer result.

    Returns a dict with the document id and title plus ``structure_nodes``,
    ``semantic_blocks`` and ``vector_chunks``.
    """
    result = {"doc_id": doc_id, "doc_title": doc_title}
    result["structure_nodes"] = build_structure_nodes(layouts)
    semantic = build_semantic_blocks(layouts)
    result["semantic_blocks"] = semantic
    result["vector_chunks"] = build_vector_chunks(
        semantic,
        doc_id=doc_id,
        doc_title=doc_title,
        max_chars=max_chars,
        overlap_chars=overlap_chars,
    )
    return result
# ===================== CLI entry point =====================
def main() -> None:
    """Parse a PDF through the remote service and write the three-layer JSON.

    Steps: parse arguments, submit the job, wait for it, download all layouts,
    optionally dump the raw layouts, convert them and write the result.

    Raises:
        FileNotFoundError: If the PDF path does not exist.
        SystemExit: With status 2 for invalid chunking arguments (via
            ``parser.error``) and status 1 when the remote job does not
            complete successfully.
    """
    parser = argparse.ArgumentParser(description="阿里云文档智能解析 PDF输出三层结构 chunks")
    parser.add_argument("pdf_path", help="PDF 文件路径")
    parser.add_argument("--out", default="vector_chunks.json", help="输出 JSON 文件路径")
    parser.add_argument("--layouts-out", dest="layouts_output", help="输出原始 layouts JSON")
    parser.add_argument("--doc-id", default="GB14747-2006", help="文档 ID")
    parser.add_argument("--doc-title", default="GB 14747—2006 儿童三轮车安全要求", help="文档标题")
    parser.add_argument("--max-chars", type=int, default=MAX_CHARS, help="单个检索 chunk 最大字符数")
    parser.add_argument("--overlap-chars", type=int, default=OVERLAP_CHARS, help="相邻检索 chunk 重叠字符数")
    parser.add_argument("--poll-interval", type=int, default=5, help="轮询间隔(秒)")
    args = parser.parse_args()
    # Reject chunking parameters that cannot make progress before any remote
    # work is started.
    if args.max_chars <= 0 or not 0 <= args.overlap_chars < args.max_chars:
        parser.error("--max-chars 必须大于 0, 且 --overlap-chars 必须满足 0 <= overlap-chars < max-chars")
    pdf_path = Path(args.pdf_path).expanduser().resolve()
    if not pdf_path.exists():
        raise FileNotFoundError(f"PDF 文件不存在: {pdf_path}")
    # 1. Submit the job.
    client = init_client()
    print(f"提交任务: {pdf_path}")
    task_id = submit_job(client, str(pdf_path))
    print(f"任务 ID: {task_id}")
    # 2. Wait for completion.
    print("等待任务完成...")
    if not wait_for_completion(client, task_id, args.poll_interval):
        print("任务失败,退出")
        # A non-zero status lets shells and pipelines detect the failure.
        raise SystemExit(1)
    # 3. Download the layouts.
    print("获取解析结果...")
    layouts = collect_all_results(client, task_id)
    print(f"获取到 {len(layouts)} 个布局块")
    # 4. Optionally dump the raw layouts.
    if args.layouts_output:
        layouts_path = Path(args.layouts_output).expanduser().resolve()
        # Create missing parent directories so the write cannot fail on them.
        layouts_path.parent.mkdir(parents=True, exist_ok=True)
        layouts_path.write_text(json.dumps(layouts, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"原始 layouts 已写入: {layouts_path}")
    # 5. Convert to the three-layer structure.
    print("转换为三层结构...")
    data = convert_layouts(
        layouts,
        doc_id=args.doc_id,
        doc_title=args.doc_title,
        max_chars=args.max_chars,
        overlap_chars=args.overlap_chars,
    )
    # 6. Write the result.
    output_path = Path(args.out).expanduser().resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"结构层节点数: {len(data['structure_nodes'])}")
    print(f"语义层块数: {len(data['semantic_blocks'])}")
    print(f"检索层块数: {len(data['vector_chunks'])}")
    print(f"输出文件: {output_path}")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()