"""Test JSON-backed document processing history storage.""" from __future__ import annotations import json from datetime import UTC, datetime from pathlib import Path from app.domain.documents import DocumentArtifact, DocumentProcessingRun, DocumentStatusEvent from app.infrastructure.storage.json_document_processing_store import JsonDocumentProcessingStore # Keep JSON processing-store tests focused on local file persistence behavior. def test_json_document_processing_store_initializes_missing_file(tmp_path: Path): """Create the backing file with the canonical empty payload on first use.""" file_path = tmp_path / "document_processing.json" store = JsonDocumentProcessingStore(str(file_path)) payload = json.loads(file_path.read_text(encoding="utf-8")) assert payload == {"runs": {}, "status_events": {}, "artifacts": {}} assert store.list_runs_by_document("missing") == [] def test_json_document_processing_store_supports_full_run_lifecycle(tmp_path: Path): """Persist runs, events, and artifacts and read them back as dataclasses.""" file_path = tmp_path / "document_processing.json" store = JsonDocumentProcessingStore(str(file_path)) doc_id = "doc-json" run_id = "run-json" event_id = "evt-json" base_time = datetime.now(UTC) created = store.create_run( DocumentProcessingRun( run_id=run_id, doc_id=doc_id, trigger_type="upload", run_status="running", parser_backend="aliyun", chunk_backend="aliyun", embedding_model="text-embedding-v3", started_at=base_time, metadata={"origin": "json-test"}, ) ) stored = store.mark_run_stored(run_id, stored_at=base_time, metadata={"stored": True}) parsed = store.mark_run_parsed( run_id, parser_backend="fake_parser", layout_count=1, structure_node_count=2, semantic_block_count=3, vector_chunk_count=4, parsed_at=base_time, metadata={"parse_task_id": "task-json"}, ) indexed = store.mark_run_indexed( run_id, chunk_count=5, index_name="regulations_dense_1024_v1", indexed_at=base_time, finished_at=base_time, metadata={"collection": "regulations_dense_1024_v1"}, ) event = store.append_status_event( DocumentStatusEvent( event_id=event_id, doc_id=doc_id, run_id=run_id, from_status="parsed", to_status="indexed", stage="index", message="Indexed", metadata={"chunk_count": 5}, occurred_at=base_time, ) ) artifacts = store.replace_artifacts_for_run( run_id, [ DocumentArtifact( artifact_id="art-layouts", doc_id=doc_id, run_id=run_id, artifact_type="layouts", object_name="artifacts/doc-json/layouts.json", content_type="application/json", created_at=base_time, ), DocumentArtifact( artifact_id="art-vectors", doc_id=doc_id, run_id=run_id, artifact_type="vector_chunks", object_name="artifacts/doc-json/vector_chunks.json", content_type="application/json", created_at=base_time, ), ], ) fetched = store.get_run(run_id) run_rows = store.list_runs_by_document(doc_id) event_rows = store.list_status_events_by_run(run_id) artifact_rows = store.list_artifacts_by_document(doc_id) assert created.run_id == run_id assert stored is not None and stored.metadata["stored"] is True assert parsed is not None and parsed.structure_node_count == 2 assert indexed is not None and indexed.run_status == "succeeded" assert fetched is not None and fetched.chunk_count == 5 assert run_rows[0].started_at == base_time assert event_rows[0].event_id == event.event_id assert artifact_rows[0].doc_id == doc_id assert {artifact.artifact_type for artifact in artifacts} == {artifact.artifact_type for artifact in artifact_rows} def test_json_document_processing_store_replaces_artifacts_and_deletes_by_document(tmp_path: Path): """Replace one run's artifacts idempotently and remove all history for a document.""" file_path = tmp_path / "document_processing.json" store = JsonDocumentProcessingStore(str(file_path)) doc_id = "doc-delete" run_id = "run-delete" store.create_run( DocumentProcessingRun( run_id=run_id, doc_id=doc_id, trigger_type="retry", run_status="running", ) ) store.append_status_event( DocumentStatusEvent( event_id="evt-delete", doc_id=doc_id, run_id=run_id, from_status="pending", to_status="stored", stage="store", occurred_at=datetime.now(UTC), ) ) first = store.replace_artifacts_for_run( run_id, [ DocumentArtifact( artifact_id="art-first", doc_id=doc_id, run_id=run_id, artifact_type="layouts", object_name="artifacts/doc-delete/layouts-v1.json", content_type="application/json", ) ], ) second = store.replace_artifacts_for_run( run_id, [ DocumentArtifact( artifact_id="art-second", doc_id=doc_id, run_id=run_id, artifact_type="layouts", object_name="artifacts/doc-delete/layouts-v2.json", content_type="application/json", ) ], ) failed = store.mark_run_failed(run_id, failure_stage="parse", error_message="boom") artifact_rows = store.list_artifacts_by_run(run_id) assert len(first) == 1 assert len(second) == 1 assert len(artifact_rows) == 1 assert artifact_rows[0].object_name.endswith("layouts-v2.json") assert failed is not None and failed.run_status == "failed" store.delete_by_document(doc_id) assert store.list_runs_by_document(doc_id) == [] assert store.list_status_events_by_document(doc_id) == [] assert store.list_artifacts_by_document(doc_id) == []