Files
2026-06-08 11:16:28 +08:00

144 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Perception application service — event listing and streaming impact analysis."""
from __future__ import annotations

import json
import logging
from typing import Generator

from app.application.knowledge.services import KnowledgeRetrievalService
from app.config.settings import settings
from app.infrastructure.perception.base_event_store import BaseEventStore
from app.services.llm.llm_factory import get_llm_client
# System-role message for the impact-analysis chat request. The text is sent
# to the LLM verbatim (it sets an automotive regulatory-compliance expert
# persona and asks for professional, concise, well-structured answers), so the
# literal must not be edited for style.
_ANALYSIS_SYSTEM_PROMPT = "".join(
    [
        "你是汽车行业法规合规专家专注于中国国家标准GB、国际法规UN-ECE、ISO及欧盟法规EUR-Lex的解读与合规建议。",
        "回答需专业、简洁、结构清晰。",
    ]
)
_logger = logging.getLogger(__name__)


class PerceptionService:
    """Orchestrate regulatory event queries and streaming impact analysis.

    The service wraps two collaborators supplied by the caller:

    * an event store, used through ``filter(...)``, ``get(event_id)`` and
      ``stats()``;
    * a retrieval service, used through ``retrieve(query=..., top_k=...)``.
      Each returned chunk is read via ``doc_id``, ``doc_title``, ``score``,
      ``text`` and the optional ``section_title`` attribute.
    """

    # Number of knowledge-base chunks requested per analysis; also the cap on
    # how many chunks are embedded into the LLM context.
    _TOP_K = 5
    # Maximum characters of chunk text exposed per document in the "sources" event.
    _SNIPPET_CHARS = 180
    # Maximum characters of chunk text embedded per document in the LLM prompt.
    _CONTEXT_CHARS = 400

    def __init__(
        self,
        event_store: BaseEventStore,
        retrieval_service: KnowledgeRetrievalService,
    ) -> None:
        """Store the event store and retrieval service used by all methods."""
        self._store = event_store
        self._retrieval = retrieval_service

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def list_events(
        self,
        *,
        source: str | None = None,
        impact_level: str | None = None,
        limit: int = 50,
    ) -> list[dict]:
        """Return events from the store, forwarding the filters unchanged."""
        return self._store.filter(source=source, impact_level=impact_level, limit=limit)

    def get_event(self, event_id: str) -> dict | None:
        """Return whatever the store's ``get`` returns for ``event_id``."""
        return self._store.get(event_id)

    def get_stats(self) -> dict:
        """Return the store's statistics as reported by ``stats()``."""
        return self._store.stats()

    # ------------------------------------------------------------------
    # Streaming analysis
    # ------------------------------------------------------------------
    def analyze_event(self, event_id: str) -> Generator[dict, None, None]:
        """Yield SSE-ready dicts: sources -> content chunks -> done.

        Every yielded item is ``{"event": <name>, "data": <payload>}``:

        * ``error``   - the event id is unknown, or the LLM call failed; the
          generator stops right after it.
        * ``sources`` - JSON list of the de-duplicated related documents
          (``[]`` when retrieval found nothing or failed).
        * ``content`` - one item per streamed LLM piece, or a single item
          holding the full reply when the client cannot stream.
        * ``done``    - emitted once after a successful LLM response.

        Args:
            event_id: Identifier passed to the event store's ``get``.
        """
        event = self._store.get(event_id)
        if not event:
            yield {"event": "error", "data": f"事件 {event_id} 不存在"}
            return

        # --- 1. RAG retrieval: find related library documents ---
        tags = self._tags(event)
        query = self._text(event, "title") + " " + " ".join(tags)
        chunks, affected_docs = self._retrieve_related(query)
        yield {"event": "sources", "data": json.dumps(affected_docs, ensure_ascii=False)}

        # --- 2./3. Build the context and the chat prompt ---
        messages = self._build_messages(event, tags, chunks)

        # --- 4. Stream LLM response ---
        try:
            client = get_llm_client(
                provider=settings.llm_provider,
                model=settings.llm_model,
            )
            if hasattr(client, "stream_chat"):
                for piece in client.stream_chat(messages):
                    yield {"event": "content", "data": piece}
            else:
                # Non-streaming client: deliver the whole reply as one item.
                response = client.chat(messages)
                yield {"event": "content", "data": response.content or ""}
        except Exception as exc:  # noqa: BLE001
            # Boundary handler: the consumer still gets an "error" item, and
            # the traceback is kept in the server log.
            _logger.exception("LLM impact analysis failed for event %s", event_id)
            yield {"event": "error", "data": str(exc)}
            return
        yield {"event": "done", "data": "{}"}

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _text(event: dict, key: str, default: str = "") -> str:
        """Return ``event[key]`` as text, or ``default`` when missing or None."""
        value = event.get(key)
        return default if value is None else str(value)

    @staticmethod
    def _tags(event: dict) -> list[str]:
        """Return the event's tags as strings; a missing or None value gives ``[]``."""
        return [str(tag) for tag in (event.get("tags") or [])]

    def _retrieve_related(self, query: str) -> tuple[list, list[dict]]:
        """Fetch related chunks and summarise them, one entry per ``doc_id``.

        Retrieval is best-effort: any failure is logged and whatever was
        collected up to that point is returned, so the analysis can continue
        without knowledge-base context.

        Returns:
            ``(chunks, affected_docs)`` - the retrieved chunks as a list, and
            the per-document summaries sent in the "sources" event.
        """
        chunks: list = []
        affected_docs: list[dict] = []
        try:
            # Normalise to a list so later slicing cannot fail when the
            # service returns None or a one-shot iterable.
            chunks = list(self._retrieval.retrieve(query=query, top_k=self._TOP_K) or [])
            seen: set = set()
            for chunk in chunks:
                if chunk.doc_id in seen:
                    continue
                seen.add(chunk.doc_id)
                affected_docs.append(
                    {
                        "doc_id": chunk.doc_id,
                        "doc_title": chunk.doc_title,
                        "score": round(float(chunk.score), 4),
                        "snippet": (chunk.text or "")[: self._SNIPPET_CHARS],
                        "clause": getattr(chunk, "section_title", "") or "",
                    }
                )
        except Exception:  # noqa: BLE001
            _logger.warning(
                "Knowledge retrieval failed for query %r; continuing with partial context",
                query,
                exc_info=True,
            )
        return chunks, affected_docs

    def _build_messages(self, event: dict, tags: list[str], chunks: list) -> list[dict]:
        """Build the system + user chat messages for the impact analysis."""
        context_parts = [
            f"[文档{i}: {c.doc_title}]\n{(c.text or '')[: self._CONTEXT_CHARS]}"
            for i, c in enumerate(chunks[: self._TOP_K], 1)
        ]
        context = "\n\n".join(context_parts) if context_parts else "(知识库中暂无相关文档)"
        effective = event.get("effective_at") or "待定"
        # Joined line by line so the prompt text carries no source indentation.
        user_content = "\n".join(
            [
                "请对以下法规动态进行专业影响分析。",
                "【法规动态】",
                f"标准编号:{self._text(event, 'standard_code')}",
                f"标题:{self._text(event, 'title')}",
                f"来源:{self._text(event, 'source_label')}",
                f"摘要:{self._text(event, 'summary')}",
                f"生效日期:{effective}",
                f"分类:{self._text(event, 'category')}",
                f"关键词:{', '.join(tags)}",
                "【知识库关联文档】",
                context,
                "请用 Markdown 格式,从以下四个维度进行分析:",
                "## 核心变化",
                "列出本次法规更新最关键的 3-5 项变化(用 - 列表)",
                "## 业务影响",
                "分析对现有产品、认证流程、技术文档的具体影响",
                "## 整改建议",
                "给出优先级排序的行动清单(标注 🔴高 🟡中 🟢低 优先级)",
                "## 时间节点",
                "关键合规时间表与里程碑提醒",
            ]
        )
        return [
            {"role": "system", "content": _ANALYSIS_SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ]