""" Document Task Processor Integrates business logic and database operations """ import datetime import json import logging from typing import Any, Optional from sqlalchemy import and_ from sqlalchemy.orm import sessionmaker from app_config import ServiceFactory from task_processor import Task, TaskProcessorInterface from business_layer import ApplicationConfig, DocumentProcessingFactory, ProcessingContext from database import IndexObject, IndexObjectStatus, IndexJob from utils import custom_serializer class DocumentTaskProcessor(TaskProcessorInterface): """Document task processor""" def __init__(self, config:ApplicationConfig, service_factory:ServiceFactory, tmp_directory:str, database_engine:Any, logger: Optional[logging.Logger] , datasource: dict[str,Any] ,data_config:dict[str,Any]): self.config = config self.service_factory = service_factory self.database_engine = database_engine self.logger = logger or logging.getLogger(__name__) self.datasource = datasource or {} self.processing_factory = DocumentProcessingFactory(service_factory=service_factory, tmp_directory=tmp_directory, datasource=datasource, config=config) self.data_config: dict[str, Any] = data_config self.datasource_name: str = data_config.get("datasource_name", "default") def process(self, task: Task) -> Any: """Process document task""" if not isinstance(task.payload, ProcessingContext): raise ValueError(f"Expected ProcessingContext, got {type(task.payload)}") context = task.payload detailed_message:dict[str,Any] = {} detailed_message["start_time"] = datetime.datetime.now(datetime.timezone.utc) Session = sessionmaker(bind=self.database_engine) session = Session() try: # 1. Query or create IndexObject record index_object_db = session.query(IndexObject).get({"object_key":context.object_key,"datasource_name":context.datasource_name}) if not index_object_db: self.logger.info(f"Creating new IndexObject entry for {context.object_key}") index_object_db = IndexObject( object_key=context.object_key, type="document", status=IndexObjectStatus.PROCESSING.value, try_count=0, datasource_name=context.datasource_name ) session.add(index_object_db) session.commit() # 2. Only update task-related fields, no longer update business fields index_object_db.last_start_time = datetime.datetime.now(datetime.timezone.utc) current_job = session.query(IndexJob).filter(and_(IndexJob.status == "processing",IndexJob.datasource_name== context.datasource_name)).order_by(IndexJob.id.desc()).first() if current_job: index_object_db.last_run_id = current_job.id session.commit() # 3. Execute business processing self.logger.info(f"Processing document: {context.object_key}") orchestrator = self.processing_factory.create_orchestrator() result = orchestrator.process_document(context) # 4. 
Only update task-related fields, no longer update business fields detailed_message["success"] = result.status == IndexObjectStatus.SUCCESS detailed_message["chunks_count"] = result.chunks_count detailed_message["processing_time"] = result.processing_time detailed_message["message"] = result.message if result.status != IndexObjectStatus.SUCCESS: self.logger.error(f"Failed to process {context.object_key}: {result.message}") detailed_message["error"] = result.message if result.error: detailed_message["error_details"] = str(result.error) else: self.logger.info(f"Successfully processed {context.object_key}") index_object_db.last_finished_time = datetime.datetime.now(datetime.timezone.utc) detailed_message["end_time"] = datetime.datetime.now(datetime.timezone.utc) index_object_db.detailed_message = json.dumps(detailed_message, default=custom_serializer, ensure_ascii=False) session.commit() # If processing failed, raise exception to trigger retry mechanism if result.status == IndexObjectStatus.FAILED: raise Exception(result.message) return result except Exception as e: # Handle exceptions - only update database in case of unexpected exceptions # Business logic failures are already handled above self.logger.error(f"Error processing {context.object_key}: {e}") raise finally: session.close()
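

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept as comments). It assumes that
# ApplicationConfig and ServiceFactory expose no-argument constructors and
# that ProcessingContext accepts object_key/datasource_name keyword
# arguments; the real signatures live in app_config, business_layer, and
# task_processor and may differ.
#
#   from sqlalchemy import create_engine
#
#   engine = create_engine("sqlite:///index.db")
#   processor = DocumentTaskProcessor(
#       config=ApplicationConfig(),          # assumed default constructor
#       service_factory=ServiceFactory(),    # assumed default constructor
#       tmp_directory="/tmp/doc-processing",
#       database_engine=engine,
#       logger=None,                         # falls back to module logger
#       datasource={},
#       data_config={"datasource_name": "default"},
#   )
#   task = Task(payload=ProcessingContext(object_key="docs/report.pdf",
#                                         datasource_name="default"))
#   result = processor.process(task)
# ---------------------------------------------------------------------------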