Files
AIRegulation-DocAnalysis/backend/app/services/document_processor.py
ash66 30c7bda389 Refactor document handling and update Milvus collection settings
- Removed multiple failed document entries from `documents.json`.
- Added a new document entry with updated metadata and changed the index name to `regulations_dense_1024_v2`.
- Updated architecture documentation to reflect changes in the Milvus collection name.
- Adjusted requirements by removing the sqlalchemy dependency.
- Modified test cases to align with new document structure and naming conventions.
- Introduced a new test file for Milvus vector index runtime recovery and error handling.
- Updated assertions in various test files to ensure compatibility with the new schema.
2026-05-26 20:21:31 +08:00

87 lines
2.8 KiB
Python

"""Provide service-layer logic for document processor."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from app.shared.bootstrap import get_document_command_service, get_retrieval_service
# Keep each service's responsibility explicit: this module only adapts inputs and
# outputs, while the bootstrap-provided command and retrieval services do the work.
@dataclass
class ProcessingResult:
    """Outcome of one ``DocumentProcessor.process`` call.

    Only ``doc_id``, ``doc_name`` and ``success`` are required; every other
    field falls back to an empty/zero default, so a minimal result can be
    built from those three values alone.
    """

    # Identifier of the processed document, as reported by the service.
    doc_id: str
    # Display name of the processed document.
    doc_name: str
    # False when the service reported the status "failed", True otherwise.
    success: bool
    # Chunk count reported by the service.
    num_chunks: int = 0
    # Message reported by the service.
    message: str = ""
    # NOTE(review): DocumentProcessor.process in this module never fills this
    # field, so it keeps its default there; presumably retained for other
    # callers -- confirm before removing.
    markdown_text: str = ""
    # Summary text reported by the service (empty by default).
    summary: str = ""
    # Summary latency reported by the service; presumably milliseconds, going
    # by the field name -- TODO confirm.
    summary_latency_ms: int = 0
class DocumentProcessor:
"""Represent the Document Processor type."""
def __init__(self, *args, generate_summary: bool = False, **kwargs):
"""Initialize the Document Processor instance."""
self.generate_summary = generate_summary
def process(
self,
file_path: str,
doc_id: Optional[str] = None,
doc_name: Optional[str] = None,
regulation_type: str = "",
version: str = "",
) -> ProcessingResult:
"""Handle process for the Document Processor instance."""
path = Path(file_path)
content = path.read_bytes()
result = get_document_command_service().upload_and_process(
doc_id=doc_id,
file_name=path.name,
content=content,
content_type="application/octet-stream",
doc_name=doc_name or path.name,
regulation_type=regulation_type,
version=version,
generate_summary=self.generate_summary,
)
return ProcessingResult(
doc_id=result.doc_id,
doc_name=result.doc_name,
success=result.status != "failed",
num_chunks=result.num_chunks,
message=result.message,
summary=result.summary,
summary_latency_ms=result.summary_latency_ms,
)
def search(self, query: str, top_k: int = 10, filters: str | None = None) -> list[dict]:
"""Handle search for the Document Processor instance."""
results = get_retrieval_service().retrieve(query=query, top_k=top_k, filters=filters)
return [
{
"id": item.chunk_id,
"content": item.text,
"score": item.score,
"metadata": {
"doc_id": item.doc_id,
"doc_name": item.doc_title,
"chunk_id": item.chunk_id,
"section_title": item.section_title,
"page_number": item.page_start,
**item.metadata,
},
}
for item in results
]
def close(self):
"""Release the resources held by this component."""
return None