"""SQLAlchemy models and storage adapters for index jobs and index objects."""

import datetime
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, Tuple

from sqlalchemy import Column, DateTime, Integer, String, Text, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import (
    Mapped,
    Session,
    declarative_base,
    mapped_column,
    sessionmaker,
)

Base = declarative_base()


class IndexJobStatus(Enum):
    """Status of an index job; ``IndexJob.status`` stores the member's ``.value``."""

    PENDING = 'pending'  # TODO
    PROCESSING = 'processing'
    SUCCESS = 'success'
    PARTIAL_SUCCESS = 'partial_success'
    FAILED = 'failed'


class IndexObjectStatus(Enum):
    """Status of an index object; ``IndexObject.status`` stores the member's ``.value``."""

    SUCCESS = 'success'
    PROCESSING = 'processing'
    FAILED = 'failed'


class IndexJob(Base):  # type: ignore
    """Index job model: one row per index run (table ``index_run``)."""

    __tablename__ = 'index_run'

    id = Column(Integer, primary_key=True, autoincrement=True)
    start_time = Column(DateTime, nullable=True)
    finished_time = Column(DateTime)
    # Holds an IndexJobStatus value string, not the enum member itself.
    status: Mapped[str] = mapped_column(String(20), default=IndexJobStatus.PENDING.value)
    detailed_message = Column(Text, nullable=True)
    # NOTE(review): presumably the doc/metadata time window this run covered — confirm with callers.
    doc_lower_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    doc_upper_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_lower_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_upper_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    total_process_count = Column(Integer)
    success_object_count = Column(Integer, default=0)
    failed_object_count = Column(Integer, default=0)
    datasource_name: Mapped[str] = mapped_column(String(255), nullable=False)


class IndexObject(Base):
    """Index object model: a document or metadata file to be processed.

    Rows are identified by the composite primary key
    ``(object_key, datasource_name)``.
    """

    __tablename__ = 'index_object'

    object_key: Mapped[str] = mapped_column(String(255), primary_key=True)
    type = Column(String(20), nullable=False)
    # Column names keep the historical "modifed" spelling; renaming would change the schema.
    doc_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    # Holds an IndexObjectStatus value string, not the enum member itself.
    status: Mapped[str] = mapped_column(String(20), default=IndexObjectStatus.PROCESSING.value)
    try_count: Mapped[int] = mapped_column(Integer, default=0)
    last_run_id = Column(Integer)
    last_start_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    last_finished_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    detailed_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    last_fail_doc_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    last_fail_metadata_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    datasource_name: Mapped[str] = mapped_column(String(255), primary_key=True)


def init_database(database_uri: str = '') -> Engine:
    """Create an engine for *database_uri* and create any missing tables.

    Args:
        database_uri: SQLAlchemy database URL. The empty-string default is not
            a parseable URL, so callers are expected to pass one explicitly.

    Returns:
        The engine bound to *database_uri*.
    """
    engine = create_engine(database_uri)
    Base.metadata.create_all(engine)
    return engine


class DatabaseInterface(ABC):
    """Storage interface for IndexObject records keyed by (object_key, datasource_name)."""

    @abstractmethod
    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Return the object stored under (object_key, datasource_name), or None."""

    @abstractmethod
    def save_index_object(self, index_object: IndexObject) -> None:
        """Insert *index_object*, or update the stored record with the same key."""

    @abstractmethod
    def update_processing_status(
        self,
        object_key: str,
        datasource_name: str,
        status: IndexObjectStatus,
        message: Optional[str] = None,
    ) -> None:
        """Set the status and detailed message of the object under the given key."""


class InMemoryDatabase(DatabaseInterface):
    """In-memory DatabaseInterface implementation for testing.

    Objects are keyed by ``(object_key, datasource_name)``, mirroring the
    composite primary key of IndexObject, so equal keys from different
    datasources do not collide.
    """

    def __init__(self) -> None:
        self._objects: Dict[Tuple[str, str], IndexObject] = {}

    @staticmethod
    def _touch(index_object: IndexObject) -> None:
        """Stamp the transient ``created_at``/``updated_at`` bookkeeping attributes.

        These are plain Python attributes, not IndexObject columns, so a freshly
        constructed object does not have them; ``getattr`` avoids AttributeError.
        """
        now = datetime.datetime.now()
        if getattr(index_object, 'created_at', None) is None:
            index_object.created_at = now  # type: ignore[attr-defined]
        index_object.updated_at = now  # type: ignore[attr-defined]

    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Return the stored object for (object_key, datasource_name), or None."""
        return self._objects.get((object_key, datasource_name))

    def save_index_object(self, index_object: IndexObject) -> None:
        """Store *index_object* (by reference), replacing any object with the same key."""
        self._touch(index_object)
        self._objects[(index_object.object_key, index_object.datasource_name)] = index_object

    def update_processing_status(
        self,
        object_key: str,
        datasource_name: str,
        status: IndexObjectStatus,
        message: Optional[str] = None,
    ) -> None:
        """Set status/message on the stored object, creating it if it does not exist."""
        key = (object_key, datasource_name)
        obj = self._objects.get(key)
        if obj is None:
            # Column defaults only apply on a SQL INSERT, so set try_count explicitly.
            obj = IndexObject(object_key=object_key, datasource_name=datasource_name, try_count=0)
            self._objects[key] = obj
        # Store the enum's string value to match the String(20) column, as the SQL adapter does.
        obj.status = status.value
        obj.detailed_message = message
        self._touch(obj)


class LegacyDatabaseAdapter(DatabaseInterface):
    """Adapter exposing a SQLAlchemy engine through DatabaseInterface."""

    # Fields copied from the caller's object on both insert and update.
    # NOTE: type, last_run_id, last_start_time, last_finished_time and
    # detailed_message are not synced by save_index_object (type only on insert).
    _SYNCED_FIELDS = (
        'doc_modifed_time',
        'metadata_modifed_time',
        'try_count',
        'status',
        'last_fail_doc_modifed_time',
        'last_fail_metadata_modifed_time',
    )

    def __init__(self, database_engine: Engine):
        self.database_engine = database_engine
        self._session_factory = None

    def _get_session_factory(self):
        """Return the session factory, creating it on first use."""
        if self._session_factory is None:
            self._session_factory = sessionmaker(bind=self.database_engine)
        return self._session_factory

    @staticmethod
    def _load(session: Session, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Fetch a row by its composite primary key within *session*."""
        return session.get(
            IndexObject,
            {'object_key': object_key, 'datasource_name': datasource_name},
        )

    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Return the row for (object_key, datasource_name), or None.

        The session is closed before returning, so the instance is detached;
        its already-loaded column values remain readable.
        """
        with self._get_session_factory()() as session:
            return self._load(session, object_key, datasource_name)

    def save_index_object(self, index_object: IndexObject) -> None:
        """Insert *index_object*'s data, or update the existing row with the same key.

        Only ``_SYNCED_FIELDS`` are copied onto an existing row; on insert the
        key columns and ``type`` are copied as well.
        """
        with self._get_session_factory()() as session:
            stored = self._load(session, index_object.object_key, index_object.datasource_name)
            if stored is None:
                stored = IndexObject(
                    object_key=index_object.object_key,
                    datasource_name=index_object.datasource_name,
                    type=index_object.type,
                )
                session.add(stored)
            for field in self._SYNCED_FIELDS:
                setattr(stored, field, getattr(index_object, field))
            session.commit()

    def update_processing_status(
        self,
        object_key: str,
        datasource_name: str,
        status: IndexObjectStatus,
        message: Optional[str] = None,
    ) -> None:
        """Set status/message on an existing row.

        Does nothing when no row matches; unlike InMemoryDatabase it cannot
        create one here because the NOT NULL ``type`` value is unknown.
        """
        with self._get_session_factory()() as session:
            stored = self._load(session, object_key, datasource_name)
            if stored is None:
                return
            stored.status = status.value
            stored.detailed_message = message
            session.commit()