""" business_layer.py This module contains the business logic for document processing.""" import os from abc import ABC, abstractmethod from typing import List, Optional, Dict, Any from dataclasses import dataclass import traceback import datetime from collections import Counter from azure.ai.documentintelligence import DocumentIntelligenceClient from azure.core.credentials import AzureKeyCredential from azure.core.pipeline.policies import RetryPolicy from app_config import ApplicationConfig, ServiceFactory from chunk_service import chunk_di_doc from entity_models import Document, ChunkingResult,DiResult from database import DatabaseInterface, IndexObject, IndexObjectStatus,LegacyDatabaseAdapter from di_extractor import di_extract from blob_service import blob_exists, blob_upload_content, blob_upload_object, downloadToLocalFolder, load_content from utils import replace_urls_in_content, write_content,write_document,asdict_with_dynamic from azure_index_service import upload_merge_index, delete_documents_by_field,query_by_field from vllm_extractor import process_document_figures class SingletonFormRecognizerClient: instance = None def __new__(cls, *args, **kwargs): if not cls.instance: extract_method = os.environ.get("extract_method", "default") if extract_method == "vision-llm": cls.instance = object() # dummy object else: url = os.getenv("form_rec_resource") key = os.getenv("form_rec_key") if url and key: print("SingletonFormRecognizerClient: Creating instance of Form recognizer per process") retry = RetryPolicy(total_retries=5,connect_retries=3,read_retries=3,backoff_factor=0.8,retry_backoff_max=60) cls.instance = DocumentIntelligenceClient(endpoint=url, credential=AzureKeyCredential(key), retry_policy=retry, connection_timeout=1200,read_timeout=1200) else: print("SingletonFormRecognizerClient: Skipping since credentials not provided. 
                    cls.instance = object()  # dummy object
        return cls.instance

    def __getstate__(self) -> tuple[Any, Any]:
        return self.url, self.key

    def __setstate__(self, state):
        url, key = state
        retry = RetryPolicy(total_retries=5, connect_retries=3, read_retries=3,
                            backoff_factor=0.8, retry_backoff_max=60)
        self.instance = DocumentIntelligenceClient(endpoint=url, credential=AzureKeyCredential(key),
                                                   retry_policy=retry,
                                                   connection_timeout=1200, read_timeout=1200)


@dataclass
class ProcessingContext:
    """Processing Context"""
    object_key: str
    data_config: Dict[str, Any]
    metadata: Dict[str, Any]
    retry_count: int = 0
    error_message: Optional[str] = None
    current_tmp_directory: str = ""
    datasource_name: str = ""
    config: ApplicationConfig | None = None


@dataclass
class ProcessingResult:
    """Processing Result"""
    status: IndexObjectStatus
    object_key: str
    message: str
    processing_time: float
    chunks_count: int = 0
    error: Optional[Exception] = None


# Keep only the DocumentRepository interface; other services directly use the specific implementation
class DocumentRepository(ABC):
    """Document Repository Interface"""

    @abstractmethod
    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Get index object"""
        pass

    @abstractmethod
    def save_index_object(self, index_object: IndexObject) -> None:
        """Save index object"""
        pass

    @abstractmethod
    def update_processing_status(self, object_key: str, datasource_name: str,
                                 status: IndexObjectStatus, message: Optional[str] = None) -> None:
        """Update processing status"""
        pass


# Application service layer
class DocumentProcessingOrchestrator:
    """Document Processing Orchestrator (Application Service Layer)"""

    def __init__(self, extraction_service: 'DocumentIntelligenceExtractionService',
                 chunking_service: 'DefaultDocumentChunkingService',
                 indexing_service: 'AzureSearchIndexingService',
                 metadata_service: 'BlobMetadataService',
                 repository: DocumentRepository):
        self.extraction_service = extraction_service
        self.chunking_service = chunking_service
        self.indexing_service = indexing_service
        self.metadata_service = metadata_service
        self.repository = repository

    def process_document(self, context: ProcessingContext) -> ProcessingResult:
        """Main process for handling a single document"""
        start_time = datetime.datetime.now()
        # 1. Get or create index object
        index_object = self._get_or_create_index_object(context)
        # if not index_object:
        #     raise ValueError(f"Failed to create or retrieve index object for {context.object_key}")
        try:
            # 2. Check the retry count.
            # If the object's timestamps have changed since the last failure, reset the retry count and
            # continue with the normal flow. The comparison is against the last failed document
            # modification time and the last failed metadata modification time.
            if (index_object.last_fail_doc_modifed_time != context.metadata.get("doc_modified_time")
                    or index_object.last_fail_metadata_modifed_time != context.metadata.get("metadata_modified_time")):
                index_object.try_count = 0
            if index_object.status in ["processing", "failed"]:
                # Check if the maximum retry count has been reached
                if index_object.try_count >= 3:
                    return ProcessingResult(status=IndexObjectStatus.FAILED,
                                            object_key=context.object_key,
                                            message=f"Object has been retried {index_object.try_count} times, skipping processing",
                                            processing_time=0)
                # Increase the retry count and save immediately
                index_object.try_count += 1
                # Immediately save the retry count update
                self.repository.save_index_object(index_object)
            # 3. Update status to processing
            self.repository.update_processing_status(context.object_key, context.datasource_name,
                                                     IndexObjectStatus.PROCESSING)
            # 4. Check if processing is needed (metadata and document modification times)
            meta_update_flag = self._should_process_metadata(index_object, context)
            doc_update_flag = self._should_process_document(index_object, context)
            chunks_count = 0
            # 5. Process metadata index (if update is needed)
            if meta_update_flag:
                self._process_metadata_indexes(context)
            # 6. Process document and chunk indexes (Important: only when meta_update_flag OR doc_update_flag is True)
            if meta_update_flag or doc_update_flag:
                chunks_count = self._process_document_and_chunks(context, doc_update_flag)
            # 7. Update the modification time of the index object
            if meta_update_flag:
                index_object.metadata_modifed_time = context.metadata.get("metadata_modified_time")
            if doc_update_flag:
                index_object.doc_modifed_time = context.metadata.get("doc_modified_time")
            index_object.status = IndexObjectStatus.SUCCESS.value
            if index_object.metadata_modifed_time is None:
                index_object.metadata_modifed_time = context.metadata.get("metadata_modified_time")
            self.repository.save_index_object(index_object)
            processing_time = (datetime.datetime.now() - start_time).total_seconds()
            return ProcessingResult(status=IndexObjectStatus.SUCCESS,
                                    object_key=context.object_key,
                                    message=f"Successfully processed {chunks_count} chunks",
                                    processing_time=processing_time,
                                    chunks_count=chunks_count)
        except Exception as e:
            error_message: str = traceback.format_exc()
            index_object.status = IndexObjectStatus.FAILED.value
            index_object.last_fail_doc_modifed_time = context.metadata.get("doc_modified_time")
            index_object.last_fail_metadata_modifed_time = context.metadata.get("metadata_modified_time")
            self.repository.save_index_object(index_object)
            processing_time = (datetime.datetime.now() - start_time).total_seconds()
            return ProcessingResult(status=IndexObjectStatus.FAILED,
                                    object_key=context.object_key,
                                    message=f"Processing failed: {error_message}",
                                    processing_time=processing_time,
                                    error=e)

    def _get_or_create_index_object(self, context: ProcessingContext) -> IndexObject:
        """Get or create index object"""
        index_object = self.repository.get_index_object(context.object_key, context.datasource_name)
        if not index_object:
            index_object = IndexObject(
                object_key=context.object_key,
                type="document",
                status=IndexObjectStatus.PROCESSING.value,
                datasource_name=context.datasource_name
            )
            self.repository.save_index_object(index_object)
        return index_object

    def _should_process(self, index_object: IndexObject, context: ProcessingContext) -> bool:
        """Determine whether processing is needed (keep the original logic for backward compatibility)"""
        return self._should_process_metadata(index_object, context) or self._should_process_document(index_object, context)

    def _should_process_metadata(self, index_object: IndexObject, context: ProcessingContext) -> bool:
        """Determine whether metadata processing is needed"""
        if 'metadata_modified_time' in context.metadata:
            metadata_modified_time = context.metadata['metadata_modified_time']
            if index_object.metadata_modifed_time is None:
                return True
            if metadata_modified_time is not None and metadata_modified_time > index_object.metadata_modifed_time:
                return True
        return False

    def _should_process_document(self, index_object: IndexObject, context: ProcessingContext) -> bool:
        """Determine whether document processing is needed"""
        if 'doc_modified_time' in context.metadata:
            doc_modified_time = context.metadata['doc_modified_time']
            if index_object.doc_modifed_time is None:
                return True
            if doc_modified_time is not None and doc_modified_time > index_object.doc_modifed_time:
                return True
        return False

    def _process_metadata_indexes(self, context: ProcessingContext) -> None:
        """Process metadata index"""
        # Push metadata index - only process indexes whose data_type is ["metadata"]
        meta_index_schemas = [schema for schema in context.data_config["index_schemas"]
                              if Counter(schema["data_type"]) == Counter(["metadata"])]
        if not any(meta_index_schemas):
            return
        # Get metadata - from the metadata service
        doc_meta = self.metadata_service.get_metadata(context.object_key)
        # Metadata must not be empty for metadata-only indexes
        if not doc_meta:
            raise ValueError(f"Metadata for object {context.object_key} not found")
        for meta_index_schema in meta_index_schemas:
            self.indexing_service.index_metadata(doc_meta, meta_index_schema, context)

    def _process_document_and_chunks(self, context: ProcessingContext, doc_update_flag: bool) -> int:
        """Process document and chunk indexes, return the number of processed chunks"""
        doc_dict = {}
        chunk_dict = []
        chunks_count = 0
        # Update the document dictionary with metadata
        doc_meta = self.metadata_service.get_metadata(context.object_key)
        language_code = doc_meta.get("language_code", "zh-Hans")  # Default to "zh-Hans" if not specified
        # Future: raise an error or skip processing when no doc_meta configuration file exists
        if not doc_meta:
            doc_meta = {}
        # If the document needs to be updated, re-extract and chunk it
        if doc_update_flag:
            # Extract document
            document = self.extraction_service.extract_document(context, language_code)
            document.title = os.path.splitext(context.object_key)[0]
            # Chunk processing
            chunking_result = self.chunking_service.chunk_document(document, context)
            chunks_count = len(chunking_result.chunks)
            # Convert to dictionary format
            doc_dict = self._convert_document_to_dict(document)
            chunk_dict = [self._convert_document_to_dict(chunk) for chunk in chunking_result.chunks]
        # Process document indexes - data_type is ["metadata","document"] or ["document"]
        document_index_schemas = [schema for schema in context.data_config["index_schemas"]
                                  if Counter(schema["data_type"]) == Counter(["metadata", "document"])
                                  or Counter(schema["data_type"]) == Counter(["document"])]
        for document_index_schema in document_index_schemas:
            if not doc_update_flag:
                # Get existing document data from the Azure Search index
                existing_docs = self.indexing_service.get_existing_document_data(
                    context.object_key,
                    document_index_schema["index_name"],
                    document_index_schema["update_by_field"]
                )
                if existing_docs:
                    doc_dict = existing_docs
            doc_dict.update({k: doc_meta[k] for k in document_index_schema["fields"] if k in doc_meta})
            # Upload document index
            self.indexing_service.index_document_with_schema(doc_dict, document_index_schema, context)
        # Process chunk indexes - data_type is ["metadata","document","chunk"] or ["chunk"]
        chunk_index_schemas = [schema for schema in context.data_config["index_schemas"]
                               if Counter(schema["data_type"]) == Counter(["metadata", "document", "chunk"])
                               or Counter(schema["data_type"]) == Counter(["chunk"])]
        for index_schema in chunk_index_schemas:
            current_chunk_dict = chunk_dict  # Use existing chunk_dict
            current_chunks_count = chunks_count  # Use existing chunks_count
            if not doc_update_flag:
                # Get existing chunk data from the Azure Search index
                current_chunk_dict = self.indexing_service.get_existing_chunk_data(context.object_key,
                                                                                   index_schema["index_name"],
                                                                                   index_schema["update_by_field"])
                current_chunks_count = len(current_chunk_dict) if current_chunk_dict else 0
            # Update the total chunks_count (for return value)
            chunks_count = current_chunks_count
            for chunk in (current_chunk_dict if current_chunk_dict else []):
                chunk.update({k: doc_meta[k] for k in index_schema["fields"] if k in doc_meta})
            # Delete old chunk data
            self.indexing_service.delete_chunks_by_field(index_schema["index_name"],
                                                         index_schema["update_by_field"],
                                                         doc_dict.get(index_schema["update_by_field"], context.object_key))
            # Upload new chunk data
            if current_chunk_dict:
                self.indexing_service.index_chunks_with_schema(current_chunk_dict, index_schema, context)
        return chunks_count

    def _convert_document_to_dict(self, document: Document) -> Dict[str, Any]:
        """Convert a Document object to a dictionary"""
        try:
            # Use the original asdict_with_dynamic function to maintain compatibility
            return asdict_with_dynamic(document)
        except Exception:
            # If asdict_with_dynamic fails, use the fallback method
            if hasattr(document, '__dict__'):
                return document.__dict__.copy()
            elif hasattr(document, 'to_dict'):
                return document.to_dict()
            else:
                # If all else fails, return an empty dictionary
                return {}


# Infrastructure layer implementation
class SqlAlchemyDocumentRepository(DocumentRepository):
    """SQLAlchemy-based document repository implementation"""

    def __init__(self, database_interface: DatabaseInterface):
        self.database_interface = database_interface

    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Get index object"""
        return self.database_interface.get_index_object(object_key, datasource_name)

    def save_index_object(self, index_object: IndexObject) -> None:
        """Save index object"""
        self.database_interface.save_index_object(index_object)

    def update_processing_status(self, object_key: str, datasource_name: str,
                                 status: IndexObjectStatus, message: Optional[str] = None) -> None:
        """Update processing status"""
        # Convert business layer status to database status
        self.database_interface.update_processing_status(object_key, datasource_name, status, message)


# Concrete implementation class
class DocumentIntelligenceExtractionService:
    """Document extraction service based on Document Intelligence"""

    def __init__(self, form_recognizer_client: DocumentIntelligenceClient, vllm_endpoint, vllm_key,
                 tmp_directory, data_directory=None, di_sas_url=None, figure_sas_url=None):
        self.form_recognizer_client: DocumentIntelligenceClient = form_recognizer_client
        self.vllm_endpoint: str = vllm_endpoint
        self.vllm_key: str = vllm_key
        self.tmp_directory: str = tmp_directory
        self.data_directory: str = data_directory or ""
        self.di_sas_url: Optional[str] = di_sas_url
        self.figure_sas_url: Optional[str] = figure_sas_url

    def extract_document(self, context: ProcessingContext, language: str) -> Document:
        """Extract document content using Document Intelligence"""
        # Get the data_dir config; use the instance variable if not present
        data_dir = context.data_config.get("data_dir", self.data_directory)
        # Download the document file - use the correct parameter order
        local_file_paths = downloadToLocalFolder(blob_url=context.data_config["data_path"],
                                                 data_dir=data_dir,
                                                 local_folder=self.tmp_directory,
                                                 name_starts_with=context.object_key)
        if not local_file_paths or len(local_file_paths) == 0:
            raise ValueError(f"File {context.object_key} not found in blob storage")
        di_blob_file_name = context.object_key + str(context.metadata["doc_modified_time"]) + ".json"
        di_result: Optional[DiResult] = None
        # Try to download the DI result from the blob; if it can be downloaded, di_extract is skipped
        if self.di_sas_url and blob_exists(self.di_sas_url, di_blob_file_name):
            content: str = load_content(blob_sas_url=self.di_sas_url, file_name=di_blob_file_name)
            if content:
                di_result = DiResult.from_json(content)  # type: ignore
        if not di_result:
            di_result = di_extract(source_file_path=local_file_paths.pop(),
                                   di_client=self.form_recognizer_client,
                                   directory_path=self.tmp_directory,
                                   figure_sas_url=self.figure_sas_url,
                                   language=language)
            try:
                process_document_figures(di_result=di_result, config=context.config)
            except Exception as e:
                print(f"Error processing document figures: {e}")
            finally:
                # Write the enriched result directly to the blob to prevent reprocessing later
                blob_upload_object(blob_sas_url=self.di_sas_url, file_name=di_blob_file_name, obj=di_result)
        under_image_content = replace_urls_in_content(content=di_result.di_content, replacements=di_result.figures)
        # Save the extracted content to a local file (same as the original logic)
        write_content(content=under_image_content, directory_path=self.tmp_directory, file_name=context.object_key)
        blob_upload_content(blob_sas_url=self.di_sas_url, file_name=di_blob_file_name + ".md", content=under_image_content)
        return Document(content=under_image_content, filepath=context.object_key)


class DefaultDocumentChunkingService:
    """Default document chunking service"""

    def __init__(self, tmp_directory: Optional[str] = None):
        self.tmp_directory = tmp_directory

    def chunk_document(self, document: Document, context: ProcessingContext) -> ChunkingResult:
        """Chunk document"""
        # Call the original chunking method
        chunking_result = chunk_di_doc(document, data_config=context.data_config, tmp_path=context.current_tmp_directory)
        # If tmp_directory is configured, save the chunk result to a local file
        if self.tmp_directory:
            write_document(
                chunking_result.chunks,
                file_path=context.object_key,
                directory_path=self.tmp_directory,
                rel_file_path=context.object_key
            )
        return chunking_result


class AzureSearchIndexingService:
    """Azure Search-based indexing service"""

    def __init__(self):
        pass

    def index_document(self, document: Document, context: ProcessingContext) -> bool:
        """Index document"""
        # Get document index schemas
        document_schemas = [schema for schema in context.data_config["index_schemas"]
                            if set(schema["data_type"]) == {"metadata", "document"}]
        doc_dict = asdict_with_dynamic(document)
        doc_dict.update(context.metadata)
        for schema in document_schemas:
            if not upload_merge_index(index_config=schema, docs=[doc_dict],
                                      merge_fields=context.data_config["merge_fields"],
                                      current_tmp_directory=context.current_tmp_directory):
                return False
        return True

    def index_chunks(self, chunks: List[Document], context: ProcessingContext) -> bool:
        """Index document chunks"""
        # Get chunk index schemas
        chunk_schemas = [schema for schema in context.data_config["index_schemas"]
                         if set(schema["data_type"]) == {"metadata", "document", "chunk"}]
        chunk_dict = [asdict_with_dynamic(chunk) for chunk in chunks]
        for schema in chunk_schemas:
            # First delete the old chunk data
            delete_documents_by_field(schema["index_name"], schema["update_by_field"], context.object_key)
            # Add metadata to each chunk
            for chunk in chunk_dict:
                chunk.update(context.metadata)
            # Upload new chunk data
            if not upload_merge_index(
                index_config=schema,
                docs=chunk_dict,
                merge_fields=context.data_config["merge_fields"],
                current_tmp_directory=context.current_tmp_directory
            ):
                return False
        return True
    def get_existing_document_data(self, object_key: str, index_name: str, field_name: str) -> Optional[dict[str, Any]]:
        """Get existing document data from the Azure Search index"""
        results = query_by_field(index_name=index_name, field_name=field_name, value=object_key)
        return results[0] if results else None

    def get_existing_chunk_data(self, object_key: str, index_name: str, field_name: str) -> List[dict[str, Any]]:
        """Get existing chunk data from the Azure Search index"""
        results = query_by_field(index_name=index_name, field_name=field_name, value=object_key)
        return results if results else []

    def index_metadata(self, metadata: dict[str, Any], schema: Any, context: ProcessingContext) -> bool:
        """Index metadata"""
        return upload_merge_index(index_config=schema, docs=[metadata],
                                  merge_fields=context.data_config["merge_fields"],
                                  current_tmp_directory=context.current_tmp_directory)

    def index_document_with_schema(self, doc_dict: Dict[str, Any], schema: Any, context: ProcessingContext) -> bool:
        """Index a document using the specified schema"""
        return upload_merge_index(index_config=schema, docs=[doc_dict],
                                  merge_fields=context.data_config["merge_fields"],
                                  current_tmp_directory=context.current_tmp_directory)

    def index_chunks_with_schema(self, chunk_dict: List[Dict[str, Any]], schema: Any, context: ProcessingContext) -> bool:
        """Index chunks using the specified schema"""
        return upload_merge_index(index_config=schema, docs=chunk_dict,
                                  merge_fields=context.data_config["merge_fields"],
                                  current_tmp_directory=context.current_tmp_directory)

    def delete_chunks_by_field(self, index_name: str, field_name: str, field_value: str) -> bool:
        """Delete chunks by field"""
        try:
            delete_documents_by_field(index_name, field_name, field_value)
            return True
        except Exception:
            return False


class BlobMetadataService:
    """Metadata service based on Blob storage"""

    def __init__(self, datasource: Dict[str, Any]):
        self.datasource = datasource

    def get_metadata(self, object_key: str) -> Dict[str, Any]:
        """Get metadata"""
        if "metadata" not in self.datasource:
            return {}
        return self.datasource["metadata"].get(object_key, {})


# Update the factory class with specific implementations
class DocumentProcessingFactory:
    """Document processing factory class"""

    def __init__(self, service_factory: ServiceFactory, tmp_directory: str,
                 datasource: Optional[Dict[str, Any]] = None,
                 config: Optional[ApplicationConfig] = None):
        """
        Initialize factory

        Args:
            service_factory: Service factory (used to get the database engine)
            tmp_directory: Shared temporary directory for intermediate files
            datasource: Data source configuration
            config: Application configuration
        """
        self.service_factory: ServiceFactory = service_factory
        self.datasource = datasource or {}
        self.shared_tmp_directory = tmp_directory
        self.config: Optional[ApplicationConfig] = config

    def create_orchestrator(self) -> DocumentProcessingOrchestrator:
        """Create document processing orchestrator"""
        extraction_service = self._create_extraction_service()
        chunking_service = self._create_chunking_service()
        indexing_service = self._create_indexing_service()
        metadata_service = self._create_metadata_service()
        repository = self._create_repository()
        return DocumentProcessingOrchestrator(
            extraction_service=extraction_service,
            chunking_service=chunking_service,
            indexing_service=indexing_service,
            metadata_service=metadata_service,
            repository=repository
        )

    def _create_extraction_service(self) -> 'DocumentIntelligenceExtractionService':
        """Create document extraction service"""
        # Use the factory's shared temporary directory (same as the original app.py logic)
        tmp_directory = self.shared_tmp_directory
        # Get configuration from environment variables (same as the original worker.py logic)
        vllm_endpoint = os.environ.get("captioning_model_endpoint", "")
os.environ.get("captioning_model_key", "") form_recognizer_client = SingletonFormRecognizerClient() return DocumentIntelligenceExtractionService( form_recognizer_client=form_recognizer_client, vllm_endpoint=vllm_endpoint, vllm_key=vllm_key, tmp_directory=tmp_directory, data_directory="", # Will be dynamically fetched from data_config di_sas_url=self.config.azure_services.di_blob_account_url, figure_sas_url=self.config.azure_services.figure_blob_account_url ) def _create_chunking_service(self) -> 'DefaultDocumentChunkingService': """Create document chunking service""" # Use the factory shared temporary directory tmp_directory = self.shared_tmp_directory return DefaultDocumentChunkingService(tmp_directory=tmp_directory) def _create_indexing_service(self) -> 'AzureSearchIndexingService': """Create indexing service""" return AzureSearchIndexingService() def _create_metadata_service(self) -> 'BlobMetadataService': """Create metadata service""" return BlobMetadataService(self.datasource) def _create_repository(self) -> DocumentRepository: """Create document repository""" database_interface = LegacyDatabaseAdapter(self.service_factory.get_database_engine()) return SqlAlchemyDocumentRepository(database_interface)