""" business_layer.py
This module contains the business logic for document processing."""
import os
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
import traceback
import datetime
from collections import Counter
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.core.credentials import AzureKeyCredential
from azure.core.pipeline.policies import RetryPolicy
from app_config import ApplicationConfig, ServiceFactory
from chunk_service import chunk_di_doc
from entity_models import Document, ChunkingResult, DiResult
from database import DatabaseInterface, IndexObject, IndexObjectStatus, LegacyDatabaseAdapter
from di_extractor import di_extract
from blob_service import blob_exists, blob_upload_content, blob_upload_object, downloadToLocalFolder, load_content
from utils import replace_urls_in_content, write_content, write_document, asdict_with_dynamic
from azure_index_service import upload_merge_index, delete_documents_by_field, query_by_field
from vllm_extractor import process_document_figures
class SingletonFormRecognizerClient:
instance = None
def __new__(cls, *args, **kwargs):
if not cls.instance:
extract_method = os.environ.get("extract_method", "default")
if extract_method == "vision-llm":
cls.instance = object() # dummy object
else:
url = os.getenv("form_rec_resource")
key = os.getenv("form_rec_key")
if url and key:
print("SingletonFormRecognizerClient: Creating instance of Form recognizer per process")
retry = RetryPolicy(total_retries=5,connect_retries=3,read_retries=3,backoff_factor=0.8,retry_backoff_max=60)
cls.instance = DocumentIntelligenceClient(endpoint=url, credential=AzureKeyCredential(key), retry_policy=retry, connection_timeout=1200,read_timeout=1200)
else:
print("SingletonFormRecognizerClient: Skipping since credentials not provided. Assuming NO form recognizer extensions(like .pdf) in directory")
cls.instance = object() # dummy object
return cls.instance
    def __getstate__(self) -> tuple[Any, Any]:
        # NOTE: the endpoint/key are not stored on the instance, so re-read them from the
        # environment (the same variables used in __new__) when pickling.
        return os.getenv("form_rec_resource"), os.getenv("form_rec_key")
    def __setstate__(self, state):
        url, key = state
        retry = RetryPolicy(total_retries=5, connect_retries=3, read_retries=3, backoff_factor=0.8, retry_backoff_max=60)
        self.instance = DocumentIntelligenceClient(endpoint=url, credential=AzureKeyCredential(key), retry_policy=retry, connection_timeout=1200, read_timeout=1200)
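# Usage note (descriptive, based on the class above): because __new__ returns cls.instance
# directly, calling SingletonFormRecognizerClient() yields the shared DocumentIntelligenceClient
# (or a dummy object when the "vision-llm" extract method is configured or credentials are missing),
# not an instance of the wrapper class itself. Callers treat the return value as a DI client.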
@dataclass
class ProcessingContext:
"""Processing Context"""
object_key: str
data_config: Dict[str, Any]
metadata: Dict[str, Any]
retry_count: int = 0
error_message: Optional[str] = None
current_tmp_directory: str = ""
datasource_name: str = ""
config: ApplicationConfig | None = None
@dataclass
class ProcessingResult:
"""Processing Result"""
status: IndexObjectStatus
object_key: str
message: str
processing_time: float
chunks_count: int = 0
error: Optional[Exception] = None
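# Usage note (descriptive): on success the orchestrator fills chunks_count and processing_time;
# on failure, message carries the formatted traceback and error carries the original exception,
# while status mirrors IndexObjectStatus.SUCCESS / IndexObjectStatus.FAILED.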
# Only the DocumentRepository interface is kept; the other services use their concrete implementations directly
class DocumentRepository(ABC):
"""Document Repository Interface"""
@abstractmethod
def get_index_object(self, object_key: str,datasource_name:str) -> Optional[IndexObject]:
"""Get index object"""
pass
@abstractmethod
def save_index_object(self, index_object: IndexObject) -> None:
"""Save index object"""
pass
@abstractmethod
def update_processing_status(self, object_key: str,datasource_name:str, status: IndexObjectStatus, message: str = None) -> None:
"""Update processing status"""
pass
# Application service layer
class DocumentProcessingOrchestrator:
"""Document Processing Orchestrator (Application Service Layer)"""
def __init__(self,
extraction_service: 'DocumentIntelligenceExtractionService',
chunking_service: 'DefaultDocumentChunkingService',
indexing_service: 'AzureSearchIndexingService',
metadata_service: 'BlobMetadataService',
repository: DocumentRepository):
self.extraction_service = extraction_service
self.chunking_service = chunking_service
self.indexing_service = indexing_service
self.metadata_service = metadata_service
self.repository = repository
def process_document(self, context: ProcessingContext) -> ProcessingResult:
"""Main process for handling a single document"""
start_time = datetime.datetime.now()
# 1. Get or create index object
index_object = self._get_or_create_index_object(context)
# if not index_object:
# raise ValueError(f"Failed to create or retrieve index object for {context.object_key}")
try:
            # 2. Check retry count
            # If the object's document or metadata modification time has changed since the last failure,
            # reset the retry count and continue with the normal flow. The comparison is against the
            # document and metadata modification times recorded at the last failure.
if index_object.last_fail_doc_modifed_time != context.metadata.get("doc_modified_time") or index_object.last_fail_metadata_modifed_time != context.metadata.get("metadata_modified_time"):
index_object.try_count = 0
if index_object.status in ["processing", "failed"]:
# Check if the maximum retry count has been reached
if index_object.try_count >= 3:
return ProcessingResult(status=IndexObjectStatus.FAILED, object_key=context.object_key, message=f"Object has been retried {index_object.try_count} times, skipping processing", processing_time=0)
# Increase the retry count and save immediately
index_object.try_count += 1
# Immediately save the retry count update
self.repository.save_index_object(index_object)
# 3. Update status to processing
self.repository.update_processing_status(context.object_key,context.datasource_name, IndexObjectStatus.PROCESSING)
# 4. Check if processing is needed (metadata and document modification times)
meta_update_flag = self._should_process_metadata(index_object, context)
doc_update_flag = self._should_process_document(index_object, context)
chunks_count = 0
# 5. Process metadata index (if update is needed)
if meta_update_flag:
self._process_metadata_indexes(context)
# 6. Process document and chunk indexes (Important: Only process when meta_update_flag OR doc_update_flag=True)
if meta_update_flag or doc_update_flag:
chunks_count = self._process_document_and_chunks(context, doc_update_flag)
# 7. Update the modification time of the index object
if meta_update_flag:
index_object.metadata_modifed_time = context.metadata.get("metadata_modified_time")
if doc_update_flag:
index_object.doc_modifed_time = context.metadata.get("doc_modified_time")
index_object.status = IndexObjectStatus.SUCCESS.value
if index_object.metadata_modifed_time is None:
index_object.metadata_modifed_time = context.metadata.get("metadata_modified_time")
self.repository.save_index_object(index_object)
processing_time = (datetime.datetime.now() - start_time).total_seconds()
return ProcessingResult(status=IndexObjectStatus.SUCCESS, object_key=context.object_key, message=f"Successfully processed {chunks_count} chunks", processing_time=processing_time, chunks_count=chunks_count)
except Exception as e:
error_message:str = traceback.format_exc()
index_object.status = IndexObjectStatus.FAILED.value
index_object.last_fail_doc_modifed_time = context.metadata.get("doc_modified_time")
index_object.last_fail_metadata_modifed_time = context.metadata.get("metadata_modified_time")
self.repository.save_index_object(index_object)
processing_time = (datetime.datetime.now() - start_time).total_seconds()
return ProcessingResult(status=IndexObjectStatus.FAILED, object_key=context.object_key, message=f"Processing failed: {error_message}", processing_time=processing_time, error=e )
def _get_or_create_index_object(self, context: ProcessingContext) -> IndexObject:
"""Get or create index object"""
index_object = self.repository.get_index_object(context.object_key,context.datasource_name)
if not index_object:
index_object = IndexObject(
object_key=context.object_key,
type="document",
status=IndexObjectStatus.PROCESSING.value,
datasource_name=context.datasource_name
)
self.repository.save_index_object(index_object)
return index_object
def _should_process(self, index_object: IndexObject, context: ProcessingContext) -> bool:
"""Determine whether processing is needed (keep the original logic for backward compatibility)"""
return self._should_process_metadata(index_object, context) or self._should_process_document(index_object, context)
def _should_process_metadata(self, index_object: IndexObject, context: ProcessingContext) -> bool:
"""Determine whether metadata processing is needed"""
if 'metadata_modified_time' in context.metadata:
metadata_modified_time = context.metadata['metadata_modified_time']
if index_object.metadata_modifed_time is None:
return True
if metadata_modified_time is not None and metadata_modified_time > index_object.metadata_modifed_time:
return True
return False
def _should_process_document(self, index_object: IndexObject, context: ProcessingContext) -> bool:
"""Determine whether document processing is needed"""
if 'doc_modified_time' in context.metadata:
doc_modified_time = context.metadata['doc_modified_time']
if index_object.doc_modifed_time is None:
return True
if doc_modified_time is not None and doc_modified_time > index_object.doc_modifed_time:
return True
return False
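    # Decision summary (descriptive): an object is (re)processed when it has never been indexed
    # (the stored *_modifed_time is None) or when the corresponding modification time coming in
    # through context.metadata is strictly newer than the stored one. For example, a document whose
    # doc_modified_time advanced since the last run triggers re-extraction and re-chunking.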
def _process_metadata_indexes(self, context: ProcessingContext) -> None:
"""Process metadata index"""
# Push metadata index - only process index with data_type of ["metadata"]
meta_index_schemas = [schema for schema in context.data_config["index_schemas"] if Counter(schema["data_type"]) == Counter(["metadata"])]
if not any(meta_index_schemas):
return
# Get metadata - from metadata service
doc_meta = self.metadata_service.get_metadata(context.object_key)
        # Metadata must not be empty for metadata indexes; fail fast if it is missing
if not doc_meta:
raise ValueError(f"Metadata for object {context.object_key} not found")
for meta_index_schema in meta_index_schemas:
self.indexing_service.index_metadata(doc_meta, meta_index_schema, context)
def _process_document_and_chunks(self, context: ProcessingContext, doc_update_flag: bool) -> int:
"""Process document and chunk indexes, return the number of processed chunks"""
doc_dict = {}
chunk_dict = []
chunks_count = 0
        # Fetch document metadata from the metadata service
        doc_meta = self.metadata_service.get_metadata(context.object_key)
        language_code = doc_meta.get("language_code", "zh-Hans")  # Default to "zh-Hans" if not specified
        # TODO: in the future, raise an error or skip processing when no doc_meta configuration file exists
        if not doc_meta:
            doc_meta = {}
# If the document needs to be updated, re-extract and chunk
if doc_update_flag:
# Extract document
document = self.extraction_service.extract_document(context, language_code)
document.title = os.path.splitext(context.object_key)[0]
# Chunk processing
chunking_result = self.chunking_service.chunk_document(document, context)
chunks_count = len(chunking_result.chunks)
# Convert to dictionary format
doc_dict = self._convert_document_to_dict(document)
chunk_dict = [self._convert_document_to_dict(chunk) for chunk in chunking_result.chunks]
        # Process document indexes - data_type is ["metadata","document"] or ["document"]
document_index_schemas = [schema for schema in context.data_config["index_schemas"] if Counter(schema["data_type"]) == Counter(["metadata","document"]) or Counter(schema["data_type"]) == Counter(["document"])]
for document_index_schema in document_index_schemas:
if not doc_update_flag:
# Get existing document data from Azure Search Index
existing_docs = self.indexing_service.get_existing_document_data(
context.object_key, document_index_schema["index_name"],
document_index_schema["update_by_field"]
)
if existing_docs:
doc_dict = existing_docs
doc_dict.update({k: doc_meta[k] for k in document_index_schema["fields"] if k in doc_meta})
# Upload document index
self.indexing_service.index_document_with_schema(doc_dict, document_index_schema, context)
        # Process chunk indexes - data_type is ["metadata","document","chunk"] or ["chunk"]
chunk_index_schemas = [schema for schema in context.data_config["index_schemas"] if Counter(schema["data_type"]) == Counter(["metadata","document","chunk"]) or Counter(schema["data_type"]) == Counter(["chunk"])]
for index_schema in chunk_index_schemas:
current_chunk_dict = chunk_dict # Use existing chunk_dict
current_chunks_count = chunks_count # Use existing chunks_count
if not doc_update_flag:
# Get existing chunk data from Azure Search Index
current_chunk_dict = self.indexing_service.get_existing_chunk_data(context.object_key, index_schema["index_name"], index_schema["update_by_field"])
current_chunks_count = len(current_chunk_dict) if current_chunk_dict else 0
# Update the total chunks_count (for return value)
chunks_count = current_chunks_count
for chunk in current_chunk_dict if current_chunk_dict else []:
chunk.update({k: doc_meta[k] for k in index_schema["fields"] if k in doc_meta})
# Delete old chunk data
self.indexing_service.delete_chunks_by_field(index_schema["index_name"], index_schema["update_by_field"], doc_dict.get(index_schema["update_by_field"], context.object_key))
# Upload new chunk data
if current_chunk_dict:
self.indexing_service.index_chunks_with_schema(current_chunk_dict, index_schema, context)
return chunks_count
def _convert_document_to_dict(self, document:Document) -> Dict[str, Any]:
"""Convert Document object to dictionary"""
try:
# Use the original asdict_with_dynamic function to maintain compatibility
return asdict_with_dynamic(document)
except Exception:
# If asdict_with_dynamic fails, use the fallback method
if hasattr(document, '__dict__'):
return document.__dict__.copy()
elif hasattr(document, 'to_dict'):
return document.to_dict()
else:
# If all fails, return empty dictionary
return {}
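# Schema routing note (descriptive): the orchestrator matches each entry of data_config["index_schemas"]
# by comparing the multiset of its "data_type" values, so ordering is irrelevant:
#   ["metadata"]                                   -> metadata-only indexes (_process_metadata_indexes)
#   ["document"] or ["metadata", "document"]       -> document-level indexes
#   ["chunk"] or ["metadata", "document", "chunk"] -> chunk-level indexes
# e.g. Counter(["chunk", "metadata", "document"]) == Counter(["metadata", "document", "chunk"]) is True.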
# Infrastructure layer implementation
class SqlAlchemyDocumentRepository(DocumentRepository):
"""SQLAlchemy-based document repository implementation"""
def __init__(self, database_interface: DatabaseInterface):
self.database_interface = database_interface
def get_index_object(self, object_key: str,datasource_name:str) -> Optional[IndexObject]:
"""Get index object"""
return self.database_interface.get_index_object(object_key,datasource_name)
def save_index_object(self, index_object: IndexObject) -> None:
"""Save index object"""
self.database_interface.save_index_object(index_object)
def update_processing_status(self, object_key: str,datasource_name:str, status: IndexObjectStatus,
message: str = None) -> None:
"""Update processing status"""
# Convert business layer status to database status
self.database_interface.update_processing_status(object_key,datasource_name, status, message)
# Concrete implementation class
class DocumentIntelligenceExtractionService:
"""Document extraction service based on Document Intelligence"""
def __init__(self, form_recognizer_client: DocumentIntelligenceClient, vllm_endpoint, vllm_key, tmp_directory, data_directory=None,di_sas_url=None, figure_sas_url=None):
self.form_recognizer_client: DocumentIntelligenceClient = form_recognizer_client
self.vllm_endpoint: str = vllm_endpoint
self.vllm_key: str = vllm_key
self.tmp_directory: str = tmp_directory
self.data_directory: str = data_directory or ""
        self.di_sas_url: Optional[str] = di_sas_url
        self.figure_sas_url: Optional[str] = figure_sas_url
def extract_document(self, context: ProcessingContext,language:str) -> Document:
"""Extract document content using Document Intelligence"""
# Get data_dir config, use instance variable if not present
data_dir = context.data_config.get("data_dir", self.data_directory)
# Download document file - use correct parameter order
local_file_paths = downloadToLocalFolder(blob_url=context.data_config["data_path"], data_dir=data_dir, local_folder=self.tmp_directory, name_starts_with=context.object_key)
if not local_file_paths or len(local_file_paths) == 0:
raise ValueError(f"File {context.object_key} not found in blob storage")
di_blob_file_name = context.object_key + str(context.metadata["doc_modified_time"]) + ".json"
        di_result: Optional[DiResult] = None
        # Try to download a cached DI result from the blob; if it is available, skip di_extract
if self.di_sas_url and blob_exists(self.di_sas_url, di_blob_file_name):
content:str = load_content(blob_sas_url=self.di_sas_url, file_name=di_blob_file_name)
if content:
di_result = DiResult.from_json(content) # type: ignore
if not di_result:
di_result = di_extract(source_file_path=local_file_paths.pop(), di_client=self.form_recognizer_client, directory_path=self.tmp_directory, figure_sas_url=self.figure_sas_url, language=language)
try:
process_document_figures(di_result=di_result,config=context.config)
        except Exception as e:
            print(f"Error processing document figures: {e}")
        finally:
            # Write the enriched DI result back to the blob so the same document is not reprocessed next time
            if self.di_sas_url:
                blob_upload_object(blob_sas_url=self.di_sas_url, file_name=di_blob_file_name, obj=di_result)
under_image_content = replace_urls_in_content(content=di_result.di_content, replacements=di_result.figures)
# Save extracted content to local file (same as original logic)
write_content(content=under_image_content, directory_path=self.tmp_directory, file_name=context.object_key)
        if self.di_sas_url:
            blob_upload_content(blob_sas_url=self.di_sas_url, file_name=di_blob_file_name + ".md", content=under_image_content)
return Document(content=under_image_content, filepath=context.object_key)
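# Caching note (descriptive): when di_sas_url is configured, extract_document caches the Document
# Intelligence output in the DI blob container under "<object_key><doc_modified_time>.json", so a
# document is only sent to di_extract again after its doc_modified_time changes; the extracted content
# with figure references substituted is uploaded alongside it as "<object_key><doc_modified_time>.json.md".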
class DefaultDocumentChunkingService:
"""Default document chunking service"""
def __init__(self, tmp_directory: str = None):
self.tmp_directory = tmp_directory
def chunk_document(self, document: Document, context: ProcessingContext) -> ChunkingResult:
"""Chunk document"""
# Call the original chunking method
chunking_result = chunk_di_doc(document, data_config=context.data_config, tmp_path=context.current_tmp_directory)
# If tmp_directory is configured, save chunk result to local file
if self.tmp_directory:
write_document( chunking_result.chunks, file_path=context.object_key, directory_path=self.tmp_directory, rel_file_path=context.object_key )
return chunking_result
class AzureSearchIndexingService:
"""Azure Search-based indexing service"""
def __init__(self):
pass
def index_document(self, document: Document, context: ProcessingContext) -> bool:
"""Index document"""
# Get document index schema
document_schemas = [schema for schema in context.data_config["index_schemas"]
if set(schema["data_type"]) == {"metadata", "document"}]
doc_dict = asdict_with_dynamic(document)
doc_dict.update(context.metadata)
for schema in document_schemas:
if not upload_merge_index(index_config=schema, docs=[doc_dict], merge_fields=context.data_config["merge_fields"], current_tmp_directory=context.current_tmp_directory):
return False
return True
def index_chunks(self, chunks: List[Document], context: ProcessingContext) -> bool:
"""Index document chunks"""
# Get chunk index schema
chunk_schemas = [schema for schema in context.data_config["index_schemas"]
if set(schema["data_type"]) == {"metadata", "document", "chunk"}]
chunk_dict = [asdict_with_dynamic(chunk) for chunk in chunks]
for schema in chunk_schemas:
# First delete old chunk data
delete_documents_by_field(schema["index_name"], schema["update_by_field"], context.object_key)
# Add metadata to each chunk
for chunk in chunk_dict:
chunk.update(context.metadata)
# Upload new chunk data
if not upload_merge_index(
index_config=schema,
docs=chunk_dict,
merge_fields=context.data_config["merge_fields"],
current_tmp_directory=context.current_tmp_directory
):
return False
return True
def get_existing_document_data(self, object_key: str, index_name: str, field_name: str) -> Optional[dict[str,Any]]:
"""Get existing document data from Azure Search Index"""
results = query_by_field(
index_name=index_name,
field_name=field_name,
value=object_key
)
return results[0] if results else None
def get_existing_chunk_data(self, object_key: str, index_name: str, field_name: str) -> List[dict[str,Any]]:
"""Get existing chunk data from Azure Search Index"""
results = query_by_field( index_name=index_name, field_name=field_name, value=object_key )
return results if results else []
def index_metadata(self, metadata: dict[str,Any], schema: Any, context: ProcessingContext) -> bool:
"""Index metadata"""
return upload_merge_index(index_config=schema, docs=[metadata], merge_fields=context.data_config["merge_fields"], current_tmp_directory=context.current_tmp_directory )
def index_document_with_schema(self, doc_dict: Dict[str,Any], schema: Any, context: ProcessingContext) -> bool:
"""Index document using specified schema"""
return upload_merge_index(
index_config=schema,
docs=[doc_dict],
merge_fields=context.data_config["merge_fields"],
current_tmp_directory=context.current_tmp_directory
)
def index_chunks_with_schema(self, chunk_dict: List[Dict[str,Any]], schema: Any, context: ProcessingContext) -> bool:
"""Index chunks using specified schema"""
return upload_merge_index(
index_config=schema,
docs=chunk_dict,
merge_fields=context.data_config["merge_fields"],
current_tmp_directory=context.current_tmp_directory
)
def delete_chunks_by_field(self, index_name: str, field_name: str, field_value: str) -> bool:
"""Delete chunks by field"""
try:
delete_documents_by_field(index_name, field_name, field_value)
return True
except Exception:
return False
class BlobMetadataService:
"""Metadata service based on Blob storage"""
def __init__(self, datasource: Dict[str, Any]):
self.datasource = datasource
def get_metadata(self, object_key: str) -> Dict[str, Any]:
"""Get metadata"""
if "metadata" not in self.datasource:
return {}
return self.datasource["metadata"].get(object_key, {})
# Factory class wiring the concrete service implementations together
class DocumentProcessingFactory:
"""Document processing factory class"""
    def __init__(self, service_factory: ServiceFactory, tmp_directory: str, datasource: Optional[Dict[str, Any]] = None, config: Optional[ApplicationConfig] = None):
        """
        Initialize factory
        Args:
            service_factory: Service factory (used to get the database engine)
            tmp_directory: Shared temporary directory for downloads and intermediate files
            datasource: Data source configuration
            config: Application configuration
        """
self.service_factory: ServiceFactory = service_factory
self.datasource = datasource or {}
self.shared_tmp_directory = tmp_directory
self.config:ApplicationConfig = config
def create_orchestrator(self) -> DocumentProcessingOrchestrator:
"""Create document processing orchestrator"""
extraction_service = self._create_extraction_service()
chunking_service = self._create_chunking_service()
indexing_service = self._create_indexing_service()
metadata_service = self._create_metadata_service()
repository = self._create_repository()
return DocumentProcessingOrchestrator(
extraction_service=extraction_service,
chunking_service=chunking_service,
indexing_service=indexing_service,
metadata_service=metadata_service,
repository=repository
)
def _create_extraction_service(self) -> 'DocumentIntelligenceExtractionService':
"""Create document extraction service"""
# Use the factory shared temporary directory (same as original app.py logic)
tmp_directory = self.shared_tmp_directory
# Get configuration from environment variables (same as original worker.py logic)
vllm_endpoint = os.environ.get("captioning_model_endpoint", "")
vllm_key = os.environ.get("captioning_model_key", "")
form_recognizer_client = SingletonFormRecognizerClient()
return DocumentIntelligenceExtractionService(
form_recognizer_client=form_recognizer_client,
vllm_endpoint=vllm_endpoint,
vllm_key=vllm_key,
tmp_directory=tmp_directory,
data_directory="", # Will be dynamically fetched from data_config
di_sas_url=self.config.azure_services.di_blob_account_url,
figure_sas_url=self.config.azure_services.figure_blob_account_url
)
def _create_chunking_service(self) -> 'DefaultDocumentChunkingService':
"""Create document chunking service"""
# Use the factory shared temporary directory
tmp_directory = self.shared_tmp_directory
return DefaultDocumentChunkingService(tmp_directory=tmp_directory)
def _create_indexing_service(self) -> 'AzureSearchIndexingService':
"""Create indexing service"""
return AzureSearchIndexingService()
def _create_metadata_service(self) -> 'BlobMetadataService':
"""Create metadata service"""
return BlobMetadataService(self.datasource)
def _create_repository(self) -> DocumentRepository:
"""Create document repository"""
database_interface = LegacyDatabaseAdapter(self.service_factory.get_database_engine())
return SqlAlchemyDocumentRepository(database_interface)
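# Minimal end-to-end sketch (illustrative only, never called by the pipeline): it shows how the factory,
# orchestrator and ProcessingContext defined above fit together. All literal values below (paths, object
# key, datasource name, schema list) are hypothetical placeholders; a real data_config comes from the
# application's datasource configuration rather than being built inline like this.
def _example_process_single_document(service_factory: ServiceFactory, config: ApplicationConfig) -> ProcessingResult:
    factory = DocumentProcessingFactory(
        service_factory=service_factory,
        tmp_directory="/tmp/indexer",  # hypothetical shared temporary directory
        datasource={"metadata": {}},   # hypothetical datasource with no per-object metadata
        config=config,
    )
    orchestrator = factory.create_orchestrator()
    context = ProcessingContext(
        object_key="manuals/owner-guide.pdf",  # hypothetical blob object key
        data_config={
            "data_path": "<blob-sas-url>",  # hypothetical source container SAS URL
            "index_schemas": [],            # real configs list metadata/document/chunk schemas here
            "merge_fields": [],
        },
        metadata={
            "doc_modified_time": "2025-09-26T00:00:00Z",
            "metadata_modified_time": "2025-09-26T00:00:00Z",
        },
        current_tmp_directory="/tmp/indexer",
        datasource_name="example-datasource",
        config=config,
    )
    return orchestrator.process_document(context)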