"""Aliyun Docmind gateway helpers for the document ingest pipeline.""" from __future__ import annotations import time from dataclasses import dataclass from pathlib import Path from typing import Any from alibabacloud_docmind_api20220711 import models as docmind_models from alibabacloud_docmind_api20220711.client import Client as DocmindClient from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_tea_util import models as util_models from app.config.settings import settings # Keep provider-specific behavior isolated so the rest of the backend can stay stable. @dataclass class AliyunParsePayload: """Represent the raw Aliyun parse payload returned by the gateway.""" task_id: str layouts: list[dict[str, Any]] poll_attempts: int duration_ms: int class AliyunDocmindGateway: """Submit, poll, and collect results from the Aliyun Docmind API.""" def __init__(self) -> None: """Initialize the gateway with runtime configuration.""" self.endpoint = settings.alibaba_endpoint self.poll_interval_seconds = settings.aliyun_parse_poll_interval_seconds self.timeout_seconds = settings.aliyun_parse_timeout_seconds self.layout_step_size = settings.aliyun_parse_layout_step_size self.llm_enhancement = settings.aliyun_llm_enhancement self.enhancement_mode = settings.aliyun_enhancement_mode def parse_document(self, *, file_path: str) -> AliyunParsePayload: """Parse a single document and return the collected layouts.""" client = self._create_client() started_at = time.monotonic() task_id = self._submit_job(client=client, file_path=file_path) poll_attempts = self._wait_for_completion(client=client, task_id=task_id, started_at=started_at) layouts = self._collect_all_results(client=client, task_id=task_id) duration_ms = int((time.monotonic() - started_at) * 1000) return AliyunParsePayload( task_id=task_id, layouts=layouts, poll_attempts=poll_attempts, duration_ms=duration_ms, ) def _create_client(self) -> DocmindClient: """Create a Docmind client using explicit AccessKey settings only.""" config = open_api_models.Config() config.endpoint = self.endpoint if not settings.alibaba_access_key_id or not settings.alibaba_access_key_secret: raise ValueError( "Missing Aliyun parser credentials. Set ALIBABA_ACCESS_KEY_ID and " "ALIBABA_ACCESS_KEY_SECRET in the project root .env." ) # Keep production behavior deterministic by using only project-configured credentials. config.access_key_id = settings.alibaba_access_key_id config.access_key_secret = settings.alibaba_access_key_secret return DocmindClient(config) def _submit_job(self, *, client: DocmindClient, file_path: str) -> str: """Submit an asynchronous Docmind parse job.""" path = Path(file_path) with open(file_path, "rb") as file_stream: request = docmind_models.SubmitDocParserJobAdvanceRequest( file_url_object=file_stream, file_name=path.name, file_name_extension=path.suffix.lstrip("."), llm_enhancement=self.llm_enhancement, enhancement_mode=self.enhancement_mode, ) runtime = util_models.RuntimeOptions() response = client.submit_doc_parser_job_advance(request, runtime) task_id = response.body.data.id if response.body and response.body.data else "" if not task_id: raise RuntimeError("Aliyun Docmind did not return a parse task id.") return task_id def _query_status(self, *, client: DocmindClient, task_id: str) -> dict[str, Any] | None: """Query the current Docmind parse status.""" request = docmind_models.QueryDocParserStatusRequest(id=task_id) response = client.query_doc_parser_status(request) return response.body.data.to_map() if response.body and response.body.data else None def _wait_for_completion(self, *, client: DocmindClient, task_id: str, started_at: float) -> int: """Poll until the parse job finishes or times out.""" poll_attempts = 0 while True: poll_attempts += 1 status_payload = self._query_status(client=client, task_id=task_id) if not status_payload: raise RuntimeError(f"Aliyun parse status payload is empty for task {task_id}.") status = str(status_payload.get("Status", "")).lower() if status == "success": return poll_attempts if status == "failed": raise RuntimeError(f"Aliyun parse task failed: {status_payload}") elapsed = time.monotonic() - started_at if elapsed > self.timeout_seconds: raise TimeoutError( f"Aliyun parse task timed out after {self.timeout_seconds}s: task_id={task_id}" ) time.sleep(self.poll_interval_seconds) def _collect_all_results(self, *, client: DocmindClient, task_id: str) -> list[dict[str, Any]]: """Collect all paginated layout results from a completed parse task.""" all_layouts: list[dict[str, Any]] = [] layout_num = 0 while True: request = docmind_models.GetDocParserResultRequest( id=task_id, layout_step_size=self.layout_step_size, layout_num=layout_num, ) response = client.get_doc_parser_result(request) payload = response.body.data if response.body else None if not payload: break layouts = payload.get("layouts", []) if not layouts: break all_layouts.extend(layouts) layout_num += len(layouts) if len(layouts) < self.layout_step_size: break if not all_layouts: raise RuntimeError(f"Aliyun parse task returned no layouts: task_id={task_id}") return all_layouts