"""Perception application service — event listing and streaming impact analysis.""" from __future__ import annotations import json from typing import Generator from app.application.knowledge.services import KnowledgeRetrievalService from app.infrastructure.perception.mock_event_store import MockEventStore from app.services.llm.llm_factory import get_llm_client from app.config.settings import settings _ANALYSIS_SYSTEM_PROMPT = ( "你是汽车行业法规合规专家,专注于中国国家标准(GB)、国际法规(UN-ECE、ISO)" "及欧盟法规(EUR-Lex)的解读与合规建议。回答需专业、简洁、结构清晰。" ) class PerceptionService: """Orchestrate regulatory event queries and streaming impact analysis.""" def __init__( self, event_store: MockEventStore, retrieval_service: KnowledgeRetrievalService, ) -> None: self._store = event_store self._retrieval = retrieval_service # ------------------------------------------------------------------ # Queries # ------------------------------------------------------------------ def list_events( self, *, source: str | None = None, impact_level: str | None = None, limit: int = 50, ) -> list[dict]: return self._store.filter(source=source, impact_level=impact_level, limit=limit) def get_event(self, event_id: str) -> dict | None: return self._store.get(event_id) def get_stats(self) -> dict: return self._store.stats() # ------------------------------------------------------------------ # Streaming analysis # ------------------------------------------------------------------ def analyze_event(self, event_id: str) -> Generator[dict, None, None]: """Yield SSE-ready dicts: sources → content chunks → done.""" event = self._store.get(event_id) if not event: yield {"event": "error", "data": f"事件 {event_id} 不存在"} return # --- 1. RAG retrieval: find related library documents --- query = event["title"] + " " + " ".join(event["tags"]) chunks: list = [] affected_docs: list[dict] = [] try: chunks = self._retrieval.retrieve(query=query, top_k=5) seen: set[str] = set() for chunk in chunks: if chunk.doc_id not in seen: seen.add(chunk.doc_id) affected_docs.append( { "doc_id": chunk.doc_id, "doc_title": chunk.doc_title, "score": round(float(chunk.score), 4), "snippet": (chunk.text or "")[:180], "clause": getattr(chunk, "section_title", "") or "", } ) except Exception: # noqa: BLE001 pass yield {"event": "sources", "data": json.dumps(affected_docs, ensure_ascii=False)} # --- 2. Build context from retrieved chunks --- context_parts = [ f"[文档{i}: {c.doc_title}]\n{(c.text or '')[:400]}" for i, c in enumerate(chunks[:5], 1) ] context = "\n\n".join(context_parts) if context_parts else "(知识库中暂无相关文档)" # --- 3. Build prompt --- effective = event.get("effective_at") or "待定" user_content = f"""请对以下法规动态进行专业影响分析。 【法规动态】 标准编号:{event['standard_code']} 标题:{event['title']} 来源:{event['source_label']} 摘要:{event['summary']} 生效日期:{effective} 分类:{event['category']} 关键词:{', '.join(event['tags'])} 【知识库关联文档】 {context} 请用 Markdown 格式,从以下四个维度进行分析: ## 核心变化 列出本次法规更新最关键的 3-5 项变化(用 - 列表) ## 业务影响 分析对现有产品、认证流程、技术文档的具体影响 ## 整改建议 给出优先级排序的行动清单(标注 🔴高 🟡中 🟢低 优先级) ## 时间节点 关键合规时间表与里程碑提醒""" messages = [ {"role": "system", "content": _ANALYSIS_SYSTEM_PROMPT}, {"role": "user", "content": user_content}, ] # --- 4. Stream LLM response --- try: client = get_llm_client( provider=settings.llm_provider, model=settings.llm_model, ) if hasattr(client, "stream_chat"): for chunk in client.stream_chat(messages): yield {"event": "content", "data": chunk} else: response = client.chat(messages) yield {"event": "content", "data": response.content or ""} except Exception as exc: # noqa: BLE001 yield {"event": "error", "data": str(exc)} return yield {"event": "done", "data": "{}"}