catonline_ai/vw-document-ai-indexer/document_task_processor.py
"""
Document Task Processor
Integrates business logic and database operations
"""
import datetime
import json
import logging
from typing import Any, Optional

from sqlalchemy import and_
from sqlalchemy.orm import sessionmaker

from app_config import ServiceFactory
from task_processor import Task, TaskProcessorInterface
from business_layer import ApplicationConfig, DocumentProcessingFactory, ProcessingContext
from database import IndexObject, IndexObjectStatus, IndexJob
from utils import custom_serializer


class DocumentTaskProcessor(TaskProcessorInterface):
    """Document task processor"""

    def __init__(
        self,
        config: ApplicationConfig,
        service_factory: ServiceFactory,
        tmp_directory: str,
        database_engine: Any,
        logger: Optional[logging.Logger],
        datasource: Optional[dict[str, Any]],
        data_config: dict[str, Any],
    ):
        self.config = config
        self.service_factory = service_factory
        self.database_engine = database_engine
        self.logger = logger or logging.getLogger(__name__)
        self.datasource = datasource or {}
        self.processing_factory = DocumentProcessingFactory(
            service_factory=service_factory,
            tmp_directory=tmp_directory,
            datasource=datasource,
            config=config,
        )
        self.data_config: dict[str, Any] = data_config
        self.datasource_name: str = data_config.get("datasource_name", "default")

    def process(self, task: Task) -> Any:
        """Process document task"""
        if not isinstance(task.payload, ProcessingContext):
            raise ValueError(f"Expected ProcessingContext, got {type(task.payload)}")
        context = task.payload
        detailed_message: dict[str, Any] = {}
        detailed_message["start_time"] = datetime.datetime.now(datetime.timezone.utc)
        Session = sessionmaker(bind=self.database_engine)
        session = Session()
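        # One session per task keeps transactional state isolated between
        # tasks; the finally block below closes it on every code path.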
        try:
            # 1. Query or create IndexObject record
            index_object_db = session.query(IndexObject).get(
                {"object_key": context.object_key, "datasource_name": context.datasource_name}
            )
            if not index_object_db:
                self.logger.info(f"Creating new IndexObject entry for {context.object_key}")
                index_object_db = IndexObject(
                    object_key=context.object_key,
                    type="document",
                    status=IndexObjectStatus.PROCESSING.value,
                    try_count=0,
                    datasource_name=context.datasource_name,
                )
                session.add(index_object_db)
                # Persist the new row before the potentially slow processing starts
                session.commit()
            # 2. Update task-related fields only; business fields are no longer written here
            index_object_db.last_start_time = datetime.datetime.now(datetime.timezone.utc)
            # Link this object to the most recent in-flight job for its datasource
            current_job = (
                session.query(IndexJob)
                .filter(
                    and_(
                        IndexJob.status == "processing",
                        IndexJob.datasource_name == context.datasource_name,
                    )
                )
                .order_by(IndexJob.id.desc())
                .first()
            )
            if current_job:
                index_object_db.last_run_id = current_job.id
            session.commit()
            # 3. Execute business processing
            self.logger.info(f"Processing document: {context.object_key}")
            orchestrator = self.processing_factory.create_orchestrator()
            result = orchestrator.process_document(context)
            # 4. Record the outcome in task-related fields only
            detailed_message["success"] = result.status == IndexObjectStatus.SUCCESS
            detailed_message["chunks_count"] = result.chunks_count
            detailed_message["processing_time"] = result.processing_time
            detailed_message["message"] = result.message
            if result.status != IndexObjectStatus.SUCCESS:
                self.logger.error(f"Failed to process {context.object_key}: {result.message}")
                detailed_message["error"] = result.message
                if result.error:
                    detailed_message["error_details"] = str(result.error)
            else:
                self.logger.info(f"Successfully processed {context.object_key}")
            index_object_db.last_finished_time = datetime.datetime.now(datetime.timezone.utc)
            detailed_message["end_time"] = datetime.datetime.now(datetime.timezone.utc)
            index_object_db.detailed_message = json.dumps(
                detailed_message, default=custom_serializer, ensure_ascii=False
            )
            session.commit()
            # If processing failed, raise exception to trigger retry mechanism
            if result.status == IndexObjectStatus.FAILED:
                raise Exception(result.message)
            return result
        except Exception as e:
            # Roll back any uncommitted changes and re-raise so the task
            # framework's retry mechanism takes over; business-level failures
            # were already recorded in detailed_message above.
            session.rollback()
            self.logger.error(f"Error processing {context.object_key}: {e}")
            raise
        finally:
            session.close()
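

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module). A minimal wiring
# under assumptions: the DSN, the `datasource` dict shape, and the
# `Task(payload=...)` constructor are guesses for illustration; only
# DocumentTaskProcessor, Task, and ProcessingContext come from this codebase.
#
#   from sqlalchemy import create_engine
#
#   engine = create_engine("sqlite:///indexer.db")      # assumed DSN
#   processor = DocumentTaskProcessor(
#       config=app_config,                              # an ApplicationConfig
#       service_factory=service_factory,                # a ServiceFactory
#       tmp_directory="/tmp/indexer",
#       database_engine=engine,
#       logger=None,                                    # falls back to module logger
#       datasource={"name": "default"},                 # shape assumed
#       data_config={"datasource_name": "default"},
#   )
#   task = Task(payload=ProcessingContext(...))         # constructor args elided
#   result = processor.process(task)                    # raises on failure -> retry
# ---------------------------------------------------------------------------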