Files
AIRegulation-DocAnalysis/backend/app/services/parser/pdf_parser.py

256 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Service-layer PDF parser: extract text, tables and image references with PyMuPDF and render them as Markdown."""
import fitz # PyMuPDF
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
from loguru import logger
import re
@dataclass
class PDFPageContent:
    """Content extracted from a single PDF page.

    Attributes:
        page_number: Page number assigned by the parser.
        text: Plain text of the page.
        tables: Tables found on the page, one string per table.
        images: Identifiers of the images found on the page.
        blocks: Raw layout blocks of the page, one dict per block.
    """

    # Required fields come first so positional construction keeps working.
    page_number: int
    text: str
    # Each container gets its own fresh list per instance.
    tables: List[str] = field(default_factory=list)
    images: List[str] = field(default_factory=list)
    blocks: List[Dict] = field(default_factory=list)
@dataclass
class PDFDocumentContent:
    """Result of parsing a whole PDF document.

    Attributes:
        file_path: Path of the parsed file.
        total_pages: Number of pages in the document.
        pages: Per-page content.
        metadata: Document metadata as string key/value pairs.
        markdown_text: Markdown rendering of the document; empty by default.
    """

    # Required fields come first so positional construction keeps working.
    file_path: str
    total_pages: int
    pages: List[PDFPageContent]
    # Each instance gets its own metadata dict.
    metadata: Dict[str, str] = field(default_factory=dict)
    markdown_text: str = ""
class PDFParser:
    """Parse PDF files with PyMuPDF into per-page content and Markdown.

    While ``parse`` runs, the open document is stored in ``self.pdf`` and the
    private helpers read it from there; it is closed and reset to ``None``
    before ``parse`` returns or raises.
    """

    # Numerals accepted in "第…章/节/条" headings. 零/〇/千 are included so that
    # numbers such as 第一百零一条 are recognised.
    _CN_NUMERALS = "零〇一二三四五六七八九十百千"

    # (pattern, Markdown prefix) pairs, checked in order. The heading marker
    # must be followed by whitespace or the end of the line, so a bare
    # "第一章" line counts while running text like "第十章的内容" does not.
    _HEADING_RULES = (
        (re.compile(rf"^第[{_CN_NUMERALS}]+章(?:\s|$)"), "##"),
        (re.compile(rf"^第[{_CN_NUMERALS}]+节(?:\s|$)"), "###"),
        (re.compile(rf"^第[{_CN_NUMERALS}]+条(?:\s|$)"), "####"),
    )

    # Enumerated items such as "一、..." become Markdown list items.
    _LIST_ITEM_RE = re.compile(r'^[一二三四五六七八九十]+\s*[、.]')

    # Output key -> key in the PyMuPDF metadata dictionary.
    _METADATA_FIELDS = (
        ("title", "title"),
        ("author", "author"),
        ("subject", "subject"),
        ("keywords", "keywords"),
        ("creator", "creator"),
        ("producer", "producer"),
        ("creation_date", "creationDate"),
        ("mod_date", "modDate"),
    )

    # Metadata keys listed in the Markdown "文档信息" section.
    _MARKDOWN_INFO_KEYS = ("author", "subject", "keywords", "creation_date")

    def __init__(self):
        """Create a parser with no document open."""
        self.pdf = None

    def parse(self, file_path: str, extract_tables: bool = True, extract_images: bool = False) -> "PDFDocumentContent":
        """Parse a PDF file into a ``PDFDocumentContent``.

        Args:
            file_path: Path of the PDF file to open.
            extract_tables: When true, tables are extracted from every page.
            extract_images: When true, image identifiers are collected from
                every page.

        Returns:
            The document content with metadata, pages and Markdown text.

        Raises:
            Exception: Any error raised while opening or parsing the file is
                logged and re-raised unchanged.
        """
        logger.info(f"开始解析PDF文档: {file_path}")
        try:
            self.pdf = fitz.open(file_path)
            try:
                doc_content = PDFDocumentContent(
                    file_path=file_path,
                    total_pages=self.pdf.page_count,
                    pages=[],
                )
                doc_content.metadata = self._extract_metadata()
                for page_index in range(self.pdf.page_count):
                    page = self.pdf[page_index]
                    # Stored page numbers are 1-based.
                    doc_content.pages.append(
                        self._parse_page(page, page_index + 1, extract_tables, extract_images)
                    )
                doc_content.markdown_text = self._generate_markdown(doc_content)
            finally:
                # Release the document even when a page fails to parse.
                self.pdf.close()
                self.pdf = None
            logger.success(f"PDF解析完成{doc_content.total_pages}")
            return doc_content
        except Exception as e:
            logger.error(f"PDF解析失败: {e}")
            raise

    def _extract_metadata(self) -> Dict[str, str]:
        """Read the metadata of the open document.

        Returns:
            A dict with the keys of ``_METADATA_FIELDS``; missing or ``None``
            values become empty strings. An empty dict is returned (and a
            warning logged) when the metadata cannot be read.
        """
        metadata = {}
        try:
            meta = self.pdf.metadata
            # ``or ""`` also covers keys that are present with a None value.
            metadata = {
                out_key: meta.get(src_key) or ""
                for out_key, src_key in self._METADATA_FIELDS
            }
        except Exception as e:
            # Best effort: metadata problems must not abort the parse.
            logger.warning(f"提取元数据失败: {e}")
        return metadata

    def _parse_page(self, page: "fitz.Page", page_num: int,
                    extract_tables: bool, extract_images: bool) -> "PDFPageContent":
        """Extract blocks, text and, optionally, tables and images from one page.

        Args:
            page: The page to read.
            page_num: Page number stored on the result and used in image ids.
            extract_tables: Whether to run table extraction.
            extract_images: Whether to collect image identifiers.

        Returns:
            The populated ``PDFPageContent``.
        """
        page_content = PDFPageContent(page_number=page_num, text="")
        page_content.blocks = page.get_text("dict", flags=fitz.TEXT_PRESERVE_WHITESPACE)["blocks"]
        page_content.text = page.get_text("text", flags=fitz.TEXT_PRESERVE_WHITESPACE).strip()
        if extract_tables:
            page_content.tables = self._extract_tables_from_page(page)
        if extract_images:
            page_content.images = self._extract_images_from_page(page, page_num)
        return page_content

    def _extract_tables_from_page(self, page: "fitz.Page") -> List[str]:
        """Return the tables of a page as Markdown strings.

        Failures are logged as warnings; tables converted before the failure
        are still returned.
        """
        # Test for the feature explicitly so that an unrelated AttributeError
        # raised during extraction is not reported as a version problem.
        find_tables = getattr(page, "find_tables", None)
        if find_tables is None:
            logger.warning("PyMuPDF版本不支持表格提取请升级到1.23.0+版本")
            return []

        tables = []
        try:
            tabs = find_tables()
            if tabs:
                for tab in tabs:
                    tables.append(self._table_to_markdown(tab.extract()))
        except Exception as e:
            logger.warning(f"表格提取失败: {e}")
        return tables

    @staticmethod
    def _format_cell(cell) -> str:
        """Render one table cell as text that is safe inside a Markdown row."""
        if cell is None:
            # Avoid the literal text "None" for empty cells.
            return ""
        pieces = (piece.strip() for piece in str(cell).strip().splitlines())
        # A line break or an unescaped pipe would split the Markdown row.
        return " ".join(piece for piece in pieces if piece).replace("|", "\\|")

    def _table_to_markdown(self, table_data: List[List[str]]) -> str:
        """Convert table rows into a Markdown table.

        Args:
            table_data: Rows of cells; the first row is used as the header.
                ``None`` cells are rendered as empty cells.

        Returns:
            The Markdown table, or an empty string when there are no rows.
        """
        if not table_data:
            return ""

        def render_row(row) -> str:
            return "| " + " | ".join(self._format_cell(cell) for cell in row) + " |"

        header = table_data[0]
        lines = [
            render_row(header),
            "| " + " | ".join("---" for _ in header) + " |",
        ]
        lines.extend(render_row(row) for row in table_data[1:])
        return "\n".join(lines)

    def _extract_images_from_page(self, page: "fitz.Page", page_num: int) -> List[str]:
        """Return identifier strings for the images referenced by a page.

        Only identifiers of the form ``image_p{page}_i{index}_xref{xref}`` are
        produced; no image data is read. Failures are logged as warnings.
        """
        images = []
        try:
            for img_index, img in enumerate(page.get_images()):
                # The first item of each entry is used as the xref.
                xref = img[0]
                images.append(f"image_p{page_num}_i{img_index}_xref{xref}")
        except Exception as e:
            logger.warning(f"图片提取失败: {e}")
        return images

    def _generate_markdown(self, doc_content: "PDFDocumentContent") -> str:
        """Render the parsed document as Markdown.

        The output has a title (metadata title, else the file path), a
        document-information list, and one section per page with the page
        text followed by its tables.
        """
        title = doc_content.metadata.get("title", "")
        lines = [f"# {title}\n" if title else f"# {doc_content.file_path}\n"]

        lines.append("\n## 文档信息\n")
        for key, value in doc_content.metadata.items():
            if value and key in self._MARKDOWN_INFO_KEYS:
                lines.append(f"- **{key}**: {value}")

        lines.append("\n## 正文\n")
        for page in doc_content.pages:
            lines.append(f"\n---\n**第 {page.page_number} 页**\n")
            lines.append(self._process_page_text(page.text, page.blocks))
            for table in page.tables:
                lines.append("\n" + table + "\n")
        return "\n".join(lines)

    def _process_page_text(self, text: str, blocks: List[Dict]) -> str:
        """Post-process page text; currently this only marks up headings."""
        return self._detect_headers(text, blocks)

    def _detect_headers(self, text: str, blocks: List[Dict]) -> str:
        """Turn chapter/section/article lines into Markdown headings.

        Lines are stripped and blank lines are dropped. "第…章" becomes a
        level-2 heading, "第…节" level 3 and "第…条" level 4; enumerated lines
        such as "一、" become list items; all other lines are kept as they are.

        Args:
            text: Page text.
            blocks: Layout blocks of the page; not used at present.

        Returns:
            The processed lines joined with newlines.
        """
        processed_lines = []
        for raw_line in text.split("\n"):
            line = raw_line.strip()
            if not line:
                continue
            for pattern, prefix in self._HEADING_RULES:
                if pattern.match(line):
                    processed_lines.append(f"\n{prefix} {line}\n")
                    break
            else:
                if self._LIST_ITEM_RE.match(line):
                    processed_lines.append(f"- {line}")
                else:
                    processed_lines.append(line)
        return "\n".join(processed_lines)

    def parse_to_markdown(self, file_path: str) -> str:
        """Parse a PDF file with default options and return its Markdown text."""
        return self.parse(file_path).markdown_text
def parse_pdf(file_path: str, **kwargs) -> PDFDocumentContent:
    """Parse a PDF file with a fresh ``PDFParser``.

    Args:
        file_path: Path of the PDF file.
        **kwargs: Forwarded unchanged to ``PDFParser.parse``.

    Returns:
        The parsed document content.
    """
    return PDFParser().parse(file_path, **kwargs)
def parse_pdf_to_markdown(file_path: str) -> str:
    """Parse a PDF file with a fresh ``PDFParser`` and return its Markdown text."""
    return PDFParser().parse_to_markdown(file_path)