# Source path: AIRegulation-DocAnalysis/backend/app/services/llm/qwen_client.py
"""Qwen chat clients for an OpenAI-compatible ``/chat/completions`` API.

``QwenClient`` targets text models and ``QwenVLClient`` targets
vision-language models; both authenticate with a bearer API key.
"""
import time
import json
from typing import List, Dict, Optional, Generator, AsyncGenerator
from loguru import logger
import httpx
from .base_client import BaseLLMClient, LLMResponse, LLMConfig, LLMProvider
# Provider-specific request handling is written out explicitly in each client
# class so that it stays easy to trace while debugging.
class QwenClient(BaseLLMClient):
    """Chat client for Qwen text models served behind an OpenAI-compatible API.

    Every request is a POST to ``/chat/completions`` relative to
    ``config.base_url``, authenticated with ``config.api_key`` as a bearer
    token. Errors are not raised to the caller: :meth:`chat` reports them via
    ``LLMResponse.error`` and the streaming methods yield a single
    ``"[ERROR: ...]"`` chunk instead.
    """

    # Model names advertised by get_available_models(). The list is
    # informational only; config.model is not validated against it.
    SUPPORTED_MODELS = [
        "qwen-turbo",
        "qwen-plus",
        "qwen-max",
        "qwen-max-longcontext",
        "qwen-long",
        "qwen3.5-flash",
        "qwen3.5-plus",
        "qwen3-plus",
        "qwen2.5-72b-instruct",
        "qwen2.5-32b-instruct",
        "qwen2.5-14b-instruct",
        "qwen2.5-7b-instruct",
    ]

    def __init__(self, config: LLMConfig):
        """Validate the provider and create the synchronous HTTP client.

        Args:
            config: Connection and sampling settings. ``config.provider`` must
                be ``LLMProvider.QWEN`` or ``LLMProvider.QWEN_VL``.

        Raises:
            ValueError: If ``config.provider`` is any other provider.
        """
        if config.provider not in (LLMProvider.QWEN, LLMProvider.QWEN_VL):
            raise ValueError(f"配置provider应为Qwen实际为{config.provider}")
        super().__init__(config)
        self._init_client()

    def _auth_headers(self) -> Dict[str, str]:
        """Return the HTTP headers shared by the sync and async clients."""
        return {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
        }

    def _init_client(self):
        """Create the ``httpx.Client`` used by :meth:`chat` and :meth:`stream_chat`."""
        self._client = httpx.Client(
            base_url=self.config.base_url,
            headers=self._auth_headers(),
            timeout=self.config.timeout,
        )
        logger.info(f"Qwen客户端初始化完成: {self.config.base_url} - {self.config.model}")

    def _build_chat_payload(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int],
        temperature: Optional[float],
        *,
        stream: bool,
        options: Dict[str, object],
    ) -> Dict[str, object]:
        """Assemble the ``/chat/completions`` request body.

        Args:
            messages: Chat history, forwarded unchanged.
            max_tokens: Completion limit; falsy values (``None`` or ``0``)
                fall back to ``config.max_tokens``.
            temperature: Sampling temperature; only ``None`` falls back to
                ``config.temperature``, so an explicit ``0.0`` is honoured.
            stream: Whether to request a server-sent-event stream.
            options: The caller's extra keyword arguments; only ``top_p`` is read.

        Returns:
            The JSON-serialisable request body.
        """
        return {
            "model": self.config.model,
            "messages": messages,
            "max_tokens": max_tokens or self.config.max_tokens,
            # `temperature or default` would silently discard an explicit 0.0.
            "temperature": (
                temperature if temperature is not None else self.config.temperature
            ),
            "top_p": options.get("top_p", self.config.top_p),
            "stream": stream,
        }

    @staticmethod
    def _extract_sse_data(line: str) -> Optional[str]:
        """Return the payload of an SSE ``data:`` line, or ``None`` for any other line.

        Both ``data: {...}`` and ``data:{...}`` are accepted, since the SSE
        format makes the space after the colon optional.
        """
        line = line.strip()
        if not line.startswith("data:"):
            return None
        return line[len("data:"):].strip()

    @staticmethod
    def _extract_delta_content(data_str: str) -> str:
        """Return the incremental text carried by one streamed chunk, or ``""``.

        Malformed or content-free chunks are skipped (best effort) rather than
        aborting the whole stream.
        """
        try:
            chunk = json.loads(data_str)
        except json.JSONDecodeError:
            logger.debug(f"Qwen流式数据无法解析, 已跳过: {data_str[:200]}")
            return ""
        if not isinstance(chunk, dict):
            return ""
        choices = chunk.get("choices") or []
        if not choices or not isinstance(choices[0], dict):
            return ""
        delta = choices[0].get("delta") or {}
        if not isinstance(delta, dict):
            return ""
        return delta.get("content") or ""

    def chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> LLMResponse:
        """Send a non-streaming chat completion request.

        Args:
            messages: Chat history in OpenAI message format.
            max_tokens: Completion token limit; falsy values use ``config.max_tokens``.
            temperature: Sampling temperature; ``None`` uses ``config.temperature``.
            **kwargs: Only ``top_p`` is read; other keys are ignored.

        Returns:
            On success, an ``LLMResponse`` with the first choice's content, the
            model reported by the server, usage, finish reason and latency.
            On any failure (HTTP error, transport error, malformed response, or
            a response without choices), an ``LLMResponse`` with empty content
            and ``error`` set.
        """
        start_time = time.perf_counter()
        try:
            payload = self._build_chat_payload(
                messages, max_tokens, temperature, stream=False, options=kwargs
            )
            response = self._client.post("/chat/completions", json=payload)
            response.raise_for_status()
            data = response.json()
            latency_ms = int((time.perf_counter() - start_time) * 1000)

            # Report a missing/empty choices list explicitly instead of letting
            # it surface as an opaque IndexError/TypeError.
            choices = data.get("choices") or []
            if not choices:
                logger.error(f"Qwen API响应缺少choices: {str(data)[:200]}")
                return LLMResponse(
                    content="",
                    model=self.config.model,
                    error="API响应缺少choices",
                )

            first_choice = choices[0]
            message = first_choice.get("message") or {}
            return LLMResponse(
                content=message.get("content") or "",
                model=data.get("model", self.config.model),
                usage=data.get("usage") or {},
                finish_reason=first_choice.get("finish_reason", "stop"),
                latency_ms=latency_ms,
            )
        except httpx.HTTPStatusError as e:
            logger.error(f"Qwen API错误: {e.response.status_code} - {e.response.text}")
            return LLMResponse(
                content="",
                model=self.config.model,
                error=f"API错误: {e.response.status_code} - {e.response.text[:200]}",
            )
        except Exception as e:
            logger.exception(f"Qwen调用失败: {e}")
            return LLMResponse(
                content="",
                model=self.config.model,
                error=str(e),
            )

    def stream_chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> Generator[str, None, None]:
        """Stream a chat completion, yielding text fragments as they arrive.

        Args are the same as :meth:`chat`.

        Yields:
            Non-empty content fragments from each SSE chunk until ``[DONE]``.
            On failure, a single ``"[ERROR: ...]"`` string is yielded instead
            of raising.
        """
        try:
            payload = self._build_chat_payload(
                messages, max_tokens, temperature, stream=True, options=kwargs
            )
            with self._client.stream("POST", "/chat/completions", json=payload) as response:
                # Streaming responses do not raise on HTTP errors by themselves;
                # without this check an error reply would yield nothing at all.
                try:
                    response.raise_for_status()
                except httpx.HTTPStatusError:
                    # Load the error body so it can be logged after the stream closes.
                    response.read()
                    raise
                for line in response.iter_lines():
                    data_str = self._extract_sse_data(line)
                    if data_str is None:
                        continue
                    if data_str == "[DONE]":
                        break
                    content = self._extract_delta_content(data_str)
                    if content:
                        yield content
        except httpx.HTTPStatusError as e:
            logger.error(
                f"Qwen流式API错误: {e.response.status_code} - {e.response.text[:200]}"
            )
            yield f"[ERROR: API返回错误 {e.response.status_code}]"
        except Exception as e:
            logger.exception(f"Qwen流式调用失败: {e}")
            yield f"[ERROR: {str(e)}]"

    async def async_stream_chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """Asynchronously stream a chat completion without blocking the event loop.

        Same contract as :meth:`stream_chat`. A short-lived
        ``httpx.AsyncClient`` is opened per call (``self._client`` is
        synchronous), so network waits are awaited instead of blocking the
        running event loop between chunks.
        """
        try:
            payload = self._build_chat_payload(
                messages, max_tokens, temperature, stream=True, options=kwargs
            )
            async with httpx.AsyncClient(
                base_url=self.config.base_url,
                headers=self._auth_headers(),
                timeout=self.config.timeout,
            ) as client:
                async with client.stream(
                    "POST", "/chat/completions", json=payload
                ) as response:
                    try:
                        response.raise_for_status()
                    except httpx.HTTPStatusError:
                        # Load the error body so it can be logged after the stream closes.
                        await response.aread()
                        raise
                    async for line in response.aiter_lines():
                        data_str = self._extract_sse_data(line)
                        if data_str is None:
                            continue
                        if data_str == "[DONE]":
                            break
                        content = self._extract_delta_content(data_str)
                        if content:
                            yield content
        except httpx.HTTPStatusError as e:
            logger.error(
                f"Qwen流式API错误: {e.response.status_code} - {e.response.text[:200]}"
            )
            yield f"[ERROR: API返回错误 {e.response.status_code}]"
        except Exception as e:
            logger.exception(f"Qwen流式调用失败: {e}")
            yield f"[ERROR: {str(e)}]"

    def get_available_models(self) -> List[str]:
        """Return a copy of SUPPORTED_MODELS so callers cannot mutate the class list."""
        return list(self.SUPPORTED_MODELS)

    def close(self):
        """Close the synchronous HTTP client, if one was created."""
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()
class QwenVLClient(BaseLLMClient):
    """Chat client for Qwen vision-language (VL) models behind an OpenAI-compatible API.

    Uses the same ``/chat/completions`` endpoint and bearer authentication as
    the text client. ``messages`` are forwarded unchanged; no image
    preprocessing happens here. Errors are reported via ``LLMResponse.error``
    (:meth:`chat`) or a single ``"[ERROR: ...]"`` chunk (:meth:`stream_chat`)
    rather than raised.
    """

    # Model names advertised by get_available_models(). The list is
    # informational only; config.model is not validated against it.
    SUPPORTED_MODELS = [
        "qwen-vl-plus",
        "qwen-vl-max",
        "qwen3-vl-plus",
        "qwen2-vl-7b-instruct",
        "qwen2-vl-72b-instruct",
    ]

    def __init__(self, config: LLMConfig):
        """Validate the provider and create the HTTP client.

        Args:
            config: Connection and sampling settings; ``config.provider`` must
                be ``LLMProvider.QWEN_VL``.

        Raises:
            ValueError: If ``config.provider`` is any other provider.
        """
        if config.provider != LLMProvider.QWEN_VL:
            raise ValueError(f"配置provider应为QWEN_VL实际为{config.provider}")
        super().__init__(config)
        self._init_client()

    def _init_client(self):
        """Create the ``httpx.Client`` used by :meth:`chat` and :meth:`stream_chat`."""
        self._client = httpx.Client(
            base_url=self.config.base_url,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json",
            },
            timeout=self.config.timeout,
        )
        logger.info(f"QwenVL客户端初始化完成: {self.config.base_url} - {self.config.model}")

    def _build_chat_payload(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int],
        temperature: Optional[float],
        *,
        stream: bool,
        options: Dict[str, object],
    ) -> Dict[str, object]:
        """Assemble the ``/chat/completions`` request body.

        Args:
            messages: Chat history (possibly with image parts), forwarded unchanged.
            max_tokens: Completion limit; falsy values (``None`` or ``0``)
                fall back to ``config.max_tokens``.
            temperature: Sampling temperature; only ``None`` falls back to
                ``config.temperature``, so an explicit ``0.0`` is honoured.
            stream: Whether to request a server-sent-event stream.
            options: The caller's extra keyword arguments; only ``top_p`` is read.

        Returns:
            The JSON-serialisable request body.
        """
        return {
            "model": self.config.model,
            "messages": messages,
            "max_tokens": max_tokens or self.config.max_tokens,
            # `temperature or default` would silently discard an explicit 0.0.
            "temperature": (
                temperature if temperature is not None else self.config.temperature
            ),
            "top_p": options.get("top_p", self.config.top_p),
            "stream": stream,
        }

    @staticmethod
    def _extract_sse_data(line: str) -> Optional[str]:
        """Return the payload of an SSE ``data:`` line, or ``None`` for any other line.

        Both ``data: {...}`` and ``data:{...}`` are accepted, since the SSE
        format makes the space after the colon optional.
        """
        line = line.strip()
        if not line.startswith("data:"):
            return None
        return line[len("data:"):].strip()

    @staticmethod
    def _extract_delta_content(data_str: str) -> str:
        """Return the incremental text carried by one streamed chunk, or ``""``.

        Malformed or content-free chunks are skipped (best effort) rather than
        aborting the whole stream.
        """
        try:
            chunk = json.loads(data_str)
        except json.JSONDecodeError:
            logger.debug(f"QwenVL流式数据无法解析, 已跳过: {data_str[:200]}")
            return ""
        if not isinstance(chunk, dict):
            return ""
        choices = chunk.get("choices") or []
        if not choices or not isinstance(choices[0], dict):
            return ""
        delta = choices[0].get("delta") or {}
        if not isinstance(delta, dict):
            return ""
        return delta.get("content") or ""

    def chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> LLMResponse:
        """Send a non-streaming chat completion request.

        Args:
            messages: Chat history in OpenAI message format.
            max_tokens: Completion token limit; falsy values use ``config.max_tokens``.
            temperature: Sampling temperature; ``None`` uses ``config.temperature``.
            **kwargs: Only ``top_p`` is read; other keys are ignored.

        Returns:
            On success, an ``LLMResponse`` with the first choice's content, the
            model reported by the server, usage, finish reason and latency.
            On any failure (HTTP error, transport error, malformed response, or
            a response without choices), an ``LLMResponse`` with empty content
            and ``error`` set.
        """
        start_time = time.perf_counter()
        try:
            payload = self._build_chat_payload(
                messages, max_tokens, temperature, stream=False, options=kwargs
            )
            response = self._client.post("/chat/completions", json=payload)
            response.raise_for_status()
            data = response.json()
            latency_ms = int((time.perf_counter() - start_time) * 1000)

            # Report a missing/empty choices list explicitly instead of letting
            # it surface as an opaque IndexError/TypeError.
            choices = data.get("choices") or []
            if not choices:
                logger.error(f"QwenVL API响应缺少choices: {str(data)[:200]}")
                return LLMResponse(
                    content="",
                    model=self.config.model,
                    error="API响应缺少choices",
                )

            first_choice = choices[0]
            message = first_choice.get("message") or {}
            return LLMResponse(
                content=message.get("content") or "",
                model=data.get("model", self.config.model),
                usage=data.get("usage") or {},
                finish_reason=first_choice.get("finish_reason", "stop"),
                latency_ms=latency_ms,
            )
        except httpx.HTTPStatusError as e:
            logger.error(f"QwenVL API错误: {e.response.status_code} - {e.response.text}")
            return LLMResponse(
                content="",
                model=self.config.model,
                error=f"API错误: {e.response.status_code} - {e.response.text[:200]}",
            )
        except Exception as e:
            logger.exception(f"QwenVL调用失败: {e}")
            return LLMResponse(
                content="",
                model=self.config.model,
                error=str(e),
            )

    def stream_chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> Generator[str, None, None]:
        """Stream a chat completion, yielding text fragments as they arrive.

        Args are the same as :meth:`chat`.

        Yields:
            Non-empty content fragments from each SSE chunk until ``[DONE]``.
            On failure, a single ``"[ERROR: ...]"`` string is yielded instead
            of raising.
        """
        try:
            payload = self._build_chat_payload(
                messages, max_tokens, temperature, stream=True, options=kwargs
            )
            with self._client.stream("POST", "/chat/completions", json=payload) as response:
                # Streaming responses do not raise on HTTP errors by themselves;
                # without this check an error reply would yield nothing at all.
                try:
                    response.raise_for_status()
                except httpx.HTTPStatusError:
                    # Load the error body so it can be logged after the stream closes.
                    response.read()
                    raise
                for line in response.iter_lines():
                    data_str = self._extract_sse_data(line)
                    if data_str is None:
                        continue
                    if data_str == "[DONE]":
                        break
                    content = self._extract_delta_content(data_str)
                    if content:
                        yield content
        except httpx.HTTPStatusError as e:
            logger.error(
                f"QwenVL流式API错误: {e.response.status_code} - {e.response.text[:200]}"
            )
            yield f"[ERROR: API返回错误 {e.response.status_code}]"
        except Exception as e:
            logger.exception(f"QwenVL流式调用失败: {e}")
            yield f"[ERROR: {str(e)}]"

    def get_available_models(self) -> List[str]:
        """Return a copy of SUPPORTED_MODELS so callers cannot mutate the class list."""
        return list(self.SUPPORTED_MODELS)

    def close(self):
        """Close the HTTP client, if one was created."""
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()
def create_qwen_client(
    api_key: str,
    model: str = "qwen3.5-flash",
    base_url: str = "http://6.86.80.4:30080/v1",
    **kwargs
) -> QwenClient:
    """Build a :class:`QwenClient` configured for ``LLMProvider.QWEN``.

    Args:
        api_key: Bearer token sent with every request.
        model: Model name placed in each request body.
        base_url: Root URL of the OpenAI-compatible API.
        **kwargs: Further ``LLMConfig`` fields (for example ``timeout``,
            ``max_tokens``, ``temperature``, ``top_p``), passed through as-is.

    Returns:
        A ``QwenClient`` whose HTTP client has already been created.
    """
    # NOTE(review): the default base_url is a hard-coded internal address over
    # plain HTTP; consider sourcing it from configuration instead.
    core_fields = {
        "provider": LLMProvider.QWEN,
        "model": model,
        "api_key": api_key,
        "base_url": base_url,
    }
    return QwenClient(LLMConfig(**core_fields, **kwargs))
def create_qwen_vl_client(
    api_key: str,
    model: str = "qwen3-vl-plus",
    base_url: str = "http://6.86.80.4:30080/v1",
    **kwargs
) -> QwenVLClient:
    """Build a :class:`QwenVLClient` configured for ``LLMProvider.QWEN_VL``.

    Args:
        api_key: Bearer token sent with every request.
        model: Vision-language model name placed in each request body.
        base_url: Root URL of the OpenAI-compatible API.
        **kwargs: Further ``LLMConfig`` fields (for example ``timeout``,
            ``max_tokens``, ``temperature``, ``top_p``), passed through as-is.

    Returns:
        A ``QwenVLClient`` whose HTTP client has already been created.
    """
    # NOTE(review): the default base_url is a hard-coded internal address over
    # plain HTTP; consider sourcing it from configuration instead.
    core_fields = {
        "provider": LLMProvider.QWEN_VL,
        "model": model,
        "api_key": api_key,
        "base_url": base_url,
    }
    return QwenVLClient(LLMConfig(**core_fields, **kwargs))