Files
AIRegulation-DocAnalysis/tests/test_document_processing_store.py
2026-05-26 12:34:12 +08:00

217 lines
7.8 KiB
Python

"""Test PostgreSQL-backed document processing history storage."""
from __future__ import annotations
import uuid
from datetime import UTC, datetime
import psycopg2
import pytest
from app.domain.documents import Document, DocumentArtifact, DocumentProcessingRun, DocumentStatus, DocumentStatusEvent
from app.infrastructure.storage.postgres_document_processing_store import PostgresDocumentProcessingStore
from app.infrastructure.storage.postgres_document_repository import PostgresDocumentRepository
# Keep these tests focused on relational storage behavior only.
def _build_document(doc_id: str) -> Document:
    """Build the minimal pending PDF document row that the foreign keys require.

    Only ``doc_id`` varies between calls; every other field is a fixed
    placeholder value.
    """
    fields = {
        "doc_id": doc_id,
        "doc_name": "Processing Test",
        "file_name": "processing-test.pdf",
        "object_name": f"{doc_id}/processing-test.pdf",
        "content_type": "application/pdf",
        "size_bytes": 128,
        "status": DocumentStatus.PENDING,
    }
    return Document(**fields)
def _connectivity_ready() -> bool:
    """Probe whether the configured PostgreSQL instance can serve integration tests.

    Constructs a repository and lists at most one row. Returns ``False`` if
    either step raises ``psycopg2.Error`` and ``True`` otherwise.
    """
    try:
        # Construction and the one-row query share the same failure handling.
        PostgresDocumentRepository().list(limit=1)
    except psycopg2.Error:
        return False
    return True
# The probe runs once when this module is imported; if it fails, every test
# in the module is skipped instead of erroring.
pytestmark = pytest.mark.skipif(
    not _connectivity_ready(),
    reason="PostgreSQL test backend is not reachable",
)
def test_postgres_document_processing_store_supports_full_run_lifecycle():
    """Drive one run through create/stored/parsed/indexed and read its history back.

    Also appends one status event and two artifacts, then checks that the
    list/get calls return the domain dataclasses.
    """
    doc_repo = PostgresDocumentRepository()
    history = PostgresDocumentProcessingStore()
    doc_id = f"proc-{uuid.uuid4().hex[:10]}"
    run_id = f"run-{uuid.uuid4().hex[:10]}"
    now = datetime.now(UTC)

    # The parent document must exist before any history rows reference it.
    doc_repo.create(_build_document(doc_id))
    try:
        # --- run lifecycle -------------------------------------------------
        new_run = DocumentProcessingRun(
            run_id=run_id,
            doc_id=doc_id,
            trigger_type="upload",
            run_status="running",
            parser_backend="aliyun",
            chunk_backend="aliyun",
            embedding_model="text-embedding-v3",
            started_at=now,
            metadata={"origin": "test"},
        )
        created = history.create_run(new_run)
        stored = history.mark_run_stored(
            run_id,
            stored_at=now,
            metadata={"stored": True},
        )
        parsed = history.mark_run_parsed(
            run_id,
            parser_backend="fake_parser",
            layout_count=2,
            structure_node_count=3,
            semantic_block_count=4,
            vector_chunk_count=5,
            parsed_at=now,
            metadata={"parse_task_id": "task-1"},
        )
        indexed = history.mark_run_indexed(
            run_id,
            chunk_count=6,
            index_name="regulations_dense_1024_v1",
            indexed_at=now,
            finished_at=now,
            metadata={"collection": "regulations_dense_1024_v1"},
        )

        # --- status event --------------------------------------------------
        index_event = DocumentStatusEvent(
            event_id=f"evt-{uuid.uuid4().hex[:10]}",
            doc_id=doc_id,
            run_id=run_id,
            from_status="parsed",
            to_status="indexed",
            stage="index",
            message="Indexed successfully",
            metadata={"chunk_count": 6},
            occurred_at=now,
        )
        event = history.append_status_event(index_event)

        # --- artifacts -----------------------------------------------------
        layouts_artifact = DocumentArtifact(
            artifact_id=f"art-{uuid.uuid4().hex[:10]}",
            doc_id=doc_id,
            run_id=run_id,
            artifact_type="layouts",
            object_name=f"artifacts/{doc_id}/layouts.json",
            content_type="application/json",
            created_at=now,
        )
        chunks_artifact = DocumentArtifact(
            artifact_id=f"art-{uuid.uuid4().hex[:10]}",
            doc_id=doc_id,
            run_id=run_id,
            artifact_type="vector_chunks",
            object_name=f"artifacts/{doc_id}/vector_chunks.json",
            content_type="application/json",
            created_at=now,
        )
        artifacts = history.replace_artifacts_for_run(
            run_id,
            [layouts_artifact, chunks_artifact],
        )

        # --- read everything back -----------------------------------------
        fetched = history.get_run(run_id)
        run_rows = history.list_runs_by_document(doc_id)
        event_rows = history.list_status_events_by_document(doc_id)
        artifact_rows = history.list_artifacts_by_run(run_id)

        assert created.run_id == run_id

        assert stored is not None
        assert stored.stored_at is not None

        assert parsed is not None
        assert parsed.parser_backend == "fake_parser"

        assert indexed is not None
        assert indexed.run_status == "succeeded"

        assert fetched is not None
        assert fetched.chunk_count == 6

        assert isinstance(run_rows[0], DocumentProcessingRun)
        assert isinstance(event_rows[0], DocumentStatusEvent)
        assert isinstance(artifact_rows[0], DocumentArtifact)
        assert event_rows[0].event_id == event.event_id

        returned_types = {item.artifact_type for item in artifacts}
        listed_types = {item.artifact_type for item in artifact_rows}
        assert returned_types == listed_types
    finally:
        # Remove history rows first, then the parent document.
        history.delete_by_document(doc_id)
        doc_repo.delete(doc_id)
def test_postgres_document_processing_store_replaces_artifacts_and_deletes_document_data():
    """Replace artifact rows idempotently and remove all history rows for one document.

    Steps:
      1. Create a parent document and a running "retry" run.
      2. Call ``replace_artifacts_for_run`` twice with one ``layouts`` artifact
         each and check that only the second one remains.
      3. Append a failure event, mark the run failed, and check the status.
      4. Call ``delete_by_document`` and check that the run, event, and
         artifact listings for the document are all empty.

    Cleanup is guaranteed in ``finally`` so a failing assertion does not leave
    history rows (or the parent document) behind in the shared database.
    """
    repository = PostgresDocumentRepository()
    store = PostgresDocumentProcessingStore()
    doc_id = f"proc-{uuid.uuid4().hex[:10]}"
    run_id = f"run-{uuid.uuid4().hex[:10]}"
    # Tracks whether the explicit delete under test already ran, so the
    # cleanup path does not issue a second delete on the success path.
    history_deleted = False

    repository.create(_build_document(doc_id))
    try:
        store.create_run(
            DocumentProcessingRun(
                run_id=run_id,
                doc_id=doc_id,
                trigger_type="retry",
                run_status="running",
            )
        )
        first = store.replace_artifacts_for_run(
            run_id,
            [
                DocumentArtifact(
                    artifact_id=f"art-{uuid.uuid4().hex[:10]}",
                    doc_id=doc_id,
                    run_id=run_id,
                    artifact_type="layouts",
                    object_name=f"artifacts/{doc_id}/layouts-v1.json",
                    content_type="application/json",
                )
            ],
        )
        second = store.replace_artifacts_for_run(
            run_id,
            [
                DocumentArtifact(
                    artifact_id=f"art-{uuid.uuid4().hex[:10]}",
                    doc_id=doc_id,
                    run_id=run_id,
                    artifact_type="layouts",
                    object_name=f"artifacts/{doc_id}/layouts-v2.json",
                    content_type="application/json",
                )
            ],
        )
        store.append_status_event(
            DocumentStatusEvent(
                event_id=f"evt-{uuid.uuid4().hex[:10]}",
                doc_id=doc_id,
                run_id=run_id,
                from_status="pending",
                to_status="failed",
                stage="parse",
                message="failed",
            )
        )
        failed = store.mark_run_failed(run_id, failure_stage="parse", error_message="boom")
        artifact_rows = store.list_artifacts_by_run(run_id)

        assert len(first) == 1
        assert len(second) == 1
        # The second replace call must have superseded the first artifact.
        assert len(artifact_rows) == 1
        assert artifact_rows[0].object_name.endswith("layouts-v2.json")
        assert failed is not None and failed.run_status == "failed"

        # Behavior under test: one call removes every history row of the document.
        store.delete_by_document(doc_id)
        history_deleted = True

        assert store.list_runs_by_document(doc_id) == []
        assert store.list_status_events_by_document(doc_id) == []
        assert store.list_artifacts_by_document(doc_id) == []
    finally:
        # If the test failed before the explicit delete, history rows still
        # reference the document; remove them first, as the lifecycle test
        # does. The nested ``finally`` ensures the parent document delete is
        # attempted even when history cleanup itself raises.
        try:
            if not history_deleted:
                store.delete_by_document(doc_id)
        finally:
            repository.delete(doc_id)