Files
2025-09-26 17:15:54 +08:00

190 lines
8.2 KiB
Python

import datetime
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, Tuple

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.orm import Mapped, declarative_base, mapped_column
# Declarative base shared by the ORM models in this module (IndexJob,
# IndexObject); init_database() creates their tables via Base.metadata.
Base = declarative_base()
class IndexJobStatus(Enum):
    """Possible states of an index run.

    Each member's value is the lowercase string used for ``IndexJob.status``;
    ``PENDING`` supplies that column's default.
    """

    PENDING = "pending"  # TODO: carried over from the original; intent not recorded
    PROCESSING = "processing"
    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILED = "failed"
class IndexObjectStatus(Enum):
    """Possible states of a single indexed object.

    Member values are the lowercase strings used for ``IndexObject.status``;
    ``PROCESSING`` supplies that column's default.
    """

    SUCCESS = "success"
    PROCESSING = "processing"
    FAILED = "failed"
class IndexJob(Base): # type: ignore
    """Index job model, represents a single index run.

    Rows live in the ``index_run`` table. ``status`` holds the string value of
    an :class:`IndexJobStatus` member (not the enum object itself). All
    ``DateTime`` columns are timezone-naive (``timezone=True`` is not set).
    """
    __tablename__ = 'index_run'
    # Auto-incrementing surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    start_time = Column(DateTime, nullable=True)
    finished_time = Column(DateTime)
    # NOT NULL (non-Optional Mapped); new rows default to IndexJobStatus.PENDING.value.
    status: Mapped[str] = mapped_column(String(20), default=IndexJobStatus.PENDING.value)
    detailed_message = Column(Text,nullable=True)
    # Lower/upper time bounds for documents and for metadata, all nullable.
    # NOTE(review): presumably the modification-time window this run covers — confirm with callers.
    doc_lower_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    doc_upper_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_lower_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_upper_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    total_process_count = Column(Integer)
    # Outcome counters; both default to 0 on insert.
    success_object_count = Column(Integer, default=0)
    failed_object_count = Column(Integer, default=0)
    # Required (NOT NULL): name of the datasource this run belongs to.
    datasource_name: Mapped[str] = mapped_column(String(255), nullable=False)
class IndexObject(Base):
    """Index object model, represents a document or metadata file to be processed.

    Rows live in the ``index_object`` table and are identified by the
    composite primary key ``(object_key, datasource_name)``, so the same
    ``object_key`` may exist once per datasource. ``status`` holds the string
    value of an :class:`IndexObjectStatus` member.
    """
    __tablename__ = 'index_object'
    # First component of the composite primary key (datasource_name is the second).
    object_key: Mapped[str] = mapped_column(String(255), primary_key=True)
    # Required. The attribute name shadows the builtin ``type`` inside this class body only.
    type = Column(String(20), nullable=False)
    # "modifed" (sic) is part of the column names; renaming would require a schema migration.
    doc_modifed_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    metadata_modifed_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    # NOT NULL; new rows default to IndexObjectStatus.PROCESSING.value.
    status: Mapped[str] = mapped_column(String(20), default=IndexObjectStatus.PROCESSING.value)
    # NOT NULL; defaults to 0 on insert. Presumably the number of processing attempts.
    try_count: Mapped[int] = mapped_column(Integer, default=0)
    # NOTE(review): presumably an IndexJob.id, but no ForeignKey is declared — confirm.
    last_run_id = Column(Integer)
    last_start_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    last_finished_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    # Nullable free-text message; annotated Optional to match nullable=True.
    detailed_message: Mapped[Optional[str]] = mapped_column(Text,nullable=True)
    # Modification times recorded at the most recent failure.
    # NOTE(review): presumably used to decide whether a failed object changed since — confirm.
    last_fail_doc_modifed_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    last_fail_metadata_modifed_time:Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
    # Second component of the composite primary key.
    datasource_name: Mapped[str] = mapped_column(String(255), primary_key=True)
def init_database(database_uri: str = '') -> Any:
    """Create an engine for ``database_uri`` and ensure all model tables exist.

    Every table registered on ``Base.metadata`` is created if it is missing;
    tables that already exist are left as they are.

    :param database_uri: SQLAlchemy database URL (e.g. ``sqlite:///index.db``).
        The empty-string default is not a usable URL, so callers should
        always pass one.
    :return: the SQLAlchemy engine that was created.
    """
    db_engine = create_engine(database_uri)
    Base.metadata.create_all(bind=db_engine)
    return db_engine
class DatabaseInterface(ABC):
    """Database interface for the refactored system.

    Index objects are addressed by the pair ``(object_key, datasource_name)``,
    mirroring the composite primary key of :class:`IndexObject`.
    """

    @abstractmethod
    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Return the object stored under ``(object_key, datasource_name)``.

        :return: the matching :class:`IndexObject`, or ``None`` if there is none.
        """

    @abstractmethod
    def save_index_object(self, index_object: IndexObject) -> None:
        """Persist ``index_object``, inserting it or updating the stored copy.

        The target record is the one with the same
        ``(object_key, datasource_name)`` as ``index_object``.
        """

    @abstractmethod
    def update_processing_status(self, object_key: str, datasource_name: str,
                                 status: IndexObjectStatus,
                                 message: Optional[str] = None) -> None:
        """Record ``status`` and an optional ``message`` for one object.

        Whether a missing object is created or ignored is left to the
        implementation.
        """
class InMemoryDatabase(DatabaseInterface):
    """In-memory database implementation for testing.

    Objects are kept in a dict keyed by ``(datasource_name, object_key)``,
    the same composite key IndexObject uses as its primary key, so equal
    object keys in different datasources do not overwrite each other.
    Nothing is persisted; all state lives only as long as the instance.

    ``created_at`` / ``updated_at`` are bookkeeping attributes set on the
    Python objects only; they are not mapped columns of IndexObject.
    """

    def __init__(self):
        self._objects: Dict[Tuple[str, str], IndexObject] = {}

    @staticmethod
    def _key(object_key: str, datasource_name: str) -> Tuple[str, str]:
        """Build the dict key used for one ``(object_key, datasource_name)`` pair."""
        return (datasource_name, object_key)

    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Return the object stored for ``(object_key, datasource_name)``, or ``None``."""
        return self._objects.get(self._key(object_key, datasource_name))

    def save_index_object(self, index_object: IndexObject) -> None:
        """Store ``index_object``, replacing any object with the same key."""
        now = datetime.datetime.now()
        index_object.updated_at = now
        # created_at is not a mapped attribute, so it only exists if an earlier
        # save set it; getattr avoids an AttributeError on the first save.
        if getattr(index_object, 'created_at', None) is None:
            index_object.created_at = now
        key = self._key(index_object.object_key, index_object.datasource_name)
        self._objects[key] = index_object

    def update_processing_status(self, object_key: str, datasource_name: str,
                                 status: IndexObjectStatus,
                                 message: Optional[str] = None) -> None:
        """Set status and message on an object, creating the object if missing.

        The status is stored as its string ``value`` and the message in
        ``detailed_message``, matching IndexObject's columns.
        """
        now = datetime.datetime.now()
        # IndexObject.status is a string column; tolerate callers that already pass the string.
        status_value = status.value if isinstance(status, IndexObjectStatus) else status
        key = self._key(object_key, datasource_name)
        obj = self._objects.get(key)
        if obj is None:
            # Create new object if it doesn't exist. The declarative constructor
            # only accepts mapped attributes, so bookkeeping fields are set afterwards.
            obj = IndexObject(object_key=object_key, datasource_name=datasource_name)
            obj.created_at = now
            self._objects[key] = obj
        obj.status = status_value
        obj.detailed_message = message
        obj.updated_at = now
class LegacyDatabaseAdapter(DatabaseInterface):
    """Adapter to bridge the old database module with the new interface.

    Every call opens its own session from a lazily created ``sessionmaker``
    bound to ``database_engine``, and the session is closed before the call
    returns. Objects returned by :meth:`get_index_object` are therefore no
    longer attached to a session.
    """

    def __init__(self, database_engine):
        """
        :param database_engine: SQLAlchemy engine the sessions are bound to.
        """
        self.database_engine = database_engine
        self._session_factory = None

    def _get_session_factory(self):
        """Get session factory (lazy initialization)."""
        if self._session_factory is None:
            from sqlalchemy.orm import sessionmaker
            self._session_factory = sessionmaker(bind=self.database_engine)
        return self._session_factory

    @staticmethod
    def _load(session, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Fetch the IndexObject with the given composite primary key, or ``None``."""
        # Session.get() supersedes Query.get(), which SQLAlchemy 2.0 treats as legacy.
        return session.get(
            IndexObject,
            {"object_key": object_key, "datasource_name": datasource_name},
        )

    def get_index_object(self, object_key: str, datasource_name: str) -> Optional[IndexObject]:
        """Return the stored object for ``(object_key, datasource_name)``, or ``None``."""
        make_session = self._get_session_factory()
        with make_session() as session:
            return self._load(session, object_key, datasource_name)

    def save_index_object(self, index_object: IndexObject) -> None:
        """Insert ``index_object`` or copy its tracked fields onto the stored row.

        Copied on both insert and update: the two ``*_modifed_time`` fields,
        ``try_count``, ``status`` and the two ``last_fail_*`` fields. ``type``
        is written only on insert. ``detailed_message``, ``last_run_id``,
        ``last_start_time`` and ``last_finished_time`` are never written here.
        """
        make_session = self._get_session_factory()
        with make_session() as session:
            stored = self._load(session, index_object.object_key, index_object.datasource_name)
            if stored is not None:
                # Update existing row in place.
                stored.doc_modifed_time = index_object.doc_modifed_time
                stored.metadata_modifed_time = index_object.metadata_modifed_time
                stored.try_count = index_object.try_count
                stored.status = index_object.status
                stored.last_fail_doc_modifed_time = index_object.last_fail_doc_modifed_time
                stored.last_fail_metadata_modifed_time = index_object.last_fail_metadata_modifed_time
                stored.datasource_name = index_object.datasource_name
                # Note: legacy IndexObject might not have all fields
            else:
                # Insert a fresh row instead of adding the caller's (possibly
                # detached) instance to this session.
                stored = IndexObject(
                    object_key=index_object.object_key,
                    type=index_object.type,
                    doc_modifed_time=index_object.doc_modifed_time,
                    metadata_modifed_time=index_object.metadata_modifed_time,
                    try_count=index_object.try_count,
                    status=index_object.status,
                    last_fail_doc_modifed_time=index_object.last_fail_doc_modifed_time,
                    last_fail_metadata_modifed_time=index_object.last_fail_metadata_modifed_time,
                    datasource_name=index_object.datasource_name,
                )
                session.add(stored)
            session.commit()

    def update_processing_status(self, object_key: str, datasource_name: str,
                                 status: IndexObjectStatus,
                                 message: Optional[str] = None) -> None:
        """Set ``status`` (stored as its string value) and ``detailed_message``.

        Does nothing if no row matches ``(object_key, datasource_name)``.
        """
        make_session = self._get_session_factory()
        with make_session() as session:
            stored = self._load(session, object_key, datasource_name)
            if stored is None:
                # Nothing to update; this adapter never creates rows here.
                return
            stored.status = status.value
            stored.detailed_message = message
            session.commit()