"""Main application entry point for document processing."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sys
|
|
import os
|
|
import traceback
|
|
from typing import Optional, List, Dict, Any
|
|
from contextlib import asynccontextmanager
|
|
from dataclasses import dataclass
|
|
import argparse
|
|
import datetime
|
|
from sqlalchemy import and_
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
|
|
from app_config import ApplicationConfig, ServiceFactory
|
|
from business_layer import ProcessingContext
|
|
from document_task_processor import DocumentTaskProcessor
|
|
from task_processor import ProcessingStats, Task, TaskProcessor
|
|
|
|
from database import init_database,IndexObject,IndexJob
|
|
from utils import custom_serializer, init_current_data_directory,max_datetime_safe, min_datetime_safe
|
|
from blob_service import check_files, check_meta,load_metadata
|
|
from azure_index_service import index_init
|
|
|
|
@dataclass
class ApplicationContext:
    """Application context shared across processing components."""

    config: ApplicationConfig
    service_factory: ServiceFactory
    database_engine: Any
    logger: logging.Logger

class DocumentProcessingApplication:
    """Main class for the document processing application."""

    def __init__(self, config_path: str, env_path: str = "env.yaml"):
        self.config_path = config_path
        self.env_path = env_path
        self.context: Optional[ApplicationContext] = None
        self.logger = logging.getLogger(__name__)
        self.console_logger = logging.getLogger("data_preparation")

    async def initialize(self):
        """Initialize the application."""
        try:
            # Load environment and business configuration from separate files
            config = ApplicationConfig.from_env_and_config_files(
                config_yaml_path=self.config_path, env_yaml_path=self.env_path
            )
            config.validate()

            # Set up logging
            self._setup_app_logging()

            # Create service factory
            service_factory = ServiceFactory(config)

            # Initialize database (create tables)
            database_engine = init_database(config.database.uri)
            self.logger.info("Database initialized successfully")

            # Validate database engine; prefer the service factory's engine if the URLs differ
            service_engine = service_factory.get_database_engine()
            if database_engine.url != service_engine.url:
                self.logger.warning("Database engines have different URLs, using the service factory engine")
                database_engine = service_engine

            # Create application context
            self.context = ApplicationContext(
                config=config,
                service_factory=service_factory,
                database_engine=database_engine,
                logger=self.logger,
            )

            # Initialize task processor
            self._initialize_task_processor()

            self.console_logger.info("Application initialized successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize application: {e}")
            raise

    def _setup_app_logging(self):
        """Configure the console logger used for progress and key info."""
        self.console_logger.handlers = []
        self.console_logger.setLevel(logging.DEBUG)
        self.console_logger.propagate = False

        # Console output - only show progress and key info
        console_handler = logging.StreamHandler(sys.stdout)
        console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(console_formatter)
        console_handler.setLevel(logging.DEBUG)
        self.console_logger.addHandler(console_handler)

    def _setup_logging(self, log_file: str = '~'):
        """Set up per-run file logging under the given directory."""
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)

        # Remove existing handlers
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)

        # Expand '~' so the default does not create a literal '~' directory
        file_path = os.path.join(os.path.expanduser(log_file), ".chunked", ".run.log")

        # File output - log all details
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        file_handler = logging.FileHandler(file_path, encoding='utf-8')
        file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_formatter)
        file_handler.setLevel(logging.INFO)
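        # Attach the file handler to both the root logger and the console logger, so
        # console progress messages are also captured in the per-run log file.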
        root_logger.addHandler(file_handler)
        self.console_logger.addHandler(file_handler)

    async def _initialize_datasource(self, data_config: Dict[str, Any]) -> Dict[str, Any]:
        """Initialize the datasource by loading document metadata from blob storage."""
        try:
            self.console_logger.info("Loading metadata from blob storage...")
            sorted_list = await asyncio.to_thread(
                load_metadata,
                data_config["data_path"],
                self.context.config.current_tmp_directory,
                data_config["data_dir"],
            )
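            # Collapse duplicate metadata entries: keep one record per "filepath",
            # preferring the entry with the most recent "timestamp" value.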
            doc_metadata_map: dict[str, dict[str, Any]] = {}
            for item in sorted_list:
                key = item["filepath"]
                # Missing timestamps are treated as 0, so any timestamped entry wins
                if key not in doc_metadata_map or item.get("timestamp", 0) > doc_metadata_map[key].get("timestamp", 0):
                    doc_metadata_map[key] = item

            datasource = {"metadata": doc_metadata_map}
            self.console_logger.info(f"Loaded {len(doc_metadata_map)} metadata entries")
            return datasource
        except Exception as e:
            self.logger.error(f"Error initializing datasource: {e}")
            raise

    def _initialize_task_processor(self):
        """Initialize task processor (basic init only)."""
        if not self.context:
            raise RuntimeError("Application context not initialized")
        # Basic task processor config, actual processor will be created per data config
        self.logger.info("Task processor configuration initialized")

    async def run(self):
        """Run the application."""
        if not self.context:
            raise RuntimeError("Application not initialized")
        try:
            self.console_logger.info("Starting document processing application")
            for i, data_config in enumerate(self.context.config.data_configs, 1):
                self.console_logger.info(f"Processing data source {i}/{len(self.context.config.data_configs)}")
                await self._process_data_config(data_config)
            self.console_logger.info("Document processing application completed")
        except Exception as e:
            self.logger.error(f"Application error: {e}")
            raise

    async def _process_data_config(self, data_config: Dict[str, Any]):
        """Process a single data config."""
        data_path = data_config.get('data_path', '/')
        self.console_logger.info(f"Processing data source: {data_path}")
        if not self.context:
            raise RuntimeError("Application context not initialized")
        try:
            base_path: str = data_config.get('base_path', '')
            self.context.config.current_tmp_directory = init_current_data_directory(base_path)
            self._setup_logging(self.context.config.current_tmp_directory)

            # 1. Initialize datasource (load metadata)
            datasource = await self._initialize_datasource(data_config)

            # 2. Get objects to process
            objects_to_process = await self._get_objects_to_process(data_config)
            if not objects_to_process:
                self.console_logger.info("No new documents to process")
                return
            self.console_logger.info(f"Found {len(objects_to_process)} documents to process")

            # 3. Initialize search index schema (ensure search index is created and configured)
            await self._initialize_search_index(data_config, self.context.config)
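            # DocumentTaskProcessor implements the per-document work; the TaskProcessor
            # created below schedules those tasks across max_workers workers.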
            # 4. Create task processor with datasource
            task_processor_impl = DocumentTaskProcessor(
                config=self.context.config,
                service_factory=self.context.service_factory,
                tmp_directory=self.context.config.current_tmp_directory,
                database_engine=self.context.database_engine,
                logger=self.logger,
                datasource=datasource,
                data_config=data_config,
            )

            # 5. Create the task runner
            simple_processor = TaskProcessor(
                task_processor=task_processor_impl,
                max_workers=self.context.config.processing.max_workers,
                logger=self.console_logger,
                database_engine=self.context.database_engine,
                data_config=data_config,
            )

            # Create tasks
            tasks = self._create_tasks(objects_to_process, data_config, self.context.config)
            self.console_logger.info(f"Starting processing of {len(tasks)} tasks")

            # Process all tasks synchronously in a worker thread
            await asyncio.to_thread(simple_processor.process_tasks, tasks)

            # Get processing stats
            stats = ProcessingStats(
                total_tasks=simple_processor.total_tasks,
                completed_tasks=simple_processor.completed_tasks,
                failed_tasks=simple_processor.failed_tasks,
                start_time=simple_processor.start_time or datetime.datetime.now(),
            )
            self.console_logger.info(json.dumps(stats, ensure_ascii=False, default=custom_serializer))

            # Update job status
            datasource_name = data_config.get("datasource_name", "default")
            await self._update_index_job_status(stats, datasource_name)
        except Exception as e:
            self.console_logger.error(f"Error processing data config: {traceback.format_exc()}")
            self.console_logger.error(f"Error processing data config: {str(e)}")
            raise

    async def _get_objects_to_process(self, data_config: Dict[str, Any]) -> List[IndexObject]:
        """Get objects to process."""
        try:
            # 1. Get last successful processing time from database
            datasource_name = data_config.get("datasource_name", "default")
            Session = sessionmaker(bind=self.context.database_engine)
            session = Session()
            try:
                last_success_doc_job = session.query(IndexJob).filter(
                    and_(
                        IndexJob.status == "success",
                        IndexJob.doc_upper_time.is_not(None),
                        IndexJob.datasource_name == datasource_name
                    )
                ).order_by(IndexJob.id.desc()).first()

                last_success_meta_job = session.query(IndexJob).filter(
                    and_(
                        IndexJob.status == "success",
                        IndexJob.metadata_upper_time.is_not(None),
                        IndexJob.datasource_name == datasource_name
                    )
                ).order_by(IndexJob.id.desc()).first()

                doc_upper_time = last_success_doc_job.doc_upper_time if last_success_doc_job and last_success_doc_job.doc_upper_time else None
                metadata_upper_time = last_success_meta_job.metadata_upper_time if last_success_meta_job and last_success_meta_job.metadata_upper_time else None
                self.console_logger.info(f"Checking for updates in datasource '{datasource_name}' since doc: {doc_upper_time}, metadata: {metadata_upper_time}")
            finally:
                session.close()
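            # Incremental processing: only files and metadata modified after the last
            # successful job's upper-time baselines are fetched; a None baseline means
            # there is no prior successful run for this datasource.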
            # 2. Check file updates (only get files updated after the baseline)
            new_files = await asyncio.to_thread(check_files, data_config["data_path"], doc_upper_time)

            # 3. Check metadata updates (only get metadata updated after the baseline)
            new_metas: list[dict[Any, Any]] = await asyncio.to_thread(
                check_meta,
                data_config["data_path"],
                metadata_upper_time,
                self.context.config.current_tmp_directory,
                data_config["data_dir"],
            )
            self.console_logger.info(f"Found {len(new_files)} updated files and {len(new_metas)} updated metadata entries")

            # Optionally cap the batch: when process_file_num > 0 (e.g. 100), keep only files
            # whose name also appears in new_metas, truncate to process_file_num, then trim
            # new_metas to the surviving file names.
            if data_config["process_file_num"] > 0:
                new_files = [file_info for file_info in new_files if file_info["name"] in {meta["name"] for meta in new_metas}]
                if len(new_files) > data_config["process_file_num"]:
                    new_files = new_files[:data_config["process_file_num"]]
                # Keep only metadata for files that survived the cap
                new_metas = [meta_info for meta_info in new_metas if meta_info["name"] in {file_info["name"] for file_info in new_files}]

            self.console_logger.info(f"After filtering, {len(new_files)} files and {len(new_metas)} metadata entries to process")
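            # Build one IndexObject per document: file-only entries carry doc_modifed_time,
            # metadata-only entries carry metadata_modifed_time, and documents that appear
            # in both lists end up with both timestamps on a single object.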
            # 4. Merge file and metadata info, create processing objects
            objects_to_process: list[IndexObject] = []
            for file_info in new_files:
                index_object = IndexObject(object_key=file_info["name"], type="document", doc_modifed_time=file_info.get("doc_upper_time"))
                objects_to_process.append(index_object)
            for meta_info in new_metas:
                existing_obj = next((obj for obj in objects_to_process if obj.object_key == meta_info["name"]), None)
                if existing_obj:
                    existing_obj.metadata_modifed_time = meta_info.get("meta_upper_time")
                else:
                    index_object = IndexObject(object_key=meta_info["name"], type="document", metadata_modifed_time=meta_info.get("meta_upper_time"))
                    objects_to_process.append(index_object)

            # 5. If there are objects to process, create a new job record
            if objects_to_process:
                await self._create_index_job(objects_to_process, data_config.get("datasource_name", "default"))
            return objects_to_process
        except Exception as e:
            self.logger.error(f"Error getting objects to process: {e}")
            raise

    async def _create_index_job(self, objects_to_process: List[IndexObject], datasource_name: str):
        """Create index job record."""
        try:
            Session = sessionmaker(bind=self.context.database_engine)
            session = Session()
            try:
                index_job_db = IndexJob(
                    start_time=datetime.datetime.now(datetime.timezone.utc),
                    status="processing",
                    total_process_count=len(objects_to_process),
                    datasource_name=datasource_name
                )
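                # Track the time window this job covers: the min/max document and metadata
                # modification times across the batch. The upper bounds become the incremental
                # baselines used by _get_objects_to_process on the next run.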
                for index_object in objects_to_process:
                    index_job_db.doc_upper_time = max_datetime_safe(index_object.doc_modifed_time, index_job_db.doc_upper_time)
                    index_job_db.doc_lower_time = min_datetime_safe(index_object.doc_modifed_time, index_job_db.doc_lower_time)
                    index_job_db.metadata_upper_time = max_datetime_safe(index_object.metadata_modifed_time, index_job_db.metadata_upper_time)
                    index_job_db.metadata_lower_time = min_datetime_safe(index_object.metadata_modifed_time, index_job_db.metadata_lower_time)
                    # Set datasource_name for each index object
                    index_object.datasource_name = datasource_name

                session.add(index_job_db)
                session.commit()
                self.console_logger.info(f"Created processing job for {len(objects_to_process)} objects in datasource: {datasource_name}")
            finally:
                session.close()
        except Exception as e:
            self.console_logger.error(f"Error creating index job: {e}")
            raise

    async def _update_index_job_status(self, stats: ProcessingStats, datasource_name: str = "default"):
        """Update index job status."""
        try:
            Session = sessionmaker(bind=self.context.database_engine)
            session = Session()
            try:
                current_job = session.query(IndexJob).filter(
                    and_(
                        IndexJob.status == "processing",
                        IndexJob.datasource_name == datasource_name
                    )
                ).order_by(IndexJob.id.desc()).first()
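                # Final status: "success" only if every task completed, "partial_success"
                # for a mix of completed and failed tasks, otherwise "failed".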
                if current_job:
                    if stats.failed_tasks == 0 and stats.completed_tasks == stats.total_tasks:
                        current_job.status = "success"
                    elif stats.completed_tasks > 0 and stats.failed_tasks > 0:
                        current_job.status = "partial_success"
                    else:
                        current_job.status = "failed"
                    current_job.end_time = datetime.datetime.now(datetime.timezone.utc)
                    current_job.success_count = stats.completed_tasks
                    current_job.failed_count = stats.failed_tasks
                    session.commit()
                    self.console_logger.info(f"Job completed for datasource '{datasource_name}': {current_job.status}")
            finally:
                session.close()
        except Exception as e:
            self.console_logger.error(f"Error updating job status: {e}")

    def _create_tasks(self, objects: List[IndexObject], data_config: Dict[str, Any], config: ApplicationConfig) -> List[Task]:
        """Create task list."""
        tasks: list[Task] = []
        datasource_name = data_config.get("datasource_name", "default")
        for obj in objects:
            context = ProcessingContext(
                object_key=obj.object_key,
                data_config=data_config,
                metadata={
                    "doc_modified_time": obj.doc_modifed_time,
                    "metadata_modified_time": obj.metadata_modifed_time
                },
                current_tmp_directory=self.context.config.current_tmp_directory,
                datasource_name=datasource_name,
                config=config
            )
            task = Task(id=obj.object_key, payload=context, priority=0)
            tasks.append(task)
        return tasks

    async def shutdown(self):
        """Shutdown the application."""
        self.console_logger.info("Application shutdown completed")

    @asynccontextmanager
    async def application_context(self):
        """Application context manager: initialize on entry, shut down on exit."""
        await self.initialize()
        try:
            yield self
        finally:
            await self.shutdown()

    async def _initialize_search_index(self, data_config: Dict[str, Any], applicationconfig: ApplicationConfig):
        """Initialize the search index schema, ensuring the search index is created and configured."""
        try:
            self.console_logger.info("Initializing search index schema...")
            await asyncio.to_thread(
                index_init,
                data_config,
                applicationconfig.azure_services.search_admin_key,
                applicationconfig.azure_services.search_service_name,
            )
            self.console_logger.info("Search index schema initialized successfully")
        except Exception as e:
            self.console_logger.error(f"Error initializing search index: {e}")
            raise

async def main():
    """Main function."""
    parser = argparse.ArgumentParser(description="Document Processing Application (Refactored)")
    parser.add_argument("--config", type=str, default="config.yaml", help="Business configuration file path")
    parser.add_argument("--env", type=str, default="env.yaml", help="Environment variables file path")
    parser.add_argument("--log-level", type=str, default="INFO", help="Log level")
    args = parser.parse_args()

    app = DocumentProcessingApplication(args.config, args.env)
    try:
        async with app.application_context():
            await app.run()
    except KeyboardInterrupt:
        print("Application interrupted by user")
    except Exception as e:
        print(f"Application error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
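# Example invocation (the script name is illustrative; adjust to the actual filename):
#   python main.py --config config.yaml --env env.yaml --log-level INFO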