Files
AIRegulation-DocAnalysis/backend/app/domain/documents/ports.py

239 lines
6.8 KiB
Python
Raw Normal View History

"""Define domain ports for documents."""
from __future__ import annotations
from abc import ABC, abstractmethod
2026-05-26 12:34:12 +08:00
from .models import Chunk, Document, DocumentArtifact, DocumentProcessingRun, DocumentStatus, DocumentStatusEvent, ParsedDocument
# Keep domain contracts explicit so adapters can swap implementations cleanly.
class DocumentRepository(ABC):
    """Port describing how ``Document`` records are stored and retrieved.

    Only the method signatures are fixed here; every method is abstract and
    must be supplied by a concrete adapter.
    """

    @abstractmethod
    def create(self, document: Document) -> Document:
        """Create a record from ``document`` and return a ``Document``."""

    @abstractmethod
    def update(self, document: Document) -> Document:
        """Update the record for ``document`` and return a ``Document``."""

    @abstractmethod
    def get(self, doc_id: str) -> Document | None:
        """Fetch the document for ``doc_id``; ``None`` is a permitted result."""

    @abstractmethod
    def list(self, limit: int | None = None) -> list[Document]:
        """Return documents as a list.

        ``limit`` is optional and defaults to ``None``.
        """

    @abstractmethod
    def delete(self, doc_id: str) -> bool:
        """Delete the record for ``doc_id``.

        Returns ``True`` when a record was deleted and ``False`` when no
        matching record was found.
        """

    @abstractmethod
    def update_status(
        self,
        doc_id: str,
        status: DocumentStatus,
        *,
        error_message: str = "",
        chunk_count: int | None = None,
        summary: str | None = None,
        summary_latency_ms: int | None = None,
        parser_name: str | None = None,
        index_name: str | None = None,
        metadata: dict | None = None,
    ) -> Document | None:
        """Set ``status`` on the document identified by ``doc_id``.

        Everything after ``status`` is keyword-only and optional: an error
        message (empty string by default), chunk count, summary text and its
        latency in milliseconds, parser name, index name, and a metadata
        dict. Returns a ``Document``; ``None`` is a permitted result.
        """
class DocumentBinaryStore(ABC):
    """Port for a store of raw document bytes addressed by object name.

    All methods are abstract; a concrete adapter provides the storage.
    """

    @abstractmethod
    def save(
        self,
        *,
        object_name: str,
        data: bytes,
        content_type: str,
        metadata: dict[str, str] | None = None,
    ) -> None:
        """Save ``data`` under ``object_name``.

        All arguments are keyword-only. ``content_type`` is required;
        ``metadata`` is an optional string-to-string mapping.
        """

    @abstractmethod
    def read(self, object_name: str) -> bytes:
        """Return the bytes saved under ``object_name``."""

    @abstractmethod
    def delete(self, object_name: str) -> None:
        """Delete the object saved under ``object_name``."""
class DocumentParser(ABC):
    """Port for turning a document file into a ``ParsedDocument``."""

    @abstractmethod
    def parse(self, *, file_path: str, doc_id: str, doc_name: str) -> ParsedDocument:
        """Parse the file at ``file_path`` and return a ``ParsedDocument``.

        All arguments are keyword-only; ``doc_id`` and ``doc_name`` identify
        the document being parsed.
        """
class ChunkBuilder(ABC):
    """Port for producing ``Chunk`` objects from a ``ParsedDocument``."""

    @abstractmethod
    def build(
        self,
        *,
        parsed_document: ParsedDocument,
        regulation_type: str,
        version: str,
    ) -> list[Chunk]:
        """Build and return the list of chunks for ``parsed_document``.

        All arguments are keyword-only; ``regulation_type`` and ``version``
        are required strings supplied alongside the parsed document.
        """
class ParseArtifactStore(ABC):
    """Port for storing a document's parse artifacts for relational queries.

    The artifacts are structure nodes and semantic blocks, each passed and
    returned as a list of dicts.
    """

    @abstractmethod
    def save(
        self,
        doc_id: str,
        structure_nodes: list[dict],
        semantic_blocks: list[dict],
    ) -> None:
        """Store the structure nodes and semantic blocks of document ``doc_id``."""

    @abstractmethod
    def delete(self, doc_id: str) -> None:
        """Delete every parse artifact belonging to document ``doc_id``."""

    @abstractmethod
    def get_semantic_blocks(self, doc_id: str) -> list[dict]:
        """Return every semantic block of document ``doc_id``."""

    @abstractmethod
    def get_structure_nodes(self, doc_id: str) -> list[dict]:
        """Return every structure node of document ``doc_id``."""
2026-05-26 12:34:12 +08:00
class DocumentProcessingStore(ABC):
    """Port for storing processing runs, status events, and artifact references.

    Every method is abstract; a concrete adapter provides the persistence.
    """

    @abstractmethod
    def create_run(self, run: DocumentProcessingRun) -> DocumentProcessingRun:
        """Create a record for the new processing run ``run``."""

    @abstractmethod
    def mark_run_stored(
        self,
        run_id: str,
        *,
        stored_at: object | None = None,
        metadata: dict | None = None,
    ) -> DocumentProcessingRun | None:
        """Record that run ``run_id`` has persisted the source file.

        ``stored_at`` and ``metadata`` are optional keyword-only arguments.
        """

    @abstractmethod
    def mark_run_parsed(
        self,
        run_id: str,
        *,
        parser_backend: str,
        layout_count: int,
        structure_node_count: int,
        semantic_block_count: int,
        vector_chunk_count: int,
        parsed_at: object | None = None,
        metadata: dict | None = None,
    ) -> DocumentProcessingRun | None:
        """Record the parse-completion details of run ``run_id``.

        The parser backend name and the four counts are required
        keyword-only arguments; ``parsed_at`` and ``metadata`` are optional.
        """

    @abstractmethod
    def mark_run_indexed(
        self,
        run_id: str,
        *,
        chunk_count: int,
        index_name: str,
        indexed_at: object | None = None,
        finished_at: object | None = None,
        metadata: dict | None = None,
    ) -> DocumentProcessingRun | None:
        """Record that run ``run_id`` was indexed successfully.

        ``chunk_count`` and ``index_name`` are required keyword-only
        arguments; the two timestamps and ``metadata`` are optional.
        """

    @abstractmethod
    def mark_run_failed(
        self,
        run_id: str,
        *,
        failure_stage: str,
        error_message: str,
        finished_at: object | None = None,
        metadata: dict | None = None,
    ) -> DocumentProcessingRun | None:
        """Record that run ``run_id`` failed.

        ``failure_stage`` and ``error_message`` are required keyword-only
        arguments; ``finished_at`` and ``metadata`` are optional.
        """

    @abstractmethod
    def append_status_event(self, event: DocumentStatusEvent) -> DocumentStatusEvent:
        """Append the document status event ``event``."""

    @abstractmethod
    def replace_artifacts_for_run(
        self,
        run_id: str,
        artifacts: list[DocumentArtifact],
    ) -> list[DocumentArtifact]:
        """Replace every artifact of run ``run_id`` with ``artifacts``."""

    @abstractmethod
    def delete_by_document(self, doc_id: str) -> None:
        """Delete all processing data held for document ``doc_id``."""

    @abstractmethod
    def list_runs_by_document(self, doc_id: str) -> list[DocumentProcessingRun]:
        """Return every processing run of document ``doc_id``."""

    @abstractmethod
    def get_run(self, run_id: str) -> DocumentProcessingRun | None:
        """Return the single processing run identified by ``run_id``."""

    @abstractmethod
    def list_status_events_by_document(self, doc_id: str) -> list[DocumentStatusEvent]:
        """Return the status events of document ``doc_id``."""

    @abstractmethod
    def list_status_events_by_run(self, run_id: str) -> list[DocumentStatusEvent]:
        """Return the status events of run ``run_id``."""

    @abstractmethod
    def list_artifacts_by_document(self, doc_id: str) -> list[DocumentArtifact]:
        """Return the artifact references of document ``doc_id``."""

    @abstractmethod
    def list_artifacts_by_run(self, run_id: str) -> list[DocumentArtifact]:
        """Return the artifact references of run ``run_id``."""