46 lines
1.2 KiB
Python
46 lines
1.2 KiB
Python
|
|
"""Shared Celery application instance for background task processing.
|
||
|
|
|
||
|
|
All workers and enqueueing call sites import `celery_app` from this module
|
||
|
|
so the broker/backend configuration stays in one place.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from celery import Celery
|
||
|
|
|
||
|
|
from app.config.settings import settings
|
||
|
|
|
||
|
|
|
||
|
|
def _redis_url() -> str:
    """Return a Redis connection URL built from application settings.

    Reads ``redis_host``, ``redis_port``, ``redis_db`` and ``redis_password``
    from ``settings``. When a password is configured it is percent-encoded
    before being embedded in the URL, so reserved characters such as ``@``,
    ``:``, ``/``, ``#`` or ``?`` cannot be mistaken for URL delimiters.
    ``settings.redis_password`` is therefore expected to hold the raw,
    un-encoded password.

    Returns:
        ``redis://:<password>@<host>:<port>/<db>`` when a password is set,
        otherwise ``redis://<host>:<port>/<db>``.
    """
    # Local import keeps this helper self-contained.
    from urllib.parse import quote

    location = f"{settings.redis_host}:{settings.redis_port}/{settings.redis_db}"
    if not settings.redis_password:
        return f"redis://{location}"

    # safe="" is required: by default quote() leaves "/" unescaped, which
    # would still break the authority section of the URL.
    encoded_password = quote(str(settings.redis_password), safe="")
    return f"redis://:{encoded_password}@{location}"
|
||
|
|
|
||
|
|
|
||
|
|
# Broker and result backend URLs are both derived from the same Redis
# settings via _redis_url().
_BROKER = _redis_url()
_BACKEND = _redis_url()

# Application object that workers and enqueueing call sites import.
celery_app = Celery(
    main="compliance_hub",
    include=["app.infrastructure.tasks.document_tasks"],
    backend=_BACKEND,
    broker=_BROKER,
)
|
||
|
|
|
||
|
|
celery_app.conf.update(
    {
        # JSON is the only format produced or accepted for messages/results.
        "accept_content": ["json"],
        "task_serializer": "json",
        "result_serializer": "json",
        # All timestamps are handled in UTC.
        "enable_utc": True,
        "timezone": "UTC",
        # Acknowledge a message after the task has run rather than on
        # receipt, and reject it if the worker process is lost, so that
        # in-flight work is not silently dropped.
        "task_acks_late": True,
        "task_reject_on_worker_lost": True,
        # Results are retained for 3600 seconds (1 hour) for status polling.
        "result_expires": 3600,
    }
)
|