- Removed multiple failed document entries from `documents.json`. - Added a new document entry with updated metadata and changed the index name to `regulations_dense_1024_v2`. - Updated architecture documentation to reflect changes in the Milvus collection name. - Adjusted requirements by removing the sqlalchemy dependency. - Modified test cases to align with new document structure and naming conventions. - Introduced a new test file for Milvus vector index runtime recovery and error handling. - Updated assertions in various test files to ensure compatibility with the new schema.
144 lines
4.9 KiB
Python
144 lines
4.9 KiB
Python
"""Perception application service — event listing and streaming impact analysis."""
|
||
|
||
from __future__ import annotations

import json
import logging
from typing import Generator

from app.application.knowledge.services import KnowledgeRetrievalService
from app.config.settings import settings
from app.infrastructure.perception.mock_event_store import MockEventStore
from app.services.llm.llm_factory import get_llm_client
|
||
|
||
|
||
_ANALYSIS_SYSTEM_PROMPT = (
|
||
"你是汽车行业法规合规专家,专注于中国国家标准(GB)、国际法规(UN-ECE、ISO)"
|
||
"及欧盟法规(EUR-Lex)的解读与合规建议。回答需专业、简洁、结构清晰。"
|
||
)
|
||
|
||
|
||
class PerceptionService:
    """Orchestrate regulatory event queries and streaming impact analysis.

    Event lookups are delegated to the injected event store. Impact analysis
    combines a best-effort knowledge-base retrieval step with an LLM call and
    is exposed as a generator of SSE-ready ``{"event": ..., "data": ...}``
    dicts.
    """

    # Number of knowledge-base chunks requested from retrieval and fed into
    # the prompt context.
    _TOP_K = 5
    # Max characters of chunk text kept as the "snippet" of each source entry.
    _SNIPPET_CHARS = 180
    # Max characters of chunk text included per document in the LLM context.
    _CONTEXT_CHARS = 400

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        event_store: MockEventStore,
        retrieval_service: KnowledgeRetrievalService,
    ) -> None:
        self._store = event_store
        self._retrieval = retrieval_service

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def list_events(
        self,
        *,
        source: str | None = None,
        impact_level: str | None = None,
        limit: int = 50,
    ) -> list[dict]:
        """Delegate to the event store's ``filter`` with the given criteria."""
        return self._store.filter(source=source, impact_level=impact_level, limit=limit)

    def get_event(self, event_id: str) -> dict | None:
        """Return the store's event for *event_id*, or ``None`` if it has none."""
        return self._store.get(event_id)

    def get_stats(self) -> dict:
        """Return whatever the event store's ``stats()`` reports."""
        return self._store.stats()

    # ------------------------------------------------------------------
    # Streaming analysis
    # ------------------------------------------------------------------

    def analyze_event(self, event_id: str) -> Generator[dict, None, None]:
        """Stream an impact analysis for one event as SSE-ready dicts.

        Yields, in order:

        * a single ``error`` event (and nothing else) if *event_id* is unknown;
        * one ``sources`` event whose ``data`` is a JSON list of related
          documents (empty when retrieval failed or found nothing);
        * zero or more ``content`` events carrying LLM output text;
        * a final ``done`` event, or an ``error`` event if the LLM call failed.
        """
        event = self._store.get(event_id)
        if not event:
            yield {"event": "error", "data": f"事件 {event_id} 不存在"}
            return

        # --- 1. RAG retrieval: find related library documents ---
        tags = [str(tag) for tag in (event.get("tags") or [])]
        query = f"{self._field(event, 'title')} {' '.join(tags)}"
        affected_docs, context_parts = self._gather_knowledge(query)
        yield {"event": "sources", "data": json.dumps(affected_docs, ensure_ascii=False)}

        # --- 2. Build context from retrieved chunks ---
        context = "\n\n".join(context_parts) if context_parts else "(知识库中暂无相关文档)"

        # --- 3. Build prompt ---
        messages = [
            {"role": "system", "content": _ANALYSIS_SYSTEM_PROMPT},
            {"role": "user", "content": self._build_user_prompt(event, tags, context)},
        ]

        # --- 4. Stream LLM response; "done" is only sent after a successful call ---
        succeeded = yield from self._stream_llm(messages)
        if succeeded:
            yield {"event": "done", "data": "{}"}

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _field(event: dict, key: str) -> str:
        """Return ``event[key]`` as text; a missing or ``None`` value becomes ``""``."""
        value = event.get(key)
        return "" if value is None else str(value)

    def _gather_knowledge(self, query: str) -> tuple[list[dict], list[str]]:
        """Retrieve chunks for *query*; return (source summaries, context snippets).

        Retrieval is best-effort: on any failure the error is logged and
        ``([], [])`` is returned so the analysis still runs without context.
        Both outputs are built inside the same guard so the prompt context can
        never reference chunks that could not be summarised.
        """
        try:
            # list() also rejects non-iterable results (e.g. None) up front.
            chunks = list(self._retrieval.retrieve(query=query, top_k=self._TOP_K))

            affected_docs: list[dict] = []
            seen: set[str] = set()
            for chunk in chunks:
                # One source entry per document, keeping its first chunk.
                if chunk.doc_id in seen:
                    continue
                seen.add(chunk.doc_id)
                affected_docs.append(
                    {
                        "doc_id": chunk.doc_id,
                        "doc_title": chunk.doc_title,
                        "score": round(float(chunk.score), 4),
                        "snippet": (chunk.text or "")[: self._SNIPPET_CHARS],
                        "clause": getattr(chunk, "section_title", "") or "",
                    }
                )

            context_parts = [
                f"[文档{i}: {c.doc_title}]\n{(c.text or '')[: self._CONTEXT_CHARS]}"
                for i, c in enumerate(chunks[: self._TOP_K], 1)
            ]
        except Exception:  # noqa: BLE001 - retrieval only enriches the prompt
            self._log.exception("Knowledge retrieval failed for query %r", query)
            return [], []
        return affected_docs, context_parts

    def _build_user_prompt(self, event: dict, tags: list[str], context: str) -> str:
        """Render the user message requesting a four-section Markdown analysis."""
        effective = event.get("effective_at") or "待定"
        lines = [
            "请对以下法规动态进行专业影响分析。",
            "",
            "【法规动态】",
            f"标准编号:{self._field(event, 'standard_code')}",
            f"标题:{self._field(event, 'title')}",
            f"来源:{self._field(event, 'source_label')}",
            f"摘要:{self._field(event, 'summary')}",
            f"生效日期:{effective}",
            f"分类:{self._field(event, 'category')}",
            f"关键词:{', '.join(tags)}",
            "",
            "【知识库关联文档】",
            context,
            "",
            "请用 Markdown 格式,从以下四个维度进行分析:",
            "",
            "## 核心变化",
            "列出本次法规更新最关键的 3-5 项变化(用 - 列表)",
            "",
            "## 业务影响",
            "分析对现有产品、认证流程、技术文档的具体影响",
            "",
            "## 整改建议",
            "给出优先级排序的行动清单(标注 🔴高 🟡中 🟢低 优先级)",
            "",
            "## 时间节点",
            "关键合规时间表与里程碑提醒",
        ]
        return "\n".join(lines)

    def _stream_llm(self, messages: list[dict]) -> Generator[dict, None, bool]:
        """Yield ``content`` events from the configured LLM client.

        Uses ``stream_chat`` when the client provides it, otherwise a single
        ``chat`` call. On failure the error is logged and one ``error`` event
        is yielded. Returns ``True`` on success and ``False`` on failure (via
        the generator's return value, read with ``yield from``).
        """
        try:
            client = get_llm_client(
                provider=settings.llm_provider,
                model=settings.llm_model,
            )
            if hasattr(client, "stream_chat"):
                for piece in client.stream_chat(messages):
                    yield {"event": "content", "data": piece}
            else:
                response = client.chat(messages)
                yield {"event": "content", "data": response.content or ""}
        except Exception as exc:  # noqa: BLE001 - surfaced to the client as an SSE error
            self._log.exception("LLM impact analysis failed")
            # Fall back to the exception type so the error payload is never blank.
            yield {"event": "error", "data": str(exc) or type(exc).__name__}
            return False
        return True
|