74 lines
2.6 KiB
Python
74 lines
2.6 KiB
Python
|
|
"""Celery tasks for document processing.
|
||
|
|
|
||
|
|
Each task is a thin wrapper that retrieves the already-stored document
|
||
|
|
binary and delegates to DocumentCommandService._process_document.
|
||
|
|
The task does not accept raw file bytes — it reads them from the binary
|
||
|
|
store using the doc_id, so the Celery message payload stays small.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from loguru import logger
|
||
|
|
|
||
|
|
from app.infrastructure.tasks.celery_app import celery_app
|
||
|
|
|
||
|
|
|
||
|
|
@celery_app.task(
|
||
|
|
name="app.infrastructure.tasks.document_tasks.process_document_task",
|
||
|
|
bind=True,
|
||
|
|
max_retries=3,
|
||
|
|
default_retry_delay=30,
|
||
|
|
acks_late=True,
|
||
|
|
)
|
||
|
|
def process_document_task(
|
||
|
|
self,
|
||
|
|
doc_id: str,
|
||
|
|
file_name: str,
|
||
|
|
doc_name: str,
|
||
|
|
regulation_type: str,
|
||
|
|
version: str,
|
||
|
|
generate_summary: bool,
|
||
|
|
run_id: str | None = None,
|
||
|
|
) -> dict:
|
||
|
|
"""Parse, embed, and index a document that has already been stored.
|
||
|
|
|
||
|
|
The task reads the file binary from MinIO using doc_id so the Celery
|
||
|
|
message stays small. Retries up to 3 times with a 30-second delay on
|
||
|
|
transient infrastructure errors.
|
||
|
|
"""
|
||
|
|
# Import inside the task function to avoid pickling issues and to ensure
|
||
|
|
# that each worker process initialises its own bootstrap singletons.
|
||
|
|
from app.shared.bootstrap import get_document_command_service, get_document_query_service
|
||
|
|
|
||
|
|
logger.info("process_document_task started: doc_id={}", doc_id)
|
||
|
|
try:
|
||
|
|
svc = get_document_command_service()
|
||
|
|
doc = get_document_query_service().get(doc_id)
|
||
|
|
if not doc:
|
||
|
|
raise ValueError(f"Document record not found: {doc_id}")
|
||
|
|
|
||
|
|
# Read the stored binary from MinIO — avoids passing raw bytes in the task message.
|
||
|
|
content = svc.binary_store.read(doc.object_name)
|
||
|
|
|
||
|
|
result = svc._process_document(
|
||
|
|
doc_id=doc_id,
|
||
|
|
file_name=file_name,
|
||
|
|
final_doc_name=doc_name,
|
||
|
|
content=content,
|
||
|
|
regulation_type=regulation_type,
|
||
|
|
version=version,
|
||
|
|
generate_summary=generate_summary,
|
||
|
|
run_id=run_id,
|
||
|
|
)
|
||
|
|
logger.info(
|
||
|
|
"process_document_task completed: doc_id={} status={} chunks={}",
|
||
|
|
doc_id, result.status, result.num_chunks,
|
||
|
|
)
|
||
|
|
return {"doc_id": result.doc_id, "status": result.status, "num_chunks": result.num_chunks}
|
||
|
|
|
||
|
|
except Exception as exc:
|
||
|
|
logger.exception("process_document_task failed: doc_id={}", doc_id)
|
||
|
|
# Retry on transient errors; permanent errors (bad file, parse failure)
|
||
|
|
# will exhaust retries and leave the document in FAILED state.
|
||
|
|
raise self.retry(exc=exc)
|