256 lines
11 KiB
Python
256 lines
11 KiB
Python
"""Provide service-layer logic for pdf parser."""
|
||
|
||
import fitz # PyMuPDF
|
||
from typing import List, Dict, Optional, Tuple
|
||
from dataclasses import dataclass, field
|
||
from loguru import logger
|
||
import re
|
||
|
||
|
||
@dataclass
|
||
class PDFPageContent:
|
||
"""Represent the P D F Page Content type."""
|
||
page_number: int
|
||
text: str
|
||
tables: List[str] = field(default_factory=list)
|
||
images: List[str] = field(default_factory=list) # Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
blocks: List[Dict] = field(default_factory=list)
|
||
|
||
|
||
@dataclass
|
||
class PDFDocumentContent:
|
||
"""Represent the P D F Document Content type."""
|
||
file_path: str
|
||
total_pages: int
|
||
pages: List[PDFPageContent]
|
||
metadata: Dict[str, str] = field(default_factory=dict)
|
||
markdown_text: str = ""
|
||
|
||
|
||
class PDFParser:
|
||
"""Provide the P D F Parser parser."""
|
||
|
||
def __init__(self):
|
||
"""Initialize the P D F Parser instance."""
|
||
self.pdf = None
|
||
|
||
def parse(self, file_path: str, extract_tables: bool = True, extract_images: bool = False) -> PDFDocumentContent:
|
||
"""Handle parse for the P D F Parser instance."""
|
||
logger.info(f"开始解析PDF文档: {file_path}")
|
||
|
||
try:
|
||
self.pdf = fitz.open(file_path)
|
||
doc_content = PDFDocumentContent(
|
||
file_path=file_path,
|
||
total_pages=self.pdf.page_count,
|
||
pages=[]
|
||
)
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
doc_content.metadata = self._extract_metadata()
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
for page_num in range(self.pdf.page_count):
|
||
page = self.pdf[page_num]
|
||
page_content = self._parse_page(page, page_num + 1, extract_tables, extract_images)
|
||
doc_content.pages.append(page_content)
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
doc_content.markdown_text = self._generate_markdown(doc_content)
|
||
|
||
self.pdf.close()
|
||
logger.success(f"PDF解析完成,共{doc_content.total_pages}页")
|
||
|
||
return doc_content
|
||
|
||
except Exception as e:
|
||
logger.error(f"PDF解析失败: {e}")
|
||
raise
|
||
|
||
def _extract_metadata(self) -> Dict[str, str]:
|
||
"""Handle extract metadata for this module for the P D F Parser instance."""
|
||
metadata = {}
|
||
try:
|
||
meta = self.pdf.metadata
|
||
metadata = {
|
||
"title": meta.get("title", ""),
|
||
"author": meta.get("author", ""),
|
||
"subject": meta.get("subject", ""),
|
||
"keywords": meta.get("keywords", ""),
|
||
"creator": meta.get("creator", ""),
|
||
"producer": meta.get("producer", ""),
|
||
"creation_date": meta.get("creationDate", ""),
|
||
"mod_date": meta.get("modDate", ""),
|
||
}
|
||
except Exception as e:
|
||
logger.warning(f"提取元数据失败: {e}")
|
||
return metadata
|
||
|
||
def _parse_page(self, page: fitz.Page, page_num: int,
|
||
extract_tables: bool, extract_images: bool) -> PDFPageContent:
|
||
"""Handle parse page for this module for the P D F Parser instance."""
|
||
page_content = PDFPageContent(page_number=page_num, text="")
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
blocks = page.get_text("dict", flags=fitz.TEXT_PRESERVE_WHITESPACE)["blocks"]
|
||
page_content.blocks = blocks
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
text = page.get_text("text", flags=fitz.TEXT_PRESERVE_WHITESPACE)
|
||
page_content.text = text.strip()
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
if extract_tables:
|
||
tables = self._extract_tables_from_page(page)
|
||
page_content.tables = tables
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
if extract_images:
|
||
images = self._extract_images_from_page(page, page_num)
|
||
page_content.images = images
|
||
|
||
return page_content
|
||
|
||
def _extract_tables_from_page(self, page: fitz.Page) -> List[str]:
|
||
"""Handle extract tables from page for this module for the P D F Parser instance."""
|
||
tables = []
|
||
|
||
try:
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
tabs = page.find_tables()
|
||
if tabs:
|
||
for tab in tabs:
|
||
table_text = tab.extract()
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
markdown_table = self._table_to_markdown(table_text)
|
||
tables.append(markdown_table)
|
||
|
||
except AttributeError:
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
logger.warning("PyMuPDF版本不支持表格提取,请升级到2.4+版本")
|
||
except Exception as e:
|
||
logger.warning(f"表格提取失败: {e}")
|
||
|
||
return tables
|
||
|
||
def _table_to_markdown(self, table_data: List[List[str]]) -> str:
|
||
"""Handle table to markdown for this module for the P D F Parser instance."""
|
||
if not table_data or len(table_data) < 1:
|
||
return ""
|
||
|
||
lines = []
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
if len(table_data) >= 1:
|
||
header = table_data[0]
|
||
lines.append("| " + " | ".join(str(cell).strip() for cell in header) + " |")
|
||
lines.append("| " + " | ".join("---" for _ in header) + " |")
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
for row in table_data[1:]:
|
||
lines.append("| " + " | ".join(str(cell).strip() for cell in row) + " |")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _extract_images_from_page(self, page: fitz.Page, page_num: int) -> List[str]:
|
||
"""Handle extract images from page for this module for the P D F Parser instance."""
|
||
images = []
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
try:
|
||
image_list = page.get_images()
|
||
for img_index, img in enumerate(image_list):
|
||
xref = img[0]
|
||
images.append(f"image_p{page_num}_i{img_index}_xref{xref}")
|
||
except Exception as e:
|
||
logger.warning(f"图片提取失败: {e}")
|
||
return images
|
||
|
||
def _generate_markdown(self, doc_content: PDFDocumentContent) -> str:
|
||
"""Handle generate markdown for this module for the P D F Parser instance."""
|
||
lines = []
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
title = doc_content.metadata.get("title", "")
|
||
if title:
|
||
lines.append(f"# {title}\n")
|
||
else:
|
||
lines.append(f"# {doc_content.file_path}\n")
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
lines.append("\n## 文档信息\n")
|
||
for key, value in doc_content.metadata.items():
|
||
if value and key in ["author", "subject", "keywords", "creation_date"]:
|
||
lines.append(f"- **{key}**: {value}")
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
lines.append("\n## 正文\n")
|
||
|
||
for page in doc_content.pages:
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
lines.append(f"\n---\n**第 {page.page_number} 页**\n")
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
text = self._process_page_text(page.text, page.blocks)
|
||
lines.append(text)
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
for table in page.tables:
|
||
lines.append("\n" + table + "\n")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _process_page_text(self, text: str, blocks: List[Dict]) -> str:
|
||
"""Handle process page text for this module for the P D F Parser instance."""
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
processed_text = text
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
processed_text = self._detect_headers(text, blocks)
|
||
|
||
return processed_text
|
||
|
||
def _detect_headers(self, text: str, blocks: List[Dict]) -> str:
|
||
"""Handle detect headers for this module for the P D F Parser instance."""
|
||
lines = text.split("\n")
|
||
processed_lines = []
|
||
|
||
for line in lines:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
if re.match(r'^第[一二三四五六七八九十百]+章\s', line):
|
||
processed_lines.append(f"\n## {line}\n")
|
||
elif re.match(r'^第[一二三四五六七八九十百]+节\s', line):
|
||
processed_lines.append(f"\n### {line}\n")
|
||
elif re.match(r'^第[一二三四五六七八九十百]+条\s', line):
|
||
processed_lines.append(f"\n#### {line}\n")
|
||
elif re.match(r'^[一二三四五六七八九十]+\s*[、.]', line):
|
||
# Keep service responsibilities explicit so downstream behavior stays predictable.
|
||
processed_lines.append(f"- {line}")
|
||
else:
|
||
processed_lines.append(line)
|
||
|
||
return "\n".join(processed_lines)
|
||
|
||
def parse_to_markdown(self, file_path: str) -> str:
|
||
"""Parse to markdown for the P D F Parser instance."""
|
||
doc_content = self.parse(file_path)
|
||
return doc_content.markdown_text
|
||
|
||
|
||
def parse_pdf(file_path: str, **kwargs) -> PDFDocumentContent:
|
||
"""Parse pdf."""
|
||
parser = PDFParser()
|
||
return parser.parse(file_path, **kwargs)
|
||
|
||
|
||
def parse_pdf_to_markdown(file_path: str) -> str:
|
||
"""Parse pdf to markdown."""
|
||
parser = PDFParser()
|
||
return parser.parse_to_markdown(file_path)
|