- Updated LocalDocumentParser to include raw_layouts and artifact_prefix from settings.
- Added new documents with failure reasons and metadata to documents.json for better error tracking.
- Created a new documentation file detailing the Aliyun ingest implementation process.
- Updated the RFC to reflect changes in the parsing backend and embedding dimensions.
- Modified tests to accommodate the new embedding dimension of 1024 and updated the parser and chunk-builder assertions.
- Verified migration configurations to ensure correct settings for the embedding model and backend.
143 lines · 6.1 KiB · Python
"""Aliyun Docmind gateway helpers for the document ingest pipeline."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from alibabacloud_docmind_api20220711 import models as docmind_models
|
|
from alibabacloud_docmind_api20220711.client import Client as DocmindClient
|
|
from alibabacloud_tea_openapi import models as open_api_models
|
|
from alibabacloud_tea_util import models as util_models
|
|
|
|
from app.config.settings import settings
|
|
|
|
# Keep provider-specific behavior isolated so the rest of the backend can stay stable.
|
|
|
|
|
|
@dataclass()
class AliyunParsePayload:
    """Raw outcome of a single Aliyun Docmind parse, as produced by the gateway.

    Attributes:
        task_id: Identifier Docmind assigned to the submitted parse job.
        layouts: Layout dicts collected from the paginated result endpoint.
        poll_attempts: Number of status queries issued before the job finished.
        duration_ms: Elapsed wall-clock time of the parse, in milliseconds.
    """

    task_id: str
    layouts: list[dict[str, Any]]
    poll_attempts: int
    duration_ms: int
|
|
|
|
|
|
class AliyunDocmindGateway:
    """Submit, poll, and collect results from the Aliyun Docmind API.

    The flow is: upload the file with ``submit_doc_parser_job_advance``,
    poll ``query_doc_parser_status`` until the task reaches a terminal
    status, then page through ``get_doc_parser_result`` to gather every
    layout. Tunables are read from ``app.config.settings`` in ``__init__``.
    """

    # Lower-cased ``Status`` values that end polling.
    # NOTE(review): the QueryDocParserStatus docs list ``fail`` for failed
    # tasks — confirm against live responses. ``failed`` is kept so the
    # previous check still matches.
    _SUCCESS_STATUSES = frozenset({"success"})
    _FAILURE_STATUSES = frozenset({"fail", "failed"})

    def __init__(self) -> None:
        """Initialize the gateway with runtime configuration from settings."""
        self.endpoint = settings.alibaba_endpoint
        self.poll_interval_seconds = settings.aliyun_parse_poll_interval_seconds
        self.timeout_seconds = settings.aliyun_parse_timeout_seconds
        self.layout_step_size = settings.aliyun_parse_layout_step_size
        self.llm_enhancement = settings.aliyun_llm_enhancement
        self.enhancement_mode = settings.aliyun_enhancement_mode

    def parse_document(self, *, file_path: str) -> AliyunParsePayload:
        """Parse a single document and return the collected layouts.

        Args:
            file_path: Local path of the document to upload.

        Returns:
            The task id, every collected layout, the number of status polls,
            and the elapsed time in milliseconds (measured from just before
            the upload until all results have been fetched).

        Raises:
            ValueError: If the AccessKey settings are missing.
            RuntimeError: If Docmind returns no task id, an empty status,
                a failed status, or no layouts.
            TimeoutError: If the task does not finish within ``timeout_seconds``.
        """
        client = self._create_client()
        started_at = time.monotonic()
        task_id = self._submit_job(client=client, file_path=file_path)
        poll_attempts = self._wait_for_completion(client=client, task_id=task_id, started_at=started_at)
        layouts = self._collect_all_results(client=client, task_id=task_id)
        duration_ms = int((time.monotonic() - started_at) * 1000)
        return AliyunParsePayload(
            task_id=task_id,
            layouts=layouts,
            poll_attempts=poll_attempts,
            duration_ms=duration_ms,
        )

    def _create_client(self) -> DocmindClient:
        """Create a Docmind client using explicit AccessKey settings only.

        Raises:
            ValueError: If either the AccessKey id or secret setting is empty.
        """
        # Fail fast before building any SDK objects.
        if not settings.alibaba_access_key_id or not settings.alibaba_access_key_secret:
            raise ValueError(
                "Missing Aliyun parser credentials. Set ALIBABA_ACCESS_KEY_ID and "
                "ALIBABA_ACCESS_KEY_SECRET in the project root .env."
            )

        config = open_api_models.Config()
        config.endpoint = self.endpoint
        # Keep production behavior deterministic by using only project-configured credentials.
        config.access_key_id = settings.alibaba_access_key_id
        config.access_key_secret = settings.alibaba_access_key_secret
        return DocmindClient(config)

    def _submit_job(self, *, client: DocmindClient, file_path: str) -> str:
        """Submit an asynchronous Docmind parse job.

        Args:
            client: Configured Docmind client.
            file_path: Local path of the document to upload.

        Returns:
            The task id Docmind assigned to the job.

        Raises:
            RuntimeError: If the response carries no task id.
        """
        path = Path(file_path)
        # The request references ``file_stream`` directly, so the upload call
        # is made while the file is still open.
        with path.open("rb") as file_stream:
            request = docmind_models.SubmitDocParserJobAdvanceRequest(
                file_url_object=file_stream,
                file_name=path.name,
                file_name_extension=path.suffix.lstrip("."),
                llm_enhancement=self.llm_enhancement,
                enhancement_mode=self.enhancement_mode,
            )
            runtime = util_models.RuntimeOptions()
            response = client.submit_doc_parser_job_advance(request, runtime)

        task_id = response.body.data.id if response.body and response.body.data else ""
        if not task_id:
            raise RuntimeError("Aliyun Docmind did not return a parse task id.")
        return task_id

    def _query_status(self, *, client: DocmindClient, task_id: str) -> dict[str, Any] | None:
        """Query the current Docmind parse status.

        Returns:
            The status data converted with ``to_map()``, or ``None`` when the
            response has no body or no data.
        """
        request = docmind_models.QueryDocParserStatusRequest(id=task_id)
        response = client.query_doc_parser_status(request)
        return response.body.data.to_map() if response.body and response.body.data else None

    def _wait_for_completion(self, *, client: DocmindClient, task_id: str, started_at: float) -> int:
        """Poll until the parse job finishes or times out.

        Args:
            client: Configured Docmind client.
            task_id: Task id returned by ``_submit_job``.
            started_at: ``time.monotonic()`` reading the timeout is measured from.

        Returns:
            The number of status queries issued, including the final one.

        Raises:
            RuntimeError: If a status payload is empty or reports failure.
            TimeoutError: If more than ``timeout_seconds`` elapse without a
                terminal status.
        """
        poll_attempts = 0
        while True:
            poll_attempts += 1
            status_payload = self._query_status(client=client, task_id=task_id)
            if not status_payload:
                raise RuntimeError(f"Aliyun parse status payload is empty for task {task_id}.")

            status = str(status_payload.get("Status", "")).lower()
            if status in self._SUCCESS_STATUSES:
                return poll_attempts
            if status in self._FAILURE_STATUSES:
                raise RuntimeError(f"Aliyun parse task failed: {status_payload}")

            remaining = self.timeout_seconds - (time.monotonic() - started_at)
            if remaining < 0:
                raise TimeoutError(
                    f"Aliyun parse task timed out after {self.timeout_seconds}s: task_id={task_id}"
                )
            # Never sleep past the deadline; one more poll runs before timing out.
            time.sleep(min(self.poll_interval_seconds, remaining))

    def _collect_all_results(self, *, client: DocmindClient, task_id: str) -> list[dict[str, Any]]:
        """Collect all paginated layout results from a completed parse task.

        Pages of up to ``layout_step_size`` layouts are requested, starting at
        offset ``layout_num``. Paging stops on an empty payload, an empty page,
        or a short page.

        Returns:
            Every layout dict, in the order the pages were returned.

        Raises:
            RuntimeError: If no layouts were returned at all.
        """
        all_layouts: list[dict[str, Any]] = []
        layout_num = 0  # offset of the first layout requested in the next page
        while True:
            request = docmind_models.GetDocParserResultRequest(
                id=task_id,
                layout_step_size=self.layout_step_size,
                layout_num=layout_num,
            )
            response = client.get_doc_parser_result(request)
            payload = response.body.data if response.body else None
            if not payload:
                break

            layouts = payload.get("layouts", [])
            if not layouts:
                break

            all_layouts.extend(layouts)
            layout_num += len(layouts)
            # A short page means the final page has been read.
            if len(layouts) < self.layout_step_size:
                break

        if not all_layouts:
            raise RuntimeError(f"Aliyun parse task returned no layouts: task_id={task_id}")
        return all_layouts
|