from enum import Enum
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, Tuple
import datetime

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.orm import Mapped, declarative_base, mapped_column

Base = declarative_base()


class IndexJobStatus(Enum):
    """Enumeration for index job status"""
    PENDING = 'pending'  # todo
    PROCESSING = 'processing'
    SUCCESS = 'success'
    PARTIAL_SUCCESS = 'partial_success'
    FAILED = 'failed'


class IndexObjectStatus(Enum):
    """Enumeration for index object status"""
    SUCCESS = 'success'
    PROCESSING = 'processing'
    FAILED = 'failed'


class IndexJob(Base):  # type: ignore
    """Index job model, represents a single index run"""
    __tablename__ = 'index_run'

    id = Column(Integer, primary_key=True, autoincrement=True)
    start_time = Column(DateTime, nullable=True)
    finished_time = Column(DateTime)
    status: Mapped[str] = mapped_column(String(20), default=IndexJobStatus.PENDING.value)
    detailed_message = Column(Text, nullable=True)
    doc_lower_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    doc_upper_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_lower_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_upper_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    total_process_count = Column(Integer)
    success_object_count = Column(Integer, default=0)
    failed_object_count = Column(Integer, default=0)
    datasource_name: Mapped[str] = mapped_column(String(255), nullable=False)


class IndexObject(Base):
    """Index object model, represents a document or metadata file to be processed"""
    __tablename__ = 'index_object'

    # (object_key, datasource_name) form a composite primary key, so the same
    # object key may appear once per datasource.
    object_key: Mapped[str] = mapped_column(String(255), primary_key=True)
    type = Column(String(20), nullable=False)
    doc_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    status: Mapped[str] = mapped_column(String(20), default=IndexObjectStatus.PROCESSING.value)
    try_count: Mapped[int] = mapped_column(Integer, default=0)
    last_run_id = Column(Integer)
    last_start_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    last_finished_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    detailed_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    last_fail_doc_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    last_fail_metadata_modifed_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    datasource_name: Mapped[str] = mapped_column(String(255), primary_key=True)


def init_database(database_uri: str = '') -> Any:
    """Create the engine and ensure all tables exist."""
    engine = create_engine(database_uri)
    Base.metadata.create_all(engine)
    return engine
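

# Example usage (illustrative sketch; the SQLite URI below is an assumption,
# any SQLAlchemy database URI works here):
#
#     engine = init_database("sqlite:///index_state.db")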


class DatabaseInterface(ABC):
    """Database interface for the refactored system"""

    @abstractmethod
    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Get index object by key"""
        pass

    @abstractmethod
    def save_index_object(self, index_object: IndexObject) -> None:
        """Save index object"""
        pass

    @abstractmethod
    def update_processing_status(self, object_key: str, datasource_name: str, status: IndexObjectStatus,
                                 message: Optional[str] = None) -> None:
        """Update processing status"""
        pass


class InMemoryDatabase(DatabaseInterface):
    """In-memory database implementation for testing"""

    def __init__(self):
        # Keyed by (object_key, datasource_name), mirroring the model's composite primary key.
        self._objects: Dict[Tuple[str, str], IndexObject] = {}

    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Get index object by key"""
        return self._objects.get((object_key, datasource_name))

    def save_index_object(self, index_object: IndexObject) -> None:
        """Save index object"""
        self._objects[(index_object.object_key, index_object.datasource_name)] = index_object

    def update_processing_status(self, object_key: str, datasource_name: str, status: IndexObjectStatus,
                                 message: Optional[str] = None) -> None:
        """Update processing status"""
        key = (object_key, datasource_name)
        obj = self._objects.get(key)
        if obj is not None:
            obj.status = status.value
            obj.detailed_message = message
            obj.last_finished_time = datetime.datetime.now()
        else:
            # Create a new object if it doesn't exist yet
            obj = IndexObject(
                object_key=object_key,
                datasource_name=datasource_name,
                status=status.value,
                detailed_message=message,
                last_finished_time=datetime.datetime.now(),
            )
            self._objects[key] = obj
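

# Illustrative round-trip through the in-memory backend (a sketch for tests, not
# part of the production API; the object key and datasource name are made up).
def _example_in_memory_round_trip() -> None:
    db = InMemoryDatabase()
    db.update_processing_status("docs/a.pdf", "sample-source", IndexObjectStatus.FAILED, "parse error")
    obj = db.get_index_object("docs/a.pdf", "sample-source")
    assert obj is not None and obj.status == IndexObjectStatus.FAILED.value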


class LegacyDatabaseAdapter(DatabaseInterface):
    """Adapter to bridge the old database module with the new interface"""

    def __init__(self, database_engine):
        self.database_engine = database_engine
        self._session_factory = None

    def _get_session_factory(self):
        """Get session factory (lazy initialization)"""
        if self._session_factory is None:
            from sqlalchemy.orm import sessionmaker
            self._session_factory = sessionmaker(bind=self.database_engine)
        return self._session_factory

    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Get index object by key"""
        session_factory = self._get_session_factory()
        with session_factory() as session:
            return session.get(IndexObject, {"object_key": object_key, "datasource_name": datasource_name})

    def save_index_object(self, index_object: IndexObject) -> None:
        """Save index object"""
        object_key = index_object.object_key
        datasource_name = index_object.datasource_name

        session_factory = self._get_session_factory()
        with session_factory() as session:
            old_obj = session.get(IndexObject, {"object_key": object_key, "datasource_name": datasource_name})
            if old_obj:
                # Update existing row in place
                old_obj.doc_modifed_time = index_object.doc_modifed_time
                old_obj.metadata_modifed_time = index_object.metadata_modifed_time
                old_obj.try_count = index_object.try_count
                old_obj.status = index_object.status
                old_obj.last_fail_doc_modifed_time = index_object.last_fail_doc_modifed_time
                old_obj.last_fail_metadata_modifed_time = index_object.last_fail_metadata_modifed_time
                old_obj.datasource_name = index_object.datasource_name
                # Note: legacy IndexObject might not have all fields
            else:
                # Create new row
                old_obj = IndexObject(
                    object_key=index_object.object_key,
                    type=index_object.type,
                    doc_modifed_time=index_object.doc_modifed_time,
                    metadata_modifed_time=index_object.metadata_modifed_time,
                    try_count=index_object.try_count,
                    status=index_object.status,
                    last_fail_doc_modifed_time=index_object.last_fail_doc_modifed_time,
                    last_fail_metadata_modifed_time=index_object.last_fail_metadata_modifed_time,
                    datasource_name=index_object.datasource_name
                )

            session.add(old_obj)
            session.commit()

    def update_processing_status(self, object_key: str, datasource_name: str, status: IndexObjectStatus,
                                 message: Optional[str] = None) -> None:
        """Update processing status"""
        session_factory = self._get_session_factory()
        with session_factory() as session:
            old_obj = session.get(IndexObject, {"object_key": object_key, "datasource_name": datasource_name})
            if old_obj:
                old_obj.status = status.value
                old_obj.detailed_message = message
                session.commit()
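

# Minimal wiring sketch (illustrative only, not a production entry point).
# The SQLite URI, object key, and datasource name below are made-up values.
if __name__ == "__main__":
    engine = init_database("sqlite:///index_state.db")
    adapter = LegacyDatabaseAdapter(engine)

    adapter.save_index_object(IndexObject(
        object_key="docs/example.pdf",
        type="document",
        datasource_name="local-sample",
        status=IndexObjectStatus.PROCESSING.value,
    ))
    adapter.update_processing_status("docs/example.pdf", "local-sample",
                                     IndexObjectStatus.SUCCESS, "indexed")
    print(adapter.get_index_object("docs/example.pdf", "local-sample").status)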