Files
AIRegulation-DocAnalysis/tests/test_celery_tasks.py

46 lines
1.8 KiB
Python
Raw Normal View History

"""Tests for Celery task infrastructure.
Verifies Celery app configuration and task registration without
starting a real worker or connecting to Redis.
"""
def test_celery_app_uses_redis_broker():
"""Celery broker URL must be a Redis URL built from settings."""
from app.infrastructure.tasks.celery_app import celery_app
assert celery_app.conf.broker_url.startswith("redis://")
def test_celery_app_uses_redis_backend():
"""Celery result backend must be Redis."""
from app.infrastructure.tasks.celery_app import celery_app
assert celery_app.conf.result_backend.startswith("redis://")
def test_celery_app_has_json_serializer():
"""Task serializer must be JSON for portability."""
from app.infrastructure.tasks.celery_app import celery_app
assert celery_app.conf.task_serializer == "json"
def test_process_document_task_is_registered():
"""process_document_task must be discoverable in the Celery task registry."""
import app.infrastructure.tasks.document_tasks # noqa: F401 — triggers task registration
from app.infrastructure.tasks.celery_app import celery_app
registered = list(celery_app.tasks.keys())
assert any("process_document_task" in name for name in registered), (
f"process_document_task not found in {registered}"
)
def test_document_command_service_has_process_document():
"""DocumentCommandService must expose _process_document method."""
from app.application.documents.services import DocumentCommandService
assert hasattr(DocumentCommandService, "_process_document")
def test_document_command_service_has_store_document():
"""DocumentCommandService must expose store_document method."""
from app.application.documents.services import DocumentCommandService
assert hasattr(DocumentCommandService, "store_document")