Files
AIRegulation-DocAnalysis/backend/app/infrastructure/parser/aliyun_layout_normalizer.py

337 lines
11 KiB
Python
Raw Normal View History

"""Normalize Aliyun Docmind layouts into production document structures."""
from __future__ import annotations
import re
from typing import Any
# Keep layout normalization rules centralized so parser and demos stay aligned.
MAX_CHARS = 600
OVERLAP_CHARS = 80
TOC_TITLES = {"目次", "目录"}
TITLE_SUBTYPES = {"doc_title", "para_title"}
TEXT_SUBTYPES = {"para", "none"}
FIGURE_TYPES = {"figure", "figure_name", "figure_note"}
FIGURE_SUBTYPES = {"picture", "pic_title", "pic_caption"}
def normalize_text(text: str) -> str:
"""Normalize raw text content emitted by the parser."""
text = text.replace("\r", "\n")
text = text.replace(" ", " ")
text = re.sub(r"\n+", "\n", text)
text = re.sub(r"[ \t]+", " ", text)
return text.strip()
def get_page(layout: dict[str, Any]) -> int:
"""Return the page number for a layout record."""
return layout.get("pageNum", layout.get("pageNumber", 0))
def get_text(layout: dict[str, Any]) -> str:
"""Return the most useful text content for a layout record."""
text = normalize_text(layout.get("text", ""))
if text:
return text
return normalize_text(layout.get("markdownContent", ""))
def is_title(layout: dict[str, Any]) -> bool:
"""Return whether the layout should be treated as a title."""
return layout.get("type") == "title" or layout.get("subType") in TITLE_SUBTYPES
def is_text(layout: dict[str, Any]) -> bool:
"""Return whether the layout should be treated as plain paragraph text."""
return layout.get("type") == "text" and layout.get("subType", "none") in TEXT_SUBTYPES
def is_figure(layout: dict[str, Any]) -> bool:
"""Return whether the layout should be treated as figure-related content."""
return layout.get("type") in FIGURE_TYPES or layout.get("subType") in FIGURE_SUBTYPES
def is_table(layout: dict[str, Any]) -> bool:
"""Return whether the layout should be treated as a table."""
return layout.get("type") == "table"
def is_toc_layout(layout: dict[str, Any]) -> bool:
"""Return whether the layout appears to belong to a table of contents."""
text = get_text(layout)
if text in TOC_TITLES:
return True
if get_page(layout) == 1 and re.match(r"^\d+(\.\d+)*\s+.+[.。…]{2,}\s*\d+$", text):
return True
return False
def extract_table_text(layout: dict[str, Any]) -> str:
"""Flatten nested table cells into retrievable plain text."""
rows: list[str] = []
for cell in layout.get("cells", []):
texts: list[str] = []
for cell_layout in cell.get("layouts", []):
cell_text = normalize_text(cell_layout.get("text", ""))
if cell_text:
texts.append(cell_text)
if texts:
rows.append(" ".join(texts))
return "\n".join(rows).strip()
def build_structure_nodes(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Build the title hierarchy emitted to downstream storage."""
nodes: list[dict[str, Any]] = []
for layout in layouts:
if not is_title(layout):
continue
text = get_text(layout)
if not text or text in TOC_TITLES:
continue
nodes.append(
{
"unique_id": layout.get("uniqueId"),
"page": get_page(layout),
"index": layout.get("index", 0),
"level": layout.get("level", 0),
"title": text,
"type": layout.get("type"),
"sub_type": layout.get("subType"),
}
)
return nodes
def update_section_path(
section_stack: list[dict[str, Any]],
layout: dict[str, Any],
) -> list[dict[str, Any]]:
"""Update the current heading stack with a newly observed title layout."""
level = layout.get("level", 0)
title = get_text(layout)
while section_stack and section_stack[-1]["level"] >= level:
section_stack.pop()
section_stack.append(
{
"level": level,
"title": title,
"page": get_page(layout),
"unique_id": layout.get("uniqueId"),
}
)
return section_stack
def section_path_titles(section_stack: list[dict[str, Any]]) -> list[str]:
"""Return the title-only view of the current heading stack."""
return [item["title"] for item in section_stack]
def flush_text_block(
blocks: list[dict[str, Any]],
semantic_blocks: list[dict[str, Any]],
block_id: int,
) -> int:
"""Flush buffered paragraph layouts into a single semantic block."""
if not blocks:
return block_id
texts = [item["text"] for item in blocks if item["text"]]
merged_text = "\n".join(texts).strip()
if not merged_text:
return block_id
semantic_blocks.append(
{
"semantic_id": f"semantic-{block_id}",
"block_type": "section_text",
"page_start": min(item["page"] for item in blocks),
"page_end": max(item["page"] for item in blocks),
"section_path": blocks[0]["section_path"],
"section_level": blocks[0]["section_level"],
"section_title": blocks[0]["section_title"],
"source_ids": [item["unique_id"] for item in blocks if item.get("unique_id")],
"text": merged_text,
}
)
return block_id + 1
def build_semantic_blocks(layouts: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Build semantic content blocks from raw Aliyun layouts."""
semantic_blocks: list[dict[str, Any]] = []
section_stack: list[dict[str, Any]] = []
pending_text_blocks: list[dict[str, Any]] = []
block_id = 1
skip_toc_page = False
for layout in layouts:
text = get_text(layout)
page = get_page(layout)
if is_toc_layout(layout):
skip_toc_page = True
continue
if skip_toc_page and page == 1:
continue
if skip_toc_page and page != 1:
skip_toc_page = False
if is_title(layout):
block_id = flush_text_block(pending_text_blocks, semantic_blocks, block_id)
pending_text_blocks = []
section_stack = update_section_path(section_stack, layout)
continue
section_path = section_path_titles(section_stack)
section_title = section_path[-1] if section_path else "未分类"
section_level = len(section_path)
if is_table(layout):
block_id = flush_text_block(pending_text_blocks, semantic_blocks, block_id)
pending_text_blocks = []
table_text = extract_table_text(layout)
if table_text:
semantic_blocks.append(
{
"semantic_id": f"semantic-{block_id}",
"block_type": "table",
"page_start": page,
"page_end": page,
"section_path": section_path,
"section_level": section_level,
"section_title": section_title,
"source_ids": [layout.get("uniqueId")],
"text": table_text,
}
)
block_id += 1
continue
if is_figure(layout):
block_id = flush_text_block(pending_text_blocks, semantic_blocks, block_id)
pending_text_blocks = []
if text:
semantic_blocks.append(
{
"semantic_id": f"semantic-{block_id}",
"block_type": "figure",
"page_start": page,
"page_end": page,
"section_path": section_path,
"section_level": section_level,
"section_title": section_title,
"source_ids": [layout.get("uniqueId")],
"text": text,
}
)
block_id += 1
continue
if is_text(layout) and text:
pending_text_blocks.append(
{
"page": page,
"text": text,
"unique_id": layout.get("uniqueId"),
"section_path": section_path,
"section_level": section_level,
"section_title": section_title,
}
)
flush_text_block(pending_text_blocks, semantic_blocks, block_id)
return semantic_blocks
def split_text_with_overlap(text: str, max_chars: int, overlap_chars: int) -> list[str]:
"""Split long text into overlapping windows for embedding."""
text = text.strip()
if len(text) <= max_chars:
return [text] if text else []
parts: list[str] = []
start = 0
while start < len(text):
end = min(len(text), start + max_chars)
parts.append(text[start:end].strip())
if end >= len(text):
break
start = max(0, end - overlap_chars)
return [part for part in parts if part]
def build_vector_chunks(
semantic_blocks: list[dict[str, Any]],
*,
doc_id: str,
doc_title: str,
max_chars: int,
overlap_chars: int,
) -> list[dict[str, Any]]:
"""Build retrieval chunks from semantic blocks."""
vector_chunks: list[dict[str, Any]] = []
chunk_index = 1
for block in semantic_blocks:
pieces = split_text_with_overlap(block["text"], max_chars, overlap_chars)
for piece_index, piece in enumerate(pieces, start=1):
if block["section_path"]:
header = f"标准:{doc_title}\n章节:{' > '.join(block['section_path'])}\n\n"
else:
header = f"标准:{doc_title}\n\n"
# Preserve enriched embedding text so retrieval keeps section context.
vector_chunks.append(
{
"doc_id": doc_id,
"doc_title": doc_title,
"chunk_id": f"chunk-{chunk_index}",
"chunk_index": chunk_index,
"semantic_id": block["semantic_id"],
"chunk_type": block["block_type"],
"piece_index": piece_index,
"page_start": block["page_start"],
"page_end": block["page_end"],
"section_path": block["section_path"],
"section_level": block["section_level"],
"section_title": block["section_title"],
"source_ids": block["source_ids"],
"text": piece,
"embedding_text": header + piece,
}
)
chunk_index += 1
return vector_chunks
def convert_layouts(
layouts: list[dict[str, Any]],
*,
doc_id: str,
doc_title: str,
max_chars: int,
overlap_chars: int,
) -> dict[str, Any]:
"""Convert raw Aliyun layouts into the three-layer ingest payload."""
structure_nodes = build_structure_nodes(layouts)
semantic_blocks = build_semantic_blocks(layouts)
vector_chunks = build_vector_chunks(
semantic_blocks,
doc_id=doc_id,
doc_title=doc_title,
max_chars=max_chars,
overlap_chars=overlap_chars,
)
return {
"doc_id": doc_id,
"doc_title": doc_title,
"structure_nodes": structure_nodes,
"semantic_blocks": semantic_blocks,
"vector_chunks": vector_chunks,
}