137 lines
4.3 KiB
Python
137 lines
4.3 KiB
Python
import os
|
||
import tempfile
|
||
import logging
|
||
from pathlib import Path
|
||
|
||
from fastapi import FastAPI, UploadFile, File, HTTPException
|
||
from pydantic import BaseModel
|
||
|
||
# Route INFO-level (and above) records through the root logger configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Runtime settings; each one can be overridden through an environment variable.
DEVICE = os.environ.get("DEVICE", "cpu")
UPLOAD_DIR = Path(os.environ.get("UPLOAD_DIR", "/app/uploads"))
PARSED_DIR = Path(os.environ.get("PARSED_DIR", "/app/parsed"))

# Both working directories are created at import time; existing ones are kept.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
PARSED_DIR.mkdir(parents=True, exist_ok=True)

# ASGI application object that the route decorators in this module attach to.
app = FastAPI(title="MinerU 文档解析服务")
|
||
|
||
# Maps an upload's MIME type to a short format tag.
# NOTE(review): the visible endpoints dispatch on the file extension and do not
# consult this table; confirm whether it is used elsewhere.
SUPPORTED_TYPES = {
    # PDF
    "application/pdf": "pdf",
    # Word (OOXML and legacy binary)
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/msword": "doc",
    # Excel (OOXML)
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
}
|
||
|
||
|
||
def parse_pdf_mineru(pdf_path: str) -> str:
    """Parse a PDF with MinerU, falling back to PyMuPDF on any failure.

    The ``magic_pdf`` imports happen lazily inside the function, so a missing
    or broken MinerU installation is treated like any other error: a warning
    is logged and ``parse_pdf_pymupdf`` produces the result instead.

    Args:
        pdf_path: Filesystem path of the PDF to parse.

    Returns:
        The value produced by ``pipe_mk_uni_format`` (``""`` when that value
        is falsy), or the PyMuPDF fallback output if anything raised.
    """
    try:
        from magic_pdf.data.data_reader_writer import FileBasedDataWriter
        from magic_pdf.pipe.UnicodeFormulaPDFPipe import UnicodeFormulaPDFPipe

        # Intermediate artifacts go to a scratch directory removed on exit.
        with tempfile.TemporaryDirectory() as scratch_dir:
            pdf_pipe = UnicodeFormulaPDFPipe(
                pdf_path, FileBasedDataWriter(scratch_dir)
            )
            # The pipeline stages are run in this fixed order.
            pdf_pipe.pipe_classify()
            pdf_pipe.pipe_analyze()
            pdf_pipe.pipe_parse()
            # NOTE(review): the declared return type is ``str``; confirm that
            # ``pipe_mk_uni_format`` really yields Markdown text.
            rendered = pdf_pipe.pipe_mk_uni_format(scratch_dir, drop_mode="none")
            if not rendered:
                return ""
            return rendered
    except Exception as e:
        # Deliberate best-effort: any failure (including ImportError) degrades
        # to plain-text extraction rather than failing the request.
        logger.warning(f"MinerU 解析失败,降级到 PyMuPDF:{e}")
        return parse_pdf_pymupdf(pdf_path)
|
||
|
||
|
||
def parse_pdf_pymupdf(pdf_path: str) -> str:
    """Fallback: extract plain text from a PDF with PyMuPDF.

    Each page that contains non-whitespace text becomes a section headed by
    ``## 第 N 页`` (1-based page number); sections are joined by blank lines.
    Pages without text are skipped.

    Args:
        pdf_path: Filesystem path of the PDF to read.

    Returns:
        The concatenated page sections, or a ``[解析失败:...]`` marker string
        if PyMuPDF is unavailable or the document cannot be read. This
        function never raises.
    """
    try:
        import fitz  # PyMuPDF

        # Open as a context manager so the document handle is released even
        # when text extraction fails part-way through.
        with fitz.open(pdf_path) as doc:
            pages = []
            for page_number, page in enumerate(doc, start=1):
                text = page.get_text()
                if text.strip():
                    pages.append(f"## 第 {page_number} 页\n\n{text}")
        return "\n\n".join(pages)
    except Exception as e:
        # Best-effort contract: report the failure in-band instead of raising.
        return f"[解析失败:{e}]"
|
||
|
||
|
||
def _heading_prefix(style_name: str) -> str:
    """Return the Markdown ``#`` prefix for a Word heading style name.

    The numeric level is clamped to 1..6 because Markdown defines only six
    heading levels; a style name without a parsable level maps to ``##``.
    """
    level_text = style_name.replace("Heading ", "").strip()
    try:
        level = int(level_text)
    except ValueError:
        return "##"
    return "#" * min(max(level, 1), 6)


def parse_docx(file_path: str) -> str:
    """Convert a Word document to Markdown-like text using python-docx.

    Non-empty paragraphs are emitted in document order; paragraphs whose
    style name contains ``Heading`` are prefixed with ``#`` characters that
    match the heading level. All tables are appended after the paragraphs,
    one line per row with cells joined by ``" | "``.

    Args:
        file_path: Filesystem path of the document to read.

    Returns:
        The parts joined by blank lines, or a ``[Word 解析失败:...]`` marker
        string if python-docx is unavailable or the file cannot be parsed.
        This function never raises.
    """
    try:
        from docx import Document

        doc = Document(file_path)
        parts = []
        for para in doc.paragraphs:
            if not para.text.strip():
                continue
            style = para.style.name if para.style else ""
            if "Heading" in style:
                parts.append(f"{_heading_prefix(style)} {para.text}")
            else:
                parts.append(para.text)
        for table in doc.tables:
            rows = [
                " | ".join(cell.text.strip() for cell in row.cells)
                for row in table.rows
            ]
            if rows:
                parts.append("\n".join(rows))
        return "\n\n".join(parts)
    except Exception as e:
        # Best-effort contract: report the failure in-band instead of raising.
        return f"[Word 解析失败:{e}]"
|
||
|
||
|
||
class ParseResponse(BaseModel):
    """Response body returned by the document-parsing endpoints."""

    # Name of the uploaded file as reported by the client.
    filename: str
    # Extracted document content.
    markdown: str
    # Integer count reported alongside the content.
    # NOTE(review): presumably a page count, but confirm how callers derive it.
    page_count: int
    # Label of the parser backend selected for the file.
    parser: str
|
||
|
||
|
||
@app.post("/mineru-parse", response_model=ParseResponse)
async def mineru_parse(file: UploadFile = File(...)) -> ParseResponse:
    """Parse an uploaded PDF or Word document into Markdown.

    The parser is chosen from the lower-cased file extension: ``.pdf`` goes to
    ``parse_pdf_mineru``; ``.docx`` and ``.doc`` go to ``parse_docx``. The
    upload is spooled to a temporary file that is always removed afterwards.

    Args:
        file: The uploaded document. A missing filename is treated as
            ``doc.pdf`` for extension detection.

    Returns:
        A ``ParseResponse`` with the extracted content, a count of at least 1
        and the label of the selected parser.

    Raises:
        HTTPException: 415 when the extension is not ``.pdf``, ``.docx`` or
            ``.doc``.
    """
    suffix = Path(file.filename or "doc.pdf").suffix.lower()

    # Reject unsupported types before buffering the upload in memory or on
    # disk; the status code and message are the same as before.
    if suffix not in (".pdf", ".docx", ".doc"):
        raise HTTPException(status_code=415, detail=f"不支持的文件类型:{suffix}")

    content = await file.read()

    # delete=False because the parsers reopen the file by path; the finally
    # clause below is responsible for removing it, including when the write
    # itself fails.
    tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    tmp_path = tmp.name
    try:
        with tmp:
            tmp.write(content)

        # NOTE(review): the parsers are synchronous and run directly on the
        # event loop here; consider offloading them if concurrency matters.
        if suffix == ".pdf":
            markdown = parse_pdf_mineru(tmp_path)
            # NOTE(review): this label is reported even when parse_pdf_mineru
            # internally fell back to PyMuPDF.
            parser = "mineru"
            # Counts the per-page headings that the PyMuPDF fallback emits.
            page_count = markdown.count("## 第")
        else:
            # NOTE(review): ".doc" is also routed to python-docx; confirm that
            # legacy binary Word files are actually readable this way.
            markdown = parse_docx(tmp_path)
            parser = "python-docx"
            # Counts blank-line separators, i.e. an approximation, not pages.
            page_count = markdown.count("\n\n")

        return ParseResponse(
            filename=file.filename or "unknown",
            markdown=markdown,
            page_count=max(page_count, 1),
            parser=parser,
        )
    finally:
        os.unlink(tmp_path)
|
||
|
||
|
||
@app.post("/parse-document", response_model=ParseResponse)
async def parse_document(file: UploadFile = File(...)) -> ParseResponse:
    """Alias route: delegate the upload to ``mineru_parse`` unchanged."""
    response = await mineru_parse(file)
    return response
|
||
|
||
|
||
@app.get("/health")
def health():
    """Health probe: report an ``ok`` status and the configured device."""
    payload = {"status": "ok", "device": DEVICE}
    return payload
|