Design for processing PDFs page-by-page with image rendering and MD5 tracking for chunk-to-page association. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
8.3 KiB
8.3 KiB
title, date, status
| title | date | status |
|---|---|---|
| PDF 按页分块设计 | 2026-05-13 | approved |
PDF 按页分块与图片关联设计
背景
当前文档处理流程将 PDF 全文合并后分块,无法追溯每个 chunk 的原始页面位置。需要改进为按页读取 PDF,渲染页面图片并计算 MD5,使 chunk 能关联到原文页面,便于检索结果溯源展示。
目标
- PDF 按页独立读取,保留页码信息
- 每页渲染为图片,计算 MD5,存储到 MinIO
- 文本分块后,每个 chunk 关联所属页面信息(可能跨多页)
- Chunk metadata 存入 Milvus,支持检索时溯源原文页面
数据结构
PageInfo - 页面信息
PageInfo = {
"page_num": int, # 页码 (1-based)
"text": str, # 该页提取的文本
"start_pos": int, # 在全文中的起始字符位置
"end_pos": int, # 在全文中的结束字符位置
"image_md5": str, # 页面图片 MD5
"minio_image_path": str, # MinIO 存储路径,如 "doc-xxx/pages/page_1.png"
}
ChunkPayload - Milvus 存储结构
ChunkPayload = {
"chunk_id": str, # 唯一ID,格式: {doc_id}-chunk-{index}
"doc_id": str, # 文档ID
"doc_name": str, # 文档名
"content": str, # 文本内容
"chunk_index": int, # 分块索引
"clause_id": str, # 条款ID(如有),如 "第一条"
"page_nums": [int], # 所属页面列表,如 [3, 4] 表示跨页
"page_image_md5s": [str], # 页面图片 MD5 列表
"minio_image_paths": [str],# 页面图片路径列表
}
处理流程
步骤 1:按页读取 PDF
输入: PDF 文件路径
输出: List[PageInfo](不含图片信息)
def parse_pdf_by_page(file_path: str) -> List[PageInfo]:
pages = []
cumulative_pos = 0
with pdfplumber.open(file_path) as pdf:
for i, page in enumerate(pdf.pages, start=1):
text = page.extract_text() or ""
start_pos = cumulative_pos
end_pos = cumulative_pos + len(text)
pages.append({
"page_num": i,
"text": text,
"start_pos": start_pos,
"end_pos": end_pos,
"image_md5": None,
"minio_image_path": None,
})
cumulative_pos = end_pos
return pages
步骤 2:渲染页面图片
输入: PDF 文件路径,List[PageInfo]
输出: 更新 PageInfo 的 image_md5 和 minio_image_path
def render_pages_to_images(file_path: str, pages: List[PageInfo], doc_id: str) -> List[PageInfo]:
import hashlib
from pdf2image import convert_from_path
images = convert_from_path(file_path)
for page_info in pages:
page_num = page_info["page_num"]
image = images[page_num - 1]
# 转为 bytes 计算 MD5
img_bytes = io.BytesIO()
image.save(img_bytes, format='PNG')
img_data = img_bytes.getvalue()
md5 = hashlib.md5(img_data).hexdigest()
# 上传 MinIO
minio_path = f"{doc_id}/pages/page_{page_num}.png"
minio_service.upload_file(minio_path, img_data, "image/png")
# 更新 PageInfo
page_info["image_md5"] = md5
page_info["minio_image_path"] = minio_path
return pages
步骤 3:文本分块
输入: 全文文本
输出: List[RawChunk](含位置信息)
def chunk_text_with_position(text: str) -> List[RawChunk]:
chunks = []
# 优先按条款分块
clause_chunks = chunk_by_clause(text)
if clause_chunks:
# 需要重新计算位置信息
for chunk in clause_chunks:
# 根据内容在全文中查找位置
start_pos = text.find(chunk["content"])
end_pos = start_pos + len(chunk["content"])
chunks.append({
"content": chunk["content"],
"clause_id": chunk.get("clause_id"),
"start_pos": start_pos,
"end_pos": end_pos,
})
else:
# 按固定大小分块
chunks = chunk_by_size_with_position(text)
return chunks
步骤 4:关联页面信息
输入: List[RawChunk], List[PageInfo]
输出: List[ChunkPayload]
def associate_chunks_with_pages(chunks: List[RawChunk], pages: List[PageInfo], doc_id: str, doc_name: str) -> List[ChunkPayload]:
payloads = []
for i, chunk in enumerate(chunks):
chunk_start = chunk["start_pos"]
chunk_end = chunk["end_pos"]
# 查找重叠的页面
page_nums = []
page_md5s = []
page_paths = []
for page in pages:
# 判断 chunk 是否与该页有重叠
if chunk_start < page["end_pos"] and chunk_end > page["start_pos"]:
page_nums.append(page["page_num"])
page_md5s.append(page["image_md5"])
page_paths.append(page["minio_image_path"])
payloads.append({
"chunk_id": f"{doc_id}-chunk-{i}",
"doc_id": doc_id,
"doc_name": doc_name,
"content": chunk["content"],
"chunk_index": i,
"clause_id": chunk.get("clause_id"),
"page_nums": page_nums,
"page_image_md5s": page_md5s,
"minio_image_paths": page_paths,
})
return payloads
步骤 5:存入 Milvus
输入: List[ChunkPayload]
def store_chunks_to_milvus(chunks: List[ChunkPayload]):
# 生成向量
contents = [chunk["content"] for chunk in chunks]
vectors = embedding_service.get_embeddings(contents)
# 构建插入数据
data = [
{
"id": chunk["chunk_id"],
"vector": vectors[i],
"content": chunk["content"],
"doc_id": chunk["doc_id"],
"doc_name": chunk["doc_name"],
"chunk_index": chunk["chunk_index"],
"clause_id": chunk.get("clause_id", ""),
"page_nums": chunk["page_nums"],
"page_image_md5s": chunk["page_image_md5s"],
"minio_image_paths": chunk["minio_image_paths"],
}
for i, chunk in enumerate(chunks)
]
milvus_service.insert(data)
文件改动清单
| 文件 | 改动 |
|---|---|
app/services/document.py |
新增 parse_pdf_by_page() 方法 |
app/services/document.py |
新增 render_pages_to_images() 方法 |
app/utils/chunking.py |
新增 chunk_by_size_with_position() 方法 |
app/utils/chunking.py |
改进 chunk_by_clause() 返回位置信息 |
app/workflows/document_workflow.py |
改造 run_parse_workflow() 集成新流程 |
app/services/milvus.py |
更新 collection schema 增加 page 相关字段 |
requirements.txt |
新增 pdf2image 依赖 |
Milvus Collection Schema 更新
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=64, is_primary=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=embedding_dim),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000),
FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="doc_name", dtype=DataType.VARCHAR, max_length=255),
FieldSchema(name="chunk_index", dtype=DataType.INT64),
FieldSchema(name="clause_id", dtype=DataType.VARCHAR, max_length=32),
FieldSchema(name="page_nums", dtype=DataType.ARRAY, element_type=DataType.INT64, max_capacity=10),
FieldSchema(name="page_image_md5s", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_length=32, max_capacity=10),
FieldSchema(name="minio_image_paths", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_length=256, max_capacity=10),
]
存储开销估算
| 类型 | 单页大小 | 100页文档 |
|---|---|---|
| PNG 图片 (300 DPI) | ~300 KB | ~30 MB |
| Chunk metadata | ~500 bytes | ~50 KB |
| 向量 (1536 dim) | 6 KB | 视 chunk 数量 |
注意事项
- 大 PDF 处理:渲染图片较慢,建议异步处理,用户可查看进度
- 跨页 chunk:
page_nums列表最多支持 10 页,超过则截断并记录警告 - 图片格式:使用 PNG 保证清晰度,可选 JPEG 节省空间
- MD5 唯一性:不同文档的相同页面内容会有不同 MD5(因包含文档上下文)