Files
AIRegulation-DocAnalysis/backend/app/services/llm/qwen_client.py

352 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Provide service-layer logic for qwen client."""
import time
import json
from typing import List, Dict, Optional, Generator, AsyncGenerator
from loguru import logger
import httpx
from .base_client import BaseLLMClient, LLMResponse, LLMConfig, LLMProvider
# Keep provider-specific behavior explicit so debugging stays straightforward.
class QwenClient(BaseLLMClient):
    """Synchronous HTTP client for Qwen text models.

    Requests are POSTed to ``<base_url>/chat/completions`` with a bearer
    token taken from the supplied configuration. Non-streaming calls return
    an ``LLMResponse``; failures are logged and reported through the
    response's ``error`` field instead of being raised.
    """

    SUPPORTED_MODELS = [
        "qwen-turbo",
        "qwen-plus",
        "qwen-max",
        "qwen-max-longcontext",
        "qwen-long",
        "qwen3.5-flash",
        "qwen3.5-plus",
        "qwen3-plus",
        "qwen2.5-72b-instruct",
        "qwen2.5-32b-instruct",
        "qwen2.5-14b-instruct",
        "qwen2.5-7b-instruct"
    ]

    def __init__(self, config: LLMConfig):
        """Validate the provider and open the underlying HTTP client.

        Args:
            config: Connection and sampling settings. Its ``provider`` must
                be ``LLMProvider.QWEN`` or ``LLMProvider.QWEN_VL``.

        Raises:
            ValueError: If ``config.provider`` is any other provider.
        """
        if config.provider not in [LLMProvider.QWEN, LLMProvider.QWEN_VL]:
            raise ValueError(f"配置provider应为Qwen实际为{config.provider}")
        super().__init__(config)
        self._init_client()

    def _init_client(self):
        """Create the shared ``httpx.Client`` used by every request."""
        self._client = httpx.Client(
            base_url=self.config.base_url,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            },
            timeout=self.config.timeout
        )
        logger.info(f"Qwen客户端初始化完成: {self.config.base_url} - {self.config.model}")

    def _make_payload(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int],
        temperature: Optional[float],
        top_p: Optional[float],
        stream: bool
    ) -> Dict[str, object]:
        """Build the JSON request body shared by ``chat`` and ``stream_chat``."""
        return {
            "model": self.config.model,
            "messages": messages,
            # A zero-token budget is never useful, so falling back to the
            # configured value for 0 as well as None is kept as-is.
            "max_tokens": max_tokens or self.config.max_tokens,
            # 0.0 is a legitimate temperature; only None means "use the
            # configured default". A plain ``or`` would silently discard 0.0.
            "temperature": self.config.temperature if temperature is None else temperature,
            "top_p": top_p,
            "stream": stream
        }

    def chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> LLMResponse:
        """Send one non-streaming chat request and return the reply.

        Args:
            messages: Conversation messages, forwarded unchanged.
            max_tokens: Completion token limit; the configured value is used
                when omitted.
            temperature: Sampling temperature; the configured value is used
                when ``None``. An explicit ``0.0`` is honoured.
            **kwargs: Only ``top_p`` is read; other keys are ignored.

        Returns:
            An ``LLMResponse`` built from the first choice. On any failure
            the response has empty ``content`` and ``error`` set.
        """
        start_time = time.time()
        try:
            payload = self._make_payload(
                messages,
                max_tokens,
                temperature,
                kwargs.get("top_p", self.config.top_p),
                stream=False
            )
            response = self._client.post("/chat/completions", json=payload)
            response.raise_for_status()
            data = response.json()
            latency_ms = int((time.time() - start_time) * 1000)
            choices = data.get("choices", [{}])
            first_choice = choices[0]
            message = first_choice.get("message", {})
            return LLMResponse(
                content=message.get("content", ""),
                model=data.get("model", self.config.model),
                usage=data.get("usage", {}),
                finish_reason=first_choice.get("finish_reason", "stop"),
                latency_ms=latency_ms
            )
        except httpx.HTTPStatusError as e:
            logger.error(f"Qwen API错误: {e.response.status_code} - {e.response.text}")
            return LLMResponse(
                content="",
                model=self.config.model,
                # Truncate the body so a large error page does not flood callers.
                error=f"API错误: {e.response.status_code} - {e.response.text[:200]}"
            )
        except Exception as e:
            # Boundary handler: callers get an error response, never an exception.
            logger.error(f"Qwen调用失败: {e}")
            return LLMResponse(
                content="",
                model=self.config.model,
                error=str(e)
            )

    def stream_chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> Generator[str, None, None]:
        """Stream a chat reply, yielding content fragments as they arrive.

        The response is read as server-sent events: each ``data:`` line is
        parsed as JSON and the ``content`` of the first choice's ``delta`` is
        yielded when non-empty. Reading stops at the ``[DONE]`` marker.

        Args:
            messages: Conversation messages, forwarded unchanged.
            max_tokens: Completion token limit; the configured value is used
                when omitted.
            temperature: Sampling temperature; the configured value is used
                when ``None``. An explicit ``0.0`` is honoured.
            **kwargs: Only ``top_p`` is read; other keys are ignored.

        Yields:
            Text fragments. On failure a single ``[ERROR: ...]`` marker
            string is yielded instead of raising.
        """
        try:
            payload = self._make_payload(
                messages,
                max_tokens,
                temperature,
                kwargs.get("top_p", self.config.top_p),
                stream=True
            )
            with self._client.stream("POST", "/chat/completions", json=payload) as response:
                # Without this check an error status is consumed as an empty
                # stream (its body has no "data:" lines) and the failure is
                # never reported to the caller.
                response.raise_for_status()
                for raw_line in response.iter_lines():
                    line = raw_line.strip()
                    # The space after "data:" is optional in the SSE format.
                    if not line.startswith("data:"):
                        continue
                    data_str = line[5:].lstrip()
                    if data_str == "[DONE]":
                        break
                    try:
                        data = json.loads(data_str)
                    except json.JSONDecodeError:
                        # Skip malformed events rather than abort the stream.
                        continue
                    choices = data.get("choices", [])
                    if not choices:
                        continue
                    # "delta" may be present but null; treat that as empty.
                    delta = choices[0].get("delta") or {}
                    content = delta.get("content", "")
                    if content:
                        yield content
        except httpx.HTTPStatusError as e:
            logger.error(f"Qwen流式API错误: {e.response.status_code}")
            yield f"[ERROR: API返回错误 {e.response.status_code}]"
        except Exception as e:
            logger.error(f"Qwen流式调用失败: {e}")
            yield f"[ERROR: {str(e)}]"

    async def async_stream_chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """Expose ``stream_chat`` as an async generator.

        NOTE(review): the underlying generator performs blocking network
        reads on the calling thread, so the event loop is stalled while
        waiting for each chunk. Control is only handed back to the loop
        between chunks. Consider an ``httpx.AsyncClient`` implementation if
        this is used on a busy loop.

        Yields:
            The same fragments (and error markers) as ``stream_chat``.
        """
        import asyncio
        for chunk in self.stream_chat(messages, max_tokens, temperature, **kwargs):
            yield chunk
            # Give other tasks a chance to run between chunks.
            await asyncio.sleep(0)

    def get_available_models(self) -> List[str]:
        """Return the list of model names this client declares support for."""
        return self.SUPPORTED_MODELS

    def close(self):
        """Close the underlying HTTP client, if one was created."""
        # getattr guards against close() being called on an instance whose
        # client was never set up.
        client = getattr(self, "_client", None)
        if client:
            client.close()
class QwenVLClient(BaseLLMClient):
    """Synchronous HTTP client for Qwen-VL models.

    Requests are POSTed to ``<base_url>/chat/completions`` with a bearer
    token taken from the supplied configuration. Non-streaming calls return
    an ``LLMResponse``; failures are logged and reported through the
    response's ``error`` field instead of being raised.
    """

    SUPPORTED_MODELS = [
        "qwen-vl-plus",
        "qwen-vl-max",
        "qwen3-vl-plus",
        "qwen2-vl-7b-instruct",
        "qwen2-vl-72b-instruct"
    ]

    def __init__(self, config: LLMConfig):
        """Validate the provider and open the underlying HTTP client.

        Args:
            config: Connection and sampling settings. Its ``provider`` must
                be ``LLMProvider.QWEN_VL``.

        Raises:
            ValueError: If ``config.provider`` is any other provider.
        """
        if config.provider != LLMProvider.QWEN_VL:
            raise ValueError(f"配置provider应为QWEN_VL实际为{config.provider}")
        super().__init__(config)
        self._init_client()

    def _init_client(self):
        """Create the shared ``httpx.Client`` used by every request."""
        self._client = httpx.Client(
            base_url=self.config.base_url,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            },
            timeout=self.config.timeout
        )
        logger.info(f"QwenVL客户端初始化完成: {self.config.base_url} - {self.config.model}")

    def _make_payload(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int],
        temperature: Optional[float],
        top_p: Optional[float],
        stream: bool
    ) -> Dict[str, object]:
        """Build the JSON request body shared by ``chat`` and ``stream_chat``."""
        return {
            "model": self.config.model,
            "messages": messages,
            # A zero-token budget is never useful, so falling back to the
            # configured value for 0 as well as None is kept as-is.
            "max_tokens": max_tokens or self.config.max_tokens,
            # 0.0 is a legitimate temperature; only None means "use the
            # configured default". A plain ``or`` would silently discard 0.0.
            "temperature": self.config.temperature if temperature is None else temperature,
            "top_p": top_p,
            "stream": stream
        }

    def chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> LLMResponse:
        """Send one non-streaming chat request and return the reply.

        Args:
            messages: Conversation messages, forwarded unchanged.
            max_tokens: Completion token limit; the configured value is used
                when omitted.
            temperature: Sampling temperature; the configured value is used
                when ``None``. An explicit ``0.0`` is honoured.
            **kwargs: Only ``top_p`` is read; other keys are ignored.

        Returns:
            An ``LLMResponse`` built from the first choice. On any failure
            the response has empty ``content`` and ``error`` set.
        """
        start_time = time.time()
        try:
            payload = self._make_payload(
                messages,
                max_tokens,
                temperature,
                kwargs.get("top_p", self.config.top_p),
                stream=False
            )
            response = self._client.post("/chat/completions", json=payload)
            response.raise_for_status()
            data = response.json()
            latency_ms = int((time.time() - start_time) * 1000)
            choices = data.get("choices", [{}])
            first_choice = choices[0]
            message = first_choice.get("message", {})
            return LLMResponse(
                content=message.get("content", ""),
                model=data.get("model", self.config.model),
                usage=data.get("usage", {}),
                finish_reason=first_choice.get("finish_reason", "stop"),
                latency_ms=latency_ms
            )
        except httpx.HTTPStatusError as e:
            logger.error(f"QwenVL API错误: {e.response.status_code} - {e.response.text}")
            return LLMResponse(
                content="",
                model=self.config.model,
                # Truncate the body so a large error page does not flood callers.
                error=f"API错误: {e.response.status_code} - {e.response.text[:200]}"
            )
        except Exception as e:
            # Boundary handler: callers get an error response, never an exception.
            logger.error(f"QwenVL调用失败: {e}")
            return LLMResponse(
                content="",
                model=self.config.model,
                error=str(e)
            )

    def stream_chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        **kwargs
    ) -> Generator[str, None, None]:
        """Stream a chat reply, yielding content fragments as they arrive.

        The response is read as server-sent events: each ``data:`` line is
        parsed as JSON and the ``content`` of the first choice's ``delta`` is
        yielded when non-empty. Reading stops at the ``[DONE]`` marker.

        Args:
            messages: Conversation messages, forwarded unchanged.
            max_tokens: Completion token limit; the configured value is used
                when omitted.
            temperature: Sampling temperature; the configured value is used
                when ``None``. An explicit ``0.0`` is honoured.
            **kwargs: Only ``top_p`` is read; other keys are ignored.

        Yields:
            Text fragments. On failure a single ``[ERROR: ...]`` marker
            string is yielded instead of raising.
        """
        try:
            payload = self._make_payload(
                messages,
                max_tokens,
                temperature,
                kwargs.get("top_p", self.config.top_p),
                stream=True
            )
            with self._client.stream("POST", "/chat/completions", json=payload) as response:
                # Without this check an error status is consumed as an empty
                # stream (its body has no "data:" lines) and the failure is
                # never reported to the caller.
                response.raise_for_status()
                for raw_line in response.iter_lines():
                    line = raw_line.strip()
                    # The space after "data:" is optional in the SSE format.
                    if not line.startswith("data:"):
                        continue
                    data_str = line[5:].lstrip()
                    if data_str == "[DONE]":
                        break
                    try:
                        data = json.loads(data_str)
                    except json.JSONDecodeError:
                        # Skip malformed events rather than abort the stream.
                        continue
                    choices = data.get("choices", [])
                    if not choices:
                        continue
                    # "delta" may be present but null; treat that as empty.
                    delta = choices[0].get("delta") or {}
                    content = delta.get("content", "")
                    if content:
                        yield content
        except httpx.HTTPStatusError as e:
            # Reported the same way as QwenClient.stream_chat does.
            logger.error(f"QwenVL流式API错误: {e.response.status_code}")
            yield f"[ERROR: API返回错误 {e.response.status_code}]"
        except Exception as e:
            logger.error(f"QwenVL流式调用失败: {e}")
            yield f"[ERROR: {str(e)}]"

    def get_available_models(self) -> List[str]:
        """Return the list of model names this client declares support for."""
        return self.SUPPORTED_MODELS

    def close(self):
        """Close the underlying HTTP client, if one was created."""
        # getattr guards against close() being called on an instance whose
        # client was never set up.
        client = getattr(self, "_client", None)
        if client:
            client.close()
def create_qwen_client(
    api_key: str,
    model: str = "qwen3.5-flash",
    base_url: str = "http://6.86.80.4:30080/v1",
    **kwargs
) -> QwenClient:
    """Build a ``QwenClient`` from individual connection settings.

    Args:
        api_key: Token sent as the bearer credential on every request.
        model: Model name placed in the configuration.
        base_url: Root URL of the service. NOTE(review): the default is a
            plain-HTTP address, so the token travels unencrypted unless this
            is overridden.
        **kwargs: Extra fields forwarded to ``LLMConfig``.

    Returns:
        A client configured with ``LLMProvider.QWEN``.
    """
    settings = dict(
        provider=LLMProvider.QWEN,
        model=model,
        api_key=api_key,
        base_url=base_url,
    )
    # Unpacking both mappings into the call keeps the duplicate-keyword
    # TypeError if kwargs repeats one of the fixed fields.
    return QwenClient(LLMConfig(**settings, **kwargs))
def create_qwen_vl_client(
    api_key: str,
    model: str = "qwen3-vl-plus",
    base_url: str = "http://6.86.80.4:30080/v1",
    **kwargs
) -> QwenVLClient:
    """Build a ``QwenVLClient`` from individual connection settings.

    Args:
        api_key: Token sent as the bearer credential on every request.
        model: Model name placed in the configuration.
        base_url: Root URL of the service. NOTE(review): the default is a
            plain-HTTP address, so the token travels unencrypted unless this
            is overridden.
        **kwargs: Extra fields forwarded to ``LLMConfig``.

    Returns:
        A client configured with ``LLMProvider.QWEN_VL``.
    """
    settings = dict(
        provider=LLMProvider.QWEN_VL,
        model=model,
        api_key=api_key,
        base_url=base_url,
    )
    # Unpacking both mappings into the call keeps the duplicate-keyword
    # TypeError if kwargs repeats one of the fixed fields.
    return QwenVLClient(LLMConfig(**settings, **kwargs))