217 lines
7.8 KiB
Python
217 lines
7.8 KiB
Python
|
|
"""Test PostgreSQL-backed document processing history storage."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import uuid
|
||
|
|
from datetime import UTC, datetime
|
||
|
|
|
||
|
|
import psycopg2
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from app.domain.documents import Document, DocumentArtifact, DocumentProcessingRun, DocumentStatus, DocumentStatusEvent
|
||
|
|
from app.infrastructure.storage.postgres_document_processing_store import PostgresDocumentProcessingStore
|
||
|
|
from app.infrastructure.storage.postgres_document_repository import PostgresDocumentRepository
|
||
|
|
# Keep these tests focused on relational storage behavior only.
|
||
|
|
|
||
|
|
|
||
|
|
def _build_document(doc_id: str) -> Document:
    """Return a minimal pending PDF ``Document`` for use as a parent row.

    The processing-history tables reference the document through foreign keys,
    so each test inserts this row before writing runs, events, or artifacts.

    Args:
        doc_id: Identifier used for the document and as the object-name prefix.
    """
    file_name = "processing-test.pdf"
    return Document(
        doc_id=doc_id,
        doc_name="Processing Test",
        file_name=file_name,
        object_name=f"{doc_id}/{file_name}",
        content_type="application/pdf",
        size_bytes=128,
        status=DocumentStatus.PENDING,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def _connectivity_ready() -> bool:
    """Report whether the configured PostgreSQL backend answers a trivial query.

    Constructing the repository and issuing a one-row ``list`` are both
    attempted. Any ``psycopg2.Error`` raised along the way is treated as
    "not reachable". Other exception types are deliberately not caught.

    Returns:
        True when the probe query succeeds, False on a psycopg2 error.
    """
    try:
        PostgresDocumentRepository().list(limit=1)
    except psycopg2.Error:
        return False
    return True
|
||
|
|
|
||
|
|
|
||
|
|
# Evaluated once at import/collection time: when the probe fails, every test
# in this module is skipped instead of erroring on connection setup.
pytestmark = pytest.mark.skipif(
    not _connectivity_ready(),
    reason="PostgreSQL test backend is not reachable",
)
|
||
|
|
|
||
|
|
|
||
|
|
def test_postgres_document_processing_store_supports_full_run_lifecycle():
    """Persist run, event, and artifact history and read it back as dataclasses.

    The test takes one run through create -> stored -> parsed -> indexed and
    appends a status event. It then attaches two artifacts and checks that every
    read path returns the domain dataclasses. All rows created here are removed
    afterwards, even when an assertion fails.
    """
    repository = PostgresDocumentRepository()
    store = PostgresDocumentProcessingStore()
    doc_id = f"proc-{uuid.uuid4().hex[:10]}"
    run_id = f"run-{uuid.uuid4().hex[:10]}"
    base_time = datetime.now(UTC)

    # The parent document row must exist before any history row references it.
    repository.create(_build_document(doc_id))
    try:
        created = store.create_run(
            DocumentProcessingRun(
                run_id=run_id,
                doc_id=doc_id,
                trigger_type="upload",
                run_status="running",
                parser_backend="aliyun",
                chunk_backend="aliyun",
                embedding_model="text-embedding-v3",
                started_at=base_time,
                metadata={"origin": "test"},
            )
        )

        stored = store.mark_run_stored(run_id, stored_at=base_time, metadata={"stored": True})
        parsed = store.mark_run_parsed(
            run_id,
            parser_backend="fake_parser",
            layout_count=2,
            structure_node_count=3,
            semantic_block_count=4,
            vector_chunk_count=5,
            parsed_at=base_time,
            metadata={"parse_task_id": "task-1"},
        )
        indexed = store.mark_run_indexed(
            run_id,
            chunk_count=6,
            index_name="regulations_dense_1024_v1",
            indexed_at=base_time,
            finished_at=base_time,
            metadata={"collection": "regulations_dense_1024_v1"},
        )
        event = store.append_status_event(
            DocumentStatusEvent(
                event_id=f"evt-{uuid.uuid4().hex[:10]}",
                doc_id=doc_id,
                run_id=run_id,
                from_status="parsed",
                to_status="indexed",
                stage="index",
                message="Indexed successfully",
                metadata={"chunk_count": 6},
                occurred_at=base_time,
            )
        )
        artifacts = store.replace_artifacts_for_run(
            run_id,
            [
                DocumentArtifact(
                    artifact_id=f"art-{uuid.uuid4().hex[:10]}",
                    doc_id=doc_id,
                    run_id=run_id,
                    artifact_type="layouts",
                    object_name=f"artifacts/{doc_id}/layouts.json",
                    content_type="application/json",
                    created_at=base_time,
                ),
                DocumentArtifact(
                    artifact_id=f"art-{uuid.uuid4().hex[:10]}",
                    doc_id=doc_id,
                    run_id=run_id,
                    artifact_type="vector_chunks",
                    object_name=f"artifacts/{doc_id}/vector_chunks.json",
                    content_type="application/json",
                    created_at=base_time,
                ),
            ],
        )

        fetched = store.get_run(run_id)
        run_rows = store.list_runs_by_document(doc_id)
        event_rows = store.list_status_events_by_document(doc_id)
        artifact_rows = store.list_artifacts_by_run(run_id)

        assert created.run_id == run_id
        assert stored is not None and stored.stored_at is not None
        assert parsed is not None and parsed.parser_backend == "fake_parser"
        assert indexed is not None and indexed.run_status == "succeeded"
        assert fetched is not None and fetched.chunk_count == 6

        # Guard the [0] lookups so an empty result fails with a readable
        # assertion instead of an IndexError.
        assert run_rows, "expected at least one run for the document"
        assert event_rows, "expected at least one status event for the document"
        assert isinstance(run_rows[0], DocumentProcessingRun)
        assert isinstance(event_rows[0], DocumentStatusEvent)
        assert event_rows[0].event_id == event.event_id

        # Check counts as well as types. A set comparison alone would hide a
        # missing or duplicated artifact row.
        assert len(artifact_rows) == len(artifacts) == 2
        assert all(isinstance(row, DocumentArtifact) for row in artifact_rows)
        assert {artifact.artifact_type for artifact in artifacts} == {
            row.artifact_type for row in artifact_rows
        }
    finally:
        # Nested so the parent document is removed even if purging history fails.
        try:
            store.delete_by_document(doc_id)
        finally:
            repository.delete(doc_id)
|
||
|
|
|
||
|
|
|
||
|
|
def test_postgres_document_processing_store_replaces_artifacts_and_deletes_document_data():
    """Replace artifact rows idempotently and remove all history rows for one document.

    A second ``replace_artifacts_for_run`` call must leave only the new
    artifact. ``delete_by_document`` must then clear the runs, status events
    and artifacts for the document. If the test fails before that purge runs,
    the ``finally`` block performs it, so the parent document can still be
    deleted.
    """
    repository = PostgresDocumentRepository()
    store = PostgresDocumentProcessingStore()
    doc_id = f"proc-{uuid.uuid4().hex[:10]}"
    run_id = f"run-{uuid.uuid4().hex[:10]}"
    # Set once the test body itself has purged the history rows.
    history_purged = False

    # The parent document row must exist before any history row references it.
    repository.create(_build_document(doc_id))
    try:
        store.create_run(
            DocumentProcessingRun(
                run_id=run_id,
                doc_id=doc_id,
                trigger_type="retry",
                run_status="running",
            )
        )

        first = store.replace_artifacts_for_run(
            run_id,
            [
                DocumentArtifact(
                    artifact_id=f"art-{uuid.uuid4().hex[:10]}",
                    doc_id=doc_id,
                    run_id=run_id,
                    artifact_type="layouts",
                    object_name=f"artifacts/{doc_id}/layouts-v1.json",
                    content_type="application/json",
                )
            ],
        )
        second = store.replace_artifacts_for_run(
            run_id,
            [
                DocumentArtifact(
                    artifact_id=f"art-{uuid.uuid4().hex[:10]}",
                    doc_id=doc_id,
                    run_id=run_id,
                    artifact_type="layouts",
                    object_name=f"artifacts/{doc_id}/layouts-v2.json",
                    content_type="application/json",
                )
            ],
        )
        store.append_status_event(
            DocumentStatusEvent(
                event_id=f"evt-{uuid.uuid4().hex[:10]}",
                doc_id=doc_id,
                run_id=run_id,
                from_status="pending",
                to_status="failed",
                stage="parse",
                message="failed",
            )
        )
        failed = store.mark_run_failed(run_id, failure_stage="parse", error_message="boom")

        artifact_rows = store.list_artifacts_by_run(run_id)
        assert len(first) == 1
        assert len(second) == 1
        # Only the second replacement may survive.
        assert len(artifact_rows) == 1
        assert artifact_rows[0].object_name.endswith("layouts-v2.json")
        assert failed is not None and failed.run_status == "failed"

        store.delete_by_document(doc_id)
        history_purged = True

        assert store.list_runs_by_document(doc_id) == []
        assert store.list_status_events_by_document(doc_id) == []
        assert store.list_artifacts_by_document(doc_id) == []
    finally:
        # Purge history rows if the body failed before doing so. Otherwise they
        # would be left behind, and may block the parent delete via their
        # foreign keys (as the sibling test's cleanup order implies).
        try:
            if not history_purged:
                store.delete_by_document(doc_id)
        finally:
            repository.delete(doc_id)
|