Files
catonline_ai/vw-document-ai-indexer/document_task_processor.py
2025-09-26 17:15:54 +08:00

104 lines
4.9 KiB
Python

"""
Document Task Processor
Integrates business logic and database operations
"""
import datetime
import json
import logging
from typing import Any, Optional
from sqlalchemy import and_
from sqlalchemy.orm import sessionmaker
from app_config import ServiceFactory
from task_processor import Task, TaskProcessorInterface
from business_layer import ApplicationConfig, DocumentProcessingFactory, ProcessingContext
from database import IndexObject, IndexObjectStatus, IndexJob
from utils import custom_serializer
class DocumentTaskProcessor(TaskProcessorInterface):
    """Document task processor.

    Bridges the generic task queue and the document-processing business
    layer: for each task it maintains the matching ``IndexObject`` row
    (timestamps, last run id, detailed message) and delegates the actual
    document work to an orchestrator built by ``DocumentProcessingFactory``.
    """

    def __init__(self, config: ApplicationConfig, service_factory: ServiceFactory,
                 tmp_directory: str, database_engine: Any,
                 logger: Optional[logging.Logger],
                 datasource: dict[str, Any], data_config: dict[str, Any]):
        """Initialize the processor.

        Args:
            config: Application-level configuration passed through to the
                processing factory.
            service_factory: Factory for external service clients.
            tmp_directory: Scratch directory for intermediate files.
            database_engine: SQLAlchemy engine used to open sessions.
            logger: Optional logger; defaults to a module-named logger.
            datasource: Datasource settings dict (``{}`` if falsy).
            data_config: Per-datasource config; ``datasource_name`` falls
                back to ``"default"`` when absent.
        """
        self.config = config
        self.service_factory = service_factory
        self.database_engine = database_engine
        self.logger = logger or logging.getLogger(__name__)
        self.datasource = datasource or {}
        self.processing_factory = DocumentProcessingFactory(
            service_factory=service_factory,
            tmp_directory=tmp_directory,
            datasource=datasource,
            config=config,
        )
        self.data_config: dict[str, Any] = data_config
        self.datasource_name: str = data_config.get("datasource_name", "default")

    def process(self, task: Task) -> Any:
        """Process one document task.

        Args:
            task: Queue task whose ``payload`` must be a ``ProcessingContext``.

        Returns:
            The orchestrator's processing result object.

        Raises:
            ValueError: If ``task.payload`` is not a ``ProcessingContext``.
            Exception: Re-raised on unexpected errors, or raised with the
                result message when processing finishes with FAILED status
                (this triggers the caller's retry mechanism).
        """
        if not isinstance(task.payload, ProcessingContext):
            raise ValueError(f"Expected ProcessingContext, got {type(task.payload)}")
        context = task.payload
        detailed_message: dict[str, Any] = {}
        detailed_message["start_time"] = datetime.datetime.now(datetime.timezone.utc)
        Session = sessionmaker(bind=self.database_engine)
        session = Session()
        try:
            # 1. Query or create IndexObject record (composite PK:
            #    object_key + datasource_name).
            index_object_db = session.query(IndexObject).get(
                {"object_key": context.object_key, "datasource_name": context.datasource_name}
            )
            if not index_object_db:
                self.logger.info(f"Creating new IndexObject entry for {context.object_key}")
                index_object_db = IndexObject(
                    object_key=context.object_key,
                    type="document",
                    status=IndexObjectStatus.PROCESSING.value,
                    try_count=0,
                    datasource_name=context.datasource_name,
                )
                session.add(index_object_db)
                session.commit()
            # 2. Only update task-related fields, no longer update business fields
            index_object_db.last_start_time = datetime.datetime.now(datetime.timezone.utc)
            # Attach the newest "processing" job for this datasource, if any.
            current_job = (
                session.query(IndexJob)
                .filter(and_(IndexJob.status == "processing",
                             IndexJob.datasource_name == context.datasource_name))
                .order_by(IndexJob.id.desc())
                .first()
            )
            if current_job:
                index_object_db.last_run_id = current_job.id
            session.commit()
            # 3. Execute business processing
            self.logger.info(f"Processing document: {context.object_key}")
            orchestrator = self.processing_factory.create_orchestrator()
            result = orchestrator.process_document(context)
            # 4. Only update task-related fields, no longer update business fields
            detailed_message["success"] = result.status == IndexObjectStatus.SUCCESS
            detailed_message["chunks_count"] = result.chunks_count
            detailed_message["processing_time"] = result.processing_time
            detailed_message["message"] = result.message
            if result.status != IndexObjectStatus.SUCCESS:
                self.logger.error(f"Failed to process {context.object_key}: {result.message}")
                detailed_message["error"] = result.message
                if result.error:
                    detailed_message["error_details"] = str(result.error)
            else:
                self.logger.info(f"Successfully processed {context.object_key}")
            index_object_db.last_finished_time = datetime.datetime.now(datetime.timezone.utc)
            detailed_message["end_time"] = datetime.datetime.now(datetime.timezone.utc)
            index_object_db.detailed_message = json.dumps(
                detailed_message, default=custom_serializer, ensure_ascii=False
            )
            session.commit()
            # If processing failed, raise exception to trigger retry mechanism
            if result.status == IndexObjectStatus.FAILED:
                raise Exception(result.message)
            return result
        except Exception as e:
            # Reset the transaction so close() does not discard a dirty /
            # failed session state (the original code skipped this rollback).
            session.rollback()
            # logger.exception keeps the traceback, unlike logger.error(f"...").
            self.logger.exception(f"Error processing {context.object_key}: {e}")
            raise
        finally:
            session.close()