"""Implement infrastructure support for openai compatible embedding provider.""" from __future__ import annotations import os import httpx from app.config.settings import settings from app.domain.retrieval import EmbeddingProvider # Keep adapter behavior explicit so integration details remain easy to audit. class OpenAICompatibleEmbeddingProvider(EmbeddingProvider): """Provide the Open A I Compatible Embedding Provider provider.""" def __init__(self) -> None: """Initialize the Open A I Compatible Embedding Provider instance.""" self.base_url = settings.embedding_base_url.rstrip("/") self.api_key = ( settings.embedding_api_key or os.getenv("OPENAI_API_KEY", "") or os.getenv("QWEN_API_KEY", "") or os.getenv("DEEPSEEK_API_KEY", "") ) self.model = settings.embedding_model self.timeout = settings.embedding_timeout_seconds self.dimension = settings.embedding_dim def _request(self, texts: list[str]) -> list[list[float]]: """Handle request for this module for the Open A I Compatible Embedding Provider instance.""" if not self.api_key: raise ValueError("缺少 EMBEDDING_API_KEY / OPENAI_API_KEY") response = httpx.post( f"{self.base_url}/embeddings", headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", }, json={"model": self.model, "input": texts}, timeout=self.timeout, ) response.raise_for_status() data = response.json() vectors = [item["embedding"] for item in sorted(data.get("data", []), key=lambda item: item["index"])] if any(len(vector) != self.dimension for vector in vectors): raise ValueError(f"embedding 维度不匹配,期望 {self.dimension}") return vectors def embed_texts(self, texts: list[str]) -> list[list[float]]: """Embed texts for the Open A I Compatible Embedding Provider instance.""" if not texts: return [] return self._request(texts) def embed_query(self, text: str) -> list[float]: """Embed query for the Open A I Compatible Embedding Provider instance.""" vectors = self._request([text]) return vectors[0]