"""Main application entry point for document processing.""" import asyncio import json import logging import sys import os import traceback from typing import Optional, List, Dict, Any from contextlib import asynccontextmanager from dataclasses import dataclass import argparse import datetime from sqlalchemy import and_ from sqlalchemy.orm import sessionmaker from app_config import ApplicationConfig, ServiceFactory from business_layer import ProcessingContext from document_task_processor import DocumentTaskProcessor from task_processor import ProcessingStats, Task, TaskProcessor from database import init_database,IndexObject,IndexJob from utils import custom_serializer, init_current_data_directory,max_datetime_safe, min_datetime_safe from blob_service import check_files, check_meta,load_metadata from azure_index_service import index_init @dataclass class ApplicationContext: """Application context.""" config: ApplicationConfig service_factory: ServiceFactory database_engine: Any logger: logging.Logger class DocumentProcessingApplication: """Main class for document processing application.""" def __init__(self, config_path: str, env_path: str = "env.yaml"): self.config_path = config_path self.env_path = env_path self.context: ApplicationContext = None # type: ignore self.logger = logging.getLogger(__name__) self.console_logger = logging.getLogger("data_preparation") async def initialize(self): """Initialize the application.""" try: # Load config - load environment and business config separately config = ApplicationConfig.from_env_and_config_files(config_yaml_path=self.config_path, env_yaml_path=self.env_path) config.validate() # Set up logging self._setup_app_logging() # Create service factory service_factory = ServiceFactory(config) # Initialize database (create tables) database_engine = init_database(config.database.uri) self.logger.info("Database initialized successfully") # Validate database engine service_engine = service_factory.get_database_engine() if database_engine.url != service_engine.url: self.logger.warning("Database engines have different URLs, using init_database result") database_engine = service_engine # Create application context self.context = ApplicationContext(config=config, service_factory=service_factory, database_engine=database_engine, logger=self.logger) # Initialize task processor self._initialize_task_processor() self.console_logger.info("Application initialized successfully") except Exception as e: self.logger.error(f"Failed to initialize application: {e}") raise def _setup_app_logging(self): self.console_logger.handlers = [] self.console_logger.setLevel(logging.DEBUG) self.console_logger.propagate = False # Console output - only show progress and key info console_handler = logging.StreamHandler(sys.stdout) console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(console_formatter) console_handler.setLevel(logging.DEBUG) self.console_logger.addHandler(console_handler) def _setup_logging(self, log_file: str = '~'): """Set up logging configuration.""" root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # Remove existing handlers for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) file_path = f"{log_file}/.chunked/.run.log" # File output - log all details os.makedirs(os.path.dirname(file_path), exist_ok=True) file_handler = logging.FileHandler(file_path, encoding='utf-8') file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') 
        file_handler.setFormatter(file_formatter)
        file_handler.setLevel(logging.INFO)
        root_logger.addHandler(file_handler)
        self.console_logger.addHandler(file_handler)

    async def _initialize_datasource(self, data_config: Dict[str, Any]) -> Dict[str, Any]:
        """Initialize datasource."""
        try:
            self.console_logger.info("Loading metadata from blob storage...")
            sorted_list = await asyncio.to_thread(
                load_metadata,
                data_config["data_path"],
                self.context.config.current_tmp_directory,
                data_config["data_dir"],
            )

            doc_metadata_map: dict[str, dict[str, Any]] = {}
            for item in sorted_list:
                key = item["filepath"]
                # Assume there is a timestamp field, keep the latest
                if key not in doc_metadata_map or item.get("timestamp", 0) > doc_metadata_map[key].get("timestamp", 0):
                    doc_metadata_map[key] = item

            datasource = {"metadata": doc_metadata_map}
            self.console_logger.info(f"Loaded {len(doc_metadata_map)} metadata entries")
            return datasource
        except Exception as e:
            self.logger.error(f"Error initializing datasource: {e}")
            raise

    def _initialize_task_processor(self):
        """Initialize task processor (basic init only)."""
        if not self.context:
            raise RuntimeError("Application context not initialized")
        # Basic task processor config, actual processor will be created per data config
        self.logger.info("Task processor configuration initialized")

    async def run(self):
        """Run the application."""
        if not self.context:
            raise RuntimeError("Application not initialized")
        try:
            self.console_logger.info("Starting document processing application")
            for i, data_config in enumerate(self.context.config.data_configs, 1):
                self.console_logger.info(f"Processing data source {i}/{len(self.context.config.data_configs)}")
                await self._process_data_config(data_config)
            self.console_logger.info("Document processing application completed")
        except Exception as e:
            self.logger.error(f"Application error: {e}")
            raise

    async def _process_data_config(self, data_config: Dict[str, Any]):
        """Process a single data config."""
        data_path = data_config.get('data_path', '/')
        self.console_logger.info(f"Processing data source: {data_path}")
        if not self.context:
            raise RuntimeError("Application context not initialized")
        try:
            base_path: str = data_config.get('base_path', '')
            self.context.config.current_tmp_directory = init_current_data_directory(base_path)
            self._setup_logging(self.context.config.current_tmp_directory)

            # 1. Initialize datasource (load metadata)
            datasource = await self._initialize_datasource(data_config)

            # 2. Get objects to process
            objects_to_process = await self._get_objects_to_process(data_config)
            if not objects_to_process:
                self.console_logger.info("No new documents to process")
                return
            self.console_logger.info(f"Found {len(objects_to_process)} documents to process")

            # 3. Initialize search index schema (ensure search index is created and configured)
            await self._initialize_search_index(data_config, self.context.config)

            # 4. Create task processor with datasource
            task_processor_impl = DocumentTaskProcessor(
                config=self.context.config,
                service_factory=self.context.service_factory,
                tmp_directory=self.context.config.current_tmp_directory,
                database_engine=self.context.database_engine,
                logger=self.logger,
                datasource=datasource,
                data_config=data_config,
            )

            # 5. Task processor
            simple_processor = TaskProcessor(
                task_processor=task_processor_impl,
                max_workers=self.context.config.processing.max_workers,
                logger=self.console_logger,
                database_engine=self.context.database_engine,
                data_config=data_config,
            )

            # Create tasks
            tasks = self._create_tasks(objects_to_process, data_config, self.context.config)
            self.console_logger.info(f"Starting processing of {len(tasks)} tasks")

            # Synchronously process all tasks
            await asyncio.to_thread(simple_processor.process_tasks, tasks)

            # Get processing stats
            stats = ProcessingStats(
                total_tasks=simple_processor.total_tasks,
                completed_tasks=simple_processor.completed_tasks,
                failed_tasks=simple_processor.failed_tasks,
                start_time=simple_processor.start_time or datetime.datetime.now(),
            )
            self.console_logger.info(json.dumps(stats, ensure_ascii=False, default=custom_serializer))

            # Update job status
            datasource_name = data_config.get("datasource_name", "default")
            await self._update_index_job_status(stats, datasource_name)
        except Exception as e:
            self.console_logger.error(f"Error processing data config: {traceback.format_exc()}")
            self.console_logger.error(f"Error processing data config: {str(e)}")
            raise

    async def _get_objects_to_process(self, data_config: Dict[str, Any]) -> List[IndexObject]:
        """Get objects to process."""
        try:
            # 1. Get last successful processing time from database
            datasource_name = data_config.get("datasource_name", "default")
            Session = sessionmaker(bind=self.context.database_engine)
            session = Session()
            try:
                last_success_doc_job = session.query(IndexJob).filter(
                    and_(
                        IndexJob.status == "success",
                        IndexJob.doc_upper_time.is_not(None),
                        IndexJob.datasource_name == datasource_name,
                    )
                ).order_by(IndexJob.id.desc()).first()
                last_success_meta_job = session.query(IndexJob).filter(
                    and_(
                        IndexJob.status == "success",
                        IndexJob.metadata_upper_time.is_not(None),
                        IndexJob.datasource_name == datasource_name,
                    )
                ).order_by(IndexJob.id.desc()).first()

                doc_upper_time = last_success_doc_job.doc_upper_time if last_success_doc_job and last_success_doc_job.doc_upper_time else None
                metadata_upper_time = last_success_meta_job.metadata_upper_time if last_success_meta_job and last_success_meta_job.metadata_upper_time else None
                self.console_logger.info(
                    f"Checking for updates in datasource '{datasource_name}' since doc: {doc_upper_time}, metadata: {metadata_upper_time}"
                )
            finally:
                session.close()

            # 2. Check file updates (only get files updated after baseline)
            new_files = await asyncio.to_thread(check_files, data_config["data_path"], doc_upper_time)

            # 3. Check metadata updates (only get metadata updated after baseline)
            new_metas: list[dict[Any, Any]] = await asyncio.to_thread(
                check_meta,
                data_config["data_path"],
                metadata_upper_time,
                self.context.config.current_tmp_directory,
                data_config["data_dir"],
            )
            self.console_logger.info(f"Found {len(new_files)} updated files and {len(new_metas)} updated metadata entries")

            # Limit the batch to process_file_num entries (matched by the "name" field):
            # keep only files that also appear in new_metas, truncate to the limit,
            # then filter new_metas down to the files that were kept.
            if data_config["process_file_num"] > 0:
                new_files = [file_info for file_info in new_files if file_info["name"] in {meta["name"] for meta in new_metas}]
                if len(new_files) > data_config["process_file_num"]:
                    new_files = new_files[:data_config["process_file_num"]]
                # Filter new_metas down to the files that were kept
                new_metas = [meta_info for meta_info in new_metas if meta_info["name"] in {file_info["name"] for file_info in new_files}]
                self.console_logger.info(f"After filtering, {len(new_files)} files and {len(new_metas)} metadata entries to process")

            # 4. Merge file and metadata info, create processing objects
            objects_to_process: list[IndexObject] = []
            for file_info in new_files:
                index_object = IndexObject(
                    object_key=file_info["name"],
                    type="document",
                    doc_modifed_time=file_info.get("doc_upper_time"),
                )
                objects_to_process.append(index_object)
            for meta_info in new_metas:
                existing_obj = next((obj for obj in objects_to_process if obj.object_key == meta_info["name"]), None)
                if existing_obj:
                    existing_obj.metadata_modifed_time = meta_info.get("meta_upper_time")
                else:
                    index_object = IndexObject(
                        object_key=meta_info["name"],
                        type="document",
                        metadata_modifed_time=meta_info.get("meta_upper_time"),
                    )
                    objects_to_process.append(index_object)

            # 5. If there are objects to process, create a new job record
            if objects_to_process:
                await self._create_index_job(objects_to_process, data_config.get("datasource_name", "default"))

            return objects_to_process
        except Exception as e:
            self.logger.error(f"Error getting objects to process: {e}")
            raise

    async def _create_index_job(self, objects_to_process: List[IndexObject], datasource_name: str):
        """Create index job record."""
        try:
            Session = sessionmaker(bind=self.context.database_engine)
            session = Session()
            try:
                index_job_db = IndexJob(
                    start_time=datetime.datetime.now(datetime.timezone.utc),
                    status="processing",
                    total_process_count=len(objects_to_process),
                    datasource_name=datasource_name,
                )
                for index_object in objects_to_process:
                    index_job_db.doc_upper_time = max_datetime_safe(index_object.doc_modifed_time, index_job_db.doc_upper_time)
                    index_job_db.doc_lower_time = min_datetime_safe(index_object.doc_modifed_time, index_job_db.doc_lower_time)
                    index_job_db.metadata_upper_time = max_datetime_safe(index_object.metadata_modifed_time, index_job_db.metadata_upper_time)
                    index_job_db.metadata_lower_time = min_datetime_safe(index_object.metadata_modifed_time, index_job_db.metadata_lower_time)
                    # Set datasource_name for each index object
                    index_object.datasource_name = datasource_name
                session.add(index_job_db)
                session.commit()
                self.console_logger.info(f"Created processing job for {len(objects_to_process)} objects in datasource: {datasource_name}")
            finally:
                session.close()
        except Exception as e:
            self.console_logger.error(f"Error creating index job: {e}")
            raise

    async def _update_index_job_status(self, stats: ProcessingStats, datasource_name: str = "default"):
        """Update index job status."""
        try:
            Session = sessionmaker(bind=self.context.database_engine)
            session = Session()
            try:
                current_job = session.query(IndexJob).filter(
                    and_(
                        IndexJob.status == "processing",
                        IndexJob.datasource_name == datasource_name,
                    )
                ).order_by(IndexJob.id.desc()).first()
                if current_job:
                    if stats.failed_tasks == 0 and stats.completed_tasks == stats.total_tasks:
                        current_job.status = "success"
                    elif stats.completed_tasks > 0 and stats.failed_tasks > 0:
                        current_job.status = "partial_success"
                    else:
                        current_job.status = "failed"
                    current_job.end_time = datetime.datetime.now(datetime.timezone.utc)
                    current_job.success_count = stats.completed_tasks
                    current_job.failed_count = stats.failed_tasks
                    session.commit()
                    self.console_logger.info(f"Job completed for datasource '{datasource_name}': {current_job.status}")
            finally:
                session.close()
        except Exception as e:
            self.console_logger.error(f"Error updating job status: {e}")

    def _create_tasks(self, objects: List[IndexObject], data_config: Dict[str, Any], config: ApplicationConfig) -> List[Task]:
        """Create task list."""
        tasks: list[Task] = []
        datasource_name = data_config.get("datasource_name", "default")
        for obj in objects:
            context = ProcessingContext(
                object_key=obj.object_key,
                data_config=data_config,
                metadata={
                    "doc_modified_time": obj.doc_modifed_time,
                    "metadata_modified_time": obj.metadata_modifed_time,
                },
                current_tmp_directory=self.context.config.current_tmp_directory,
                datasource_name=datasource_name,
                config=config,
            )
            task = Task(id=obj.object_key, payload=context, priority=0)
            tasks.append(task)
        return tasks

    async def shutdown(self):
        """Shutdown application."""
        self.console_logger.info("Application shutdown completed")

    @asynccontextmanager
    async def application_context(self):
        """Application context manager."""
        await self.initialize()
        try:
            yield self
        finally:
            await self.shutdown()

    async def _initialize_search_index(self, data_config: Dict[str, Any], applicationconfig: ApplicationConfig):
        """Initialize search index schema, ensure search index is created and configured."""
        try:
            self.console_logger.info("Initializing search index schema...")
            await asyncio.to_thread(
                index_init,
                data_config,
                applicationconfig.azure_services.search_admin_key,
                applicationconfig.azure_services.search_service_name,
            )
            self.console_logger.info("Search index schema initialized successfully")
        except Exception as e:
            self.console_logger.error(f"Error initializing search index: {e}")
            raise


async def main():
    """Main function."""
    parser = argparse.ArgumentParser(description="Document Processing Application (Refactored)")
    parser.add_argument("--config", type=str, default="config.yaml", help="Business configuration file path")
    parser.add_argument("--env", type=str, default="env.yaml", help="Environment variables file path")
    parser.add_argument("--log-level", type=str, default="INFO", help="Log level")
    args = parser.parse_args()

    app = DocumentProcessingApplication(args.config, args.env)
    try:
        async with app.application_context():
            await app.run()
    except KeyboardInterrupt:
        print("Application interrupted by user")
    except Exception as e:
        print(f"Application error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
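
# Example invocation (illustrative only; assumes this module is saved as main.py and that
# config.yaml and env.yaml exist in the working directory):
#
#   python main.py --config config.yaml --env env.yaml --log-level INFO
#
# Note: --log-level is parsed but not currently applied; levels are fixed inside
# _setup_app_logging/_setup_logging. Each entry in config.data_configs is read for
# data_path, data_dir and process_file_num (accessed directly), plus optional base_path
# and datasource_name; the exact YAML layout is defined by
# ApplicationConfig.from_env_and_config_files.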