Files
AIRegulation-DocAnalysis/aliyun_parser/parse_pdf.py

475 lines
16 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
阿里云文档智能 API 解析 PDF输出三层结构 chunks
- structure_nodes: 目录树结构
- semantic_blocks: 语义块章节文本表格图片
- vector_chunks: 检索块 overlap 切分
"""
import argparse
import json
import os
import re
import time
from pathlib import Path
from typing import Dict, List

from alibabacloud_docmind_api20220711 import models as docmind_models
from alibabacloud_docmind_api20220711.client import Client as DocmindClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
# ===================== Alibaba Cloud configuration =====================
# SECURITY: credentials must never be committed to source control. They are
# read from the environment instead; any key pair that was previously
# hard-coded in this file should be considered leaked and rotated.
ALIBABA_ACCESS_KEY_ID = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "")
ALIBABA_ACCESS_KEY_SECRET = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "")
ALIBABA_ENDPOINT = "docmind-api.cn-hangzhou.aliyuncs.com"
# ===================== Chunking parameters =====================
# Default maximum size of one retrieval chunk, in characters.
MAX_CHARS = 600
# Default number of characters shared between adjacent retrieval chunks.
OVERLAP_CHARS = 80
# ===================== Layout type constants =====================
# Heading texts that mark a table-of-contents section.
TOC_TITLES = {"目次", "目录"}
# Layout "subType" values treated as headings.
TITLE_SUBTYPES = {"doc_title", "para_title"}
# Layout "subType" values treated as body text.
TEXT_SUBTYPES = {"para", "none"}
# Layout "type" / "subType" values treated as figure-related content.
FIGURE_TYPES = {"figure", "figure_name", "figure_note"}
FIGURE_SUBTYPES = {"picture", "pic_title", "pic_caption"}
# ===================== 阿里云 API 客户端 =====================
def init_client() -> DocmindClient:
    """Build a Document Mind client from the module-level configuration.

    Returns:
        A ``DocmindClient`` configured with ``ALIBABA_ACCESS_KEY_ID``,
        ``ALIBABA_ACCESS_KEY_SECRET`` and ``ALIBABA_ENDPOINT``.
    """
    client_config = open_api_models.Config(
        access_key_id=ALIBABA_ACCESS_KEY_ID,
        access_key_secret=ALIBABA_ACCESS_KEY_SECRET,
    )
    # The endpoint is set as an attribute after construction.
    client_config.endpoint = ALIBABA_ENDPOINT
    return DocmindClient(client_config)
def submit_job(client: DocmindClient, file_path: str) -> str:
    """Submit a local file as a document-parsing job.

    Args:
        client: Document Mind client used to submit the job.
        file_path: Path of the local file to upload.

    Returns:
        The job id taken from ``response.body.data.id``.
    """
    source = Path(file_path)
    runtime = util_models.RuntimeOptions()
    # Keep the stream open only for the duration of the upload so the file
    # handle is always released, even if the submit call raises.
    with open(file_path, "rb") as stream:
        request = docmind_models.SubmitDocParserJobAdvanceRequest(
            file_url_object=stream,
            file_name=source.name,
            file_name_extension=source.suffix.lstrip("."),
            llm_enhancement=True,
            enhancement_mode="VLM",
        )
        response = client.submit_doc_parser_job_advance(request, runtime)
    return response.body.data.id
def query_status(client: DocmindClient, task_id: str) -> Dict:
    """Query the status of a parsing job.

    Args:
        client: Document Mind client.
        task_id: Id of the job to query.

    Returns:
        The response data converted with ``to_map()``, or ``None`` when the
        response carries no data.
    """
    status_request = docmind_models.QueryDocParserStatusRequest(id=task_id)
    data = client.query_doc_parser_status(status_request).body.data
    if not data:
        return None
    return data.to_map()
def wait_for_completion(client: DocmindClient, task_id: str, poll_interval: int = 5) -> bool:
    """Poll a parsing job until it reaches a terminal state.

    Args:
        client: Document Mind client.
        task_id: Id of the job to wait for.
        poll_interval: Seconds to sleep between two status queries.

    Returns:
        ``True`` when the job reports success; ``False`` when it reports a
        failure or when the status query returns no data.
    """
    # Accept both spellings of the failure state: the comparison is done on
    # the lower-cased status, and only matching "failed" would poll forever
    # if the service reports "Fail".
    failure_states = {"fail", "failed"}
    while True:
        status_data = query_status(client, task_id)
        if not status_data:
            return False
        # "Status" may be missing or null; normalise to a lower-case string.
        status = str(status_data.get("Status") or "").lower()
        if status == "success":
            return True
        if status in failure_states:
            print(f"任务失败: {status_data}")
            return False
        print(f"任务状态: {status}, 等待中...")
        time.sleep(poll_interval)
def get_result(client: DocmindClient, task_id: str, layout_num: int = 0, layout_step_size: int = 50) -> Dict:
    """Fetch one window of parsing results for a job.

    Args:
        client: Document Mind client.
        task_id: Id of the finished job.
        layout_num: Value forwarded as the request's ``layout_num``.
        layout_step_size: Value forwarded as the request's ``layout_step_size``.

    Returns:
        ``response.body.data`` when it is truthy, otherwise ``None``.
    """
    result_request = docmind_models.GetDocParserResultRequest(
        id=task_id,
        layout_step_size=layout_step_size,
        layout_num=layout_num,
    )
    payload = client.get_doc_parser_result(result_request).body.data
    return payload or None
def collect_all_results(client: DocmindClient, task_id: str, layout_step_size: int = 50) -> List[Dict]:
    """Page through a job's results and gather every layout.

    Args:
        client: Document Mind client.
        task_id: Id of the finished job.
        layout_step_size: Number of layouts requested per call.

    Returns:
        All layouts returned by the successive ``get_result`` calls, in order.
    """
    collected = []
    offset = 0
    while True:
        window = get_result(client, task_id, offset, layout_step_size)
        batch = window.get("layouts", []) if window else []
        if not batch:
            break
        collected.extend(batch)
        offset += len(batch)
        # A short batch means there is nothing left to fetch.
        if len(batch) < layout_step_size:
            break
    return collected
# ===================== 文本处理 =====================
def normalize_text(text: str) -> str:
    """Normalise whitespace in a piece of extracted text.

    Carriage returns become newlines, non-breaking and ideographic spaces
    become plain spaces, runs of newlines collapse to one newline, runs of
    spaces/tabs collapse to one space, and the result is stripped.

    Args:
        text: Raw text; ``None`` or an empty string is tolerated.

    Returns:
        The normalised text, or ``""`` for empty/``None`` input.
    """
    if not text:
        return ""
    text = text.replace("\r", "\n")
    # The original call replaced a literal whitespace character that cannot
    # be told apart from a plain space in the source; the special spaces are
    # spelled out as escapes so the intent is visible and effective.
    text = text.replace("\u00a0", " ").replace("\u3000", " ")
    text = re.sub(r"\n+", "\n", text)
    text = re.sub(r"[ \t]+", " ", text)
    return text.strip()
def get_page(layout: Dict) -> int:
    """Return the layout's page number.

    Reads ``pageNum`` when that key is present, otherwise ``pageNumber``,
    and falls back to ``0`` when neither key exists.
    """
    if "pageNum" in layout:
        return layout["pageNum"]
    return layout.get("pageNumber", 0)
def get_text(layout: Dict) -> str:
    """Return the layout's normalised text.

    Uses the ``text`` field; when that normalises to an empty string, the
    normalised ``markdownContent`` field is returned instead.
    """
    primary = normalize_text(layout.get("text", ""))
    return primary or normalize_text(layout.get("markdownContent", ""))
# ===================== 布局类型判断 =====================
def is_title(layout: Dict) -> bool:
    """Tell whether a layout is a heading.

    A layout counts as a heading when its ``type`` is ``"title"`` or its
    ``subType`` is one of ``TITLE_SUBTYPES``.
    """
    if layout.get("type") == "title":
        return True
    return layout.get("subType") in TITLE_SUBTYPES
def is_text(layout: Dict) -> bool:
    """Tell whether a layout is body text.

    Requires ``type == "text"`` and a ``subType`` (defaulting to ``"none"``
    when absent) that is one of ``TEXT_SUBTYPES``.
    """
    if layout.get("type") != "text":
        return False
    return layout.get("subType", "none") in TEXT_SUBTYPES
def is_figure(layout: Dict) -> bool:
    """Tell whether a layout is figure-related.

    True when ``type`` is in ``FIGURE_TYPES`` or ``subType`` is in
    ``FIGURE_SUBTYPES``.
    """
    if layout.get("type") in FIGURE_TYPES:
        return True
    return layout.get("subType") in FIGURE_SUBTYPES
def is_table(layout: Dict) -> bool:
    """Tell whether a layout's ``type`` is ``"table"``."""
    layout_type = layout.get("type")
    return layout_type == "table"
def is_toc_layout(layout: Dict) -> bool:
    """Tell whether a layout belongs to the table of contents.

    A layout matches when its text is exactly one of ``TOC_TITLES``, or when
    it is on page 1 and looks like a TOC entry: a dotted section number,
    whitespace, a title, at least two leader characters (``.``, ``。`` or
    ``…``), and a trailing page number.
    """
    text = get_text(layout)
    if text in TOC_TITLES:
        return True
    # Entry-shaped lines are only treated as TOC content on page 1.
    on_first_page = get_page(layout) == 1
    return on_first_page and re.match(r"^\d+(\.\d+)*\s+.+[.。…]{2,}\s*\d+$", text) is not None
def extract_table_text(layout: Dict) -> str:
    """Flatten a table layout into plain text.

    For every entry in ``layout["cells"]`` the non-empty normalised texts of
    its nested ``layouts`` are joined with a space; the resulting lines (one
    per cell that has any text) are joined with newlines.
    """
    lines = []
    for cell in layout.get("cells", []):
        normalised = (normalize_text(inner.get("text", "")) for inner in cell.get("layouts", []))
        fragments = [fragment for fragment in normalised if fragment]
        if fragments:
            lines.append(" ".join(fragments))
    return "\n".join(lines).strip()
# ===================== 结构层:目录树 =====================
def build_structure_nodes(layouts: List[Dict]) -> List[Dict]:
    """Build the structure layer: one node per heading layout.

    Headings with empty text, or whose text is one of ``TOC_TITLES``, are
    skipped.

    Args:
        layouts: Layout dicts in document order.

    Returns:
        A list of node dicts with ``unique_id``, ``page``, ``index``,
        ``level``, ``title``, ``type`` and ``sub_type``.
    """
    nodes = []
    for layout in layouts:
        if is_title(layout):
            title = get_text(layout)
            if title and title not in TOC_TITLES:
                nodes.append(
                    dict(
                        unique_id=layout.get("uniqueId"),
                        page=get_page(layout),
                        index=layout.get("index", 0),
                        level=layout.get("level", 0),
                        title=title,
                        type=layout.get("type"),
                        sub_type=layout.get("subType"),
                    )
                )
    return nodes
# ===================== 语义层:章节内容 =====================
def update_section_path(section_stack: List[Dict], layout: Dict) -> List[Dict]:
    """Push a heading onto the section stack, closing deeper/equal sections.

    Entries whose ``level`` is greater than or equal to the new heading's
    level are popped first, then the new heading is appended.

    Args:
        section_stack: Current stack of open sections; modified in place.
        layout: The heading layout to push.

    Returns:
        The same ``section_stack`` list, after the update.
    """
    level = layout.get("level", 0)
    entry = {
        "level": level,
        "title": get_text(layout),
        "page": get_page(layout),
        "unique_id": layout.get("uniqueId"),
    }
    while section_stack and section_stack[-1]["level"] >= level:
        section_stack.pop()
    section_stack.append(entry)
    return section_stack
def section_path_titles(section_stack: List[Dict]) -> List[str]:
    """Return the ``title`` of every entry in the section stack, in order."""
    titles = []
    for entry in section_stack:
        titles.append(entry["title"])
    return titles
def flush_text_block(blocks: List[Dict], semantic_blocks: List[Dict], block_id: int) -> int:
    """Merge pending text fragments into one ``section_text`` semantic block.

    Args:
        blocks: Pending text fragments belonging to the same section.
        semantic_blocks: Output list; the merged block is appended to it.
        block_id: Id number to use for the new block.

    Returns:
        ``block_id + 1`` when a block was appended, otherwise ``block_id``
        unchanged (no fragments, or only empty text).
    """
    merged_text = "\n".join(entry["text"] for entry in blocks if entry["text"]).strip()
    if not merged_text:
        return block_id

    # Section metadata is taken from the first pending fragment.
    head = blocks[0]
    pages = [entry["page"] for entry in blocks]
    semantic_blocks.append(
        {
            "semantic_id": f"semantic-{block_id}",
            "block_type": "section_text",
            "page_start": min(pages),
            "page_end": max(pages),
            "section_path": head["section_path"],
            "section_level": head["section_level"],
            "section_title": head["section_title"],
            "source_ids": [entry["unique_id"] for entry in blocks if entry.get("unique_id")],
            "text": merged_text,
        }
    )
    return block_id + 1
def build_semantic_blocks(layouts: List[Dict]) -> List[Dict]:
    """Build the semantic layer from layouts in document order.

    Table-of-contents layouts are skipped, together with the remaining
    layouts on page 1 once a TOC layout has been seen. Headings update the
    section stack; consecutive body-text layouts are merged into one
    ``section_text`` block; each table or figure with text becomes its own
    block. Layouts of any other kind are ignored.

    Args:
        layouts: Layout dicts in document order.

    Returns:
        The list of semantic block dicts, with sequential ``semantic_id``
        values starting at ``semantic-1``.
    """
    collected = []
    stack = []
    pending = []
    next_id = 1
    in_toc = False

    for layout in layouts:
        text = get_text(layout)
        page = get_page(layout)

        if is_toc_layout(layout):
            in_toc = True
            continue
        if in_toc:
            # Keep skipping while still on page 1; any other page ends the TOC.
            if page == 1:
                continue
            in_toc = False

        if is_title(layout):
            # A heading closes the running text block and opens a new section.
            next_id = flush_text_block(pending, collected, next_id)
            pending = []
            stack = update_section_path(stack, layout)
            continue

        section_path = section_path_titles(stack)
        section_level = len(section_path)
        # "未分类" is the section title used before any heading was seen.
        section_title = section_path[-1] if section_path else "未分类"

        # Tables and figures are emitted as standalone blocks.
        if is_table(layout):
            standalone = ("table", extract_table_text(layout))
        elif is_figure(layout):
            standalone = ("figure", text)
        else:
            standalone = None

        if standalone is not None:
            block_type, body = standalone
            next_id = flush_text_block(pending, collected, next_id)
            pending = []
            if body:
                collected.append(
                    {
                        "semantic_id": f"semantic-{next_id}",
                        "block_type": block_type,
                        "page_start": page,
                        "page_end": page,
                        "section_path": section_path,
                        "section_level": section_level,
                        "section_title": section_title,
                        "source_ids": [layout.get("uniqueId")],
                        "text": body,
                    }
                )
                next_id += 1
            continue

        if is_text(layout) and text:
            pending.append(
                {
                    "page": page,
                    "text": text,
                    "unique_id": layout.get("uniqueId"),
                    "section_path": section_path,
                    "section_level": section_level,
                    "section_title": section_title,
                }
            )

    # Emit whatever text is still pending at the end of the document.
    flush_text_block(pending, collected, next_id)
    return collected
# ===================== 检索层:向量 chunks =====================
def split_text_with_overlap(text: str, max_chars: int, overlap_chars: int) -> List[str]:
    """Split text into windows of at most ``max_chars`` characters.

    Consecutive windows share ``overlap_chars`` characters. Each window is
    stripped and empty windows are dropped.

    Args:
        text: Text to split; it is stripped first.
        max_chars: Maximum window length. Must be positive when the text
            actually needs splitting.
        overlap_chars: Characters shared by adjacent windows. Values outside
            ``[0, max_chars - 1]`` are clamped into that range so that every
            window advances by at least one character.

    Returns:
        The list of non-empty pieces; ``[]`` for empty text.

    Raises:
        ValueError: If the text is non-empty and ``max_chars`` is not positive.
    """
    text = text.strip()
    if not text:
        return []
    if len(text) <= max_chars:
        return [text]
    if max_chars <= 0:
        raise ValueError("max_chars must be a positive integer")

    # Without clamping, overlap_chars >= max_chars makes the window start
    # stand still (or move backwards) and the loop never terminates.
    overlap = min(max(overlap_chars, 0), max_chars - 1)
    step = max_chars - overlap

    parts = []
    for start in range(0, len(text), step):
        piece = text[start:start + max_chars].strip()
        if piece:
            parts.append(piece)
        if start + max_chars >= len(text):
            break
    return parts
def build_vector_chunks(
    semantic_blocks: List[Dict],
    doc_id: str,
    doc_title: str,
    max_chars: int,
    overlap_chars: int,
) -> List[Dict]:
    """Build the retrieval layer by splitting every semantic block.

    Each block's text is split with ``split_text_with_overlap``; every piece
    becomes one chunk carrying the block's metadata plus an
    ``embedding_text`` made of a document/section header followed by the
    piece.

    Args:
        semantic_blocks: Blocks produced by the semantic layer.
        doc_id: Document id stored on every chunk.
        doc_title: Document title stored on every chunk and in the header.
        max_chars: Maximum characters per piece.
        overlap_chars: Characters shared by adjacent pieces.

    Returns:
        Chunk dicts numbered sequentially from 1 across all blocks.
    """
    chunks = []
    for block in semantic_blocks:
        pieces = split_text_with_overlap(block["text"], max_chars, overlap_chars)
        if not pieces:
            continue

        # The header only depends on the block, so build it once per block.
        path = block["section_path"]
        if path:
            header = f"标准:{doc_title}\n章节:{' > '.join(path)}\n\n"
        else:
            header = f"标准:{doc_title}\n\n"

        for piece_index, piece in enumerate(pieces, start=1):
            chunk_index = len(chunks) + 1
            chunks.append(
                {
                    "doc_id": doc_id,
                    "doc_title": doc_title,
                    "chunk_id": f"chunk-{chunk_index}",
                    "chunk_index": chunk_index,
                    "semantic_id": block["semantic_id"],
                    "chunk_type": block["block_type"],
                    "piece_index": piece_index,
                    "page_start": block["page_start"],
                    "page_end": block["page_end"],
                    "section_path": path,
                    "section_level": block["section_level"],
                    "section_title": block["section_title"],
                    "source_ids": block["source_ids"],
                    "text": piece,
                    "embedding_text": header + piece,
                }
            )
    return chunks
# ===================== 主转换函数 =====================
def convert_layouts(
    layouts: List[Dict],
    doc_id: str,
    doc_title: str,
    max_chars: int,
    overlap_chars: int,
) -> Dict:
    """Convert raw layouts into the three-layer output document.

    Args:
        layouts: Layout dicts in document order.
        doc_id: Document id stored in the output.
        doc_title: Document title stored in the output.
        max_chars: Maximum characters per retrieval chunk.
        overlap_chars: Characters shared by adjacent retrieval chunks.

    Returns:
        A dict with ``doc_id``, ``doc_title``, ``structure_nodes``,
        ``semantic_blocks`` and ``vector_chunks``.
    """
    result = {"doc_id": doc_id, "doc_title": doc_title}
    result["structure_nodes"] = build_structure_nodes(layouts)
    result["semantic_blocks"] = build_semantic_blocks(layouts)
    # The retrieval layer is derived from the semantic layer.
    result["vector_chunks"] = build_vector_chunks(
        result["semantic_blocks"],
        doc_id=doc_id,
        doc_title=doc_title,
        max_chars=max_chars,
        overlap_chars=overlap_chars,
    )
    return result
# ===================== CLI 入口 =====================
def main() -> None:
    """Command-line entry point.

    Submits the PDF to the parsing service, waits for the job, downloads the
    layouts, optionally dumps them, converts them to the three-layer
    structure and writes the result as JSON.

    Raises:
        FileNotFoundError: If the PDF path does not exist.
        SystemExit: With status 2 on invalid chunking arguments, or status 1
            when the remote job does not complete successfully.
    """
    parser = argparse.ArgumentParser(description="阿里云文档智能解析 PDF输出三层结构 chunks")
    parser.add_argument("pdf_path", help="PDF 文件路径")
    parser.add_argument("--out", default="vector_chunks.json", help="输出 JSON 文件路径")
    parser.add_argument("--layouts-out", dest="layouts_output", help="输出原始 layouts JSON")
    parser.add_argument("--doc-id", default="GB14747-2006", help="文档 ID")
    parser.add_argument("--doc-title", default="GB 14747—2006 儿童三轮车安全要求", help="文档标题")
    parser.add_argument("--max-chars", type=int, default=MAX_CHARS, help="单个检索 chunk 最大字符数")
    parser.add_argument("--overlap-chars", type=int, default=OVERLAP_CHARS, help="相邻检索 chunk 重叠字符数")
    parser.add_argument("--poll-interval", type=int, default=5, help="轮询间隔(秒)")
    args = parser.parse_args()

    # Reject chunking parameters that cannot produce a forward-moving window
    # before any remote job is submitted.
    if args.max_chars <= 0:
        parser.error("--max-chars 必须为正整数")
    if not 0 <= args.overlap_chars < args.max_chars:
        parser.error("--overlap-chars 必须满足 0 <= overlap < max-chars")

    pdf_path = Path(args.pdf_path).expanduser().resolve()
    if not pdf_path.exists():
        raise FileNotFoundError(f"PDF 文件不存在: {pdf_path}")

    # 1. Submit the parsing job.
    client = init_client()
    print(f"提交任务: {pdf_path}")
    task_id = submit_job(client, str(pdf_path))
    print(f"任务 ID: {task_id}")

    # 2. Wait for the job to finish.
    print("等待任务完成...")
    if not wait_for_completion(client, task_id, args.poll_interval):
        print("任务失败,退出")
        # Exit non-zero so shells and pipelines can detect the failure.
        raise SystemExit(1)

    # 3. Download the layouts.
    print("获取解析结果...")
    layouts = collect_all_results(client, task_id)
    print(f"获取到 {len(layouts)} 个布局块")

    # 4. Optionally dump the raw layouts.
    if args.layouts_output:
        layouts_path = Path(args.layouts_output).expanduser().resolve()
        layouts_path.write_text(json.dumps(layouts, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"原始 layouts 已写入: {layouts_path}")

    # 5. Convert to the three-layer structure.
    print("转换为三层结构...")
    data = convert_layouts(
        layouts,
        doc_id=args.doc_id,
        doc_title=args.doc_title,
        max_chars=args.max_chars,
        overlap_chars=args.overlap_chars,
    )

    # 6. Write the result.
    output_path = Path(args.out).expanduser().resolve()
    output_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"结构层节点数: {len(data['structure_nodes'])}")
    print(f"语义层块数: {len(data['semantic_blocks'])}")
    print(f"检索层块数: {len(data['vector_chunks'])}")
    print(f"输出文件: {output_path}")


if __name__ == "__main__":
    main()