"""
Document Task Processor

Integrates business logic and database operations
"""
|
|
|
|
import datetime
|
|
import json
|
|
import logging
|
|
from typing import Any, Optional
|
|
|
|
from sqlalchemy import and_
|
|
from sqlalchemy.orm import sessionmaker
|
|
from app_config import ServiceFactory
|
|
from task_processor import Task, TaskProcessorInterface
|
|
from business_layer import ApplicationConfig, DocumentProcessingFactory, ProcessingContext
|
|
from database import IndexObject, IndexObjectStatus, IndexJob
|
|
from utils import custom_serializer
|
|
|
|
|
|
class DocumentTaskProcessor(TaskProcessorInterface):
    """Process document-indexing tasks and record their lifecycle in the database.

    Bridges the business layer (DocumentProcessingFactory / orchestrator) and
    the persistence layer (IndexObject / IndexJob rows): each task run stamps
    start/finish times and a JSON summary onto its IndexObject record.
    """

    def __init__(
        self,
        config: ApplicationConfig,
        service_factory: ServiceFactory,
        tmp_directory: str,
        database_engine: Any,
        logger: Optional[logging.Logger],
        datasource: dict[str, Any],
        data_config: dict[str, Any],
    ):
        """Initialize the processor.

        Args:
            config: Application-level configuration.
            service_factory: Factory for external service clients.
            tmp_directory: Scratch directory handed to the processing factory.
            database_engine: SQLAlchemy engine used for status bookkeeping.
            logger: Optional logger; defaults to this module's logger.
            datasource: Datasource connection/config mapping (may be None/empty).
            data_config: Per-datasource settings; ``datasource_name`` is read
                from it (defaults to ``"default"``).
        """
        self.config = config
        self.service_factory = service_factory
        self.database_engine = database_engine
        self.logger = logger or logging.getLogger(__name__)
        self.datasource = datasource or {}
        self.processing_factory = DocumentProcessingFactory(
            service_factory=service_factory,
            tmp_directory=tmp_directory,
            datasource=datasource,
            config=config,
        )
        self.data_config: dict[str, Any] = data_config
        self.datasource_name: str = data_config.get("datasource_name", "default")
        # Build the session factory once: sessionmaker is engine-bound and
        # reusable, so recreating it on every process() call (as before) was
        # pure wasted work.
        self._session_factory = sessionmaker(bind=self.database_engine)

    def _get_or_create_index_object(self, session: Any, context: ProcessingContext) -> IndexObject:
        """Return the IndexObject row for (object_key, datasource_name), creating it if absent.

        A freshly created row starts in PROCESSING status with try_count 0 and
        is committed immediately so it is visible to concurrent readers.
        """
        index_object_db = session.query(IndexObject).get(
            {"object_key": context.object_key, "datasource_name": context.datasource_name}
        )
        if not index_object_db:
            self.logger.info(f"Creating new IndexObject entry for {context.object_key}")
            index_object_db = IndexObject(
                object_key=context.object_key,
                type="document",
                status=IndexObjectStatus.PROCESSING.value,
                try_count=0,
                datasource_name=context.datasource_name,
            )
            session.add(index_object_db)
            session.commit()
        return index_object_db

    def process(self, task: Task) -> Any:
        """Process a single document task.

        Looks up (or creates) the IndexObject row for the task's object key,
        stamps task-lifecycle fields, runs the business-layer orchestrator,
        and persists a JSON summary of the run in ``detailed_message``.

        Args:
            task: Task whose ``payload`` must be a ProcessingContext.

        Returns:
            The orchestrator's processing result object.

        Raises:
            ValueError: If the task payload is not a ProcessingContext.
            Exception: Re-raised on unexpected errors, and raised deliberately
                when the result status is FAILED so the caller's retry
                mechanism triggers.
        """
        if not isinstance(task.payload, ProcessingContext):
            raise ValueError(f"Expected ProcessingContext, got {type(task.payload)}")

        context = task.payload
        detailed_message: dict[str, Any] = {}
        detailed_message["start_time"] = datetime.datetime.now(datetime.timezone.utc)

        session = self._session_factory()
        try:
            # 1. Query or create the IndexObject record.
            index_object_db = self._get_or_create_index_object(session, context)

            # 2. Only update task-related fields, no longer update business fields.
            index_object_db.last_start_time = datetime.datetime.now(datetime.timezone.utc)
            current_job = (
                session.query(IndexJob)
                .filter(
                    and_(
                        IndexJob.status == "processing",
                        IndexJob.datasource_name == context.datasource_name,
                    )
                )
                .order_by(IndexJob.id.desc())
                .first()
            )
            if current_job:
                index_object_db.last_run_id = current_job.id
            session.commit()

            # 3. Execute business processing.
            self.logger.info(f"Processing document: {context.object_key}")
            orchestrator = self.processing_factory.create_orchestrator()
            result = orchestrator.process_document(context)

            # 4. Record the outcome — task-related fields only, never
            #    business fields.
            detailed_message["success"] = result.status == IndexObjectStatus.SUCCESS
            detailed_message["chunks_count"] = result.chunks_count
            detailed_message["processing_time"] = result.processing_time
            detailed_message["message"] = result.message
            if result.status != IndexObjectStatus.SUCCESS:
                self.logger.error(f"Failed to process {context.object_key}: {result.message}")
                detailed_message["error"] = result.message
                if result.error:
                    detailed_message["error_details"] = str(result.error)
            else:
                self.logger.info(f"Successfully processed {context.object_key}")

            index_object_db.last_finished_time = datetime.datetime.now(datetime.timezone.utc)
            detailed_message["end_time"] = datetime.datetime.now(datetime.timezone.utc)
            index_object_db.detailed_message = json.dumps(
                detailed_message, default=custom_serializer, ensure_ascii=False
            )
            session.commit()

            # If processing failed, raise exception to trigger retry mechanism.
            if result.status == IndexObjectStatus.FAILED:
                raise Exception(result.message)

            return result
        except Exception as e:
            # Discard any partially-flushed transaction explicitly so close()
            # releases a clean connection. Business-logic failures were already
            # committed above; this only affects the unexpected-error path.
            session.rollback()
            self.logger.error(f"Error processing {context.object_key}: {e}")
            raise
        finally:
            session.close()