"""Define API routes for perception (regulatory intelligence).""" from __future__ import annotations import json from fastapi import APIRouter, Depends, Query from fastapi.responses import StreamingResponse from app.shared.bootstrap import get_crawl_service, get_event_store, get_perception_service from app.api.dependencies.auth import get_current_user from app.domain.auth.models import UserClaims from app.shared.async_utils import iter_in_thread router = APIRouter(prefix="/perception", tags=["智能感知"]) @router.get("/stats") async def get_perception_stats(): """Return KPI statistics for the perception dashboard.""" return get_perception_service().get_stats() @router.get("/events") async def list_events( source: str | None = Query(default=None, description="来源筛选 (MIIT/UN-ECE/ISO/国标委/EUR-Lex/IATF)"), impact_level: str | None = Query(default=None, description="影响等级 (high/medium/low)"), limit: int = Query(default=50, ge=1, le=100), ): """Return regulatory events with optional filters.""" events = get_perception_service().list_events( source=source, impact_level=impact_level, limit=limit, ) return {"events": events, "total": len(events)} @router.get("/events/{event_id}") async def get_event(event_id: str): """Return a single regulatory event by ID.""" event = get_perception_service().get_event(event_id) if event is None: from fastapi import HTTPException raise HTTPException(status_code=404, detail=f"Event {event_id} not found") return event @router.post("/events/{event_id}/analyze") async def analyze_event(event_id: str): """Stream SSE impact analysis for a regulatory event.""" service = get_perception_service() async def event_stream(): async for item in iter_in_thread(service.analyze_event(event_id)): event_name = item.get("event", "message") data = item.get("data", "") if isinstance(data, (dict, list)): data = json.dumps(data, ensure_ascii=False) yield f"event: {event_name}\ndata: {data}\n\n" return StreamingResponse( event_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, ) @router.post("/crawl") async def run_crawl( body: dict = None, current_user: UserClaims = Depends(get_current_user), ): """Trigger manual crawl of regulatory sources. Streams SSE progress. Body (optional): {"sources": ["CATARC", "国标委·强制性", "EUR-Lex"]} Omit sources to crawl all registered sources. """ sources: list[str] | None = (body or {}).get("sources") crawl_svc = get_crawl_service() async def crawl_stream(): async for item in iter_in_thread(crawl_svc.run_crawl(sources=sources)): event_name = item.get("event", "message") data = item.get("data", "") if isinstance(data, (dict, list)): data = json.dumps(data, ensure_ascii=False) yield f"event: {event_name}\ndata: {data}\n\n" return StreamingResponse( crawl_stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) @router.post("/events/{event_id}/process") async def process_event( event_id: str, current_user: UserClaims = Depends(get_current_user), ): """Trigger LLM pipeline (extract + assess + diff) for a single event.""" from datetime import UTC, datetime from app.infrastructure.perception.llm_pipeline import LlmPipeline from app.shared.bootstrap import get_retrieval_service event = get_perception_service().get_event(event_id) if not event: from fastapi import HTTPException raise HTTPException(status_code=404, detail=f"Event {event_id} not found") store = get_event_store() pipeline = LlmPipeline() structure = pipeline.extract_structure(event) event.update(structure) event["affected_docs"] = pipeline.assess_impact(event, get_retrieval_service()) event["processed_at"] = datetime.now(UTC).isoformat() store.upsert(event) return {"status": "ok", "event_id": event_id, "processed_at": event["processed_at"]} @router.get("/events/{event_id}/diff") async def get_event_diff(event_id: str): """Return semantic diff detail for an event (only available if previously crawled twice).""" event = get_perception_service().get_event(event_id) if not event: from fastapi import HTTPException raise HTTPException(status_code=404, detail=f"Event {event_id} not found") if not event.get("change_summary"): from fastapi import HTTPException raise HTTPException(status_code=404, detail="No diff available for this event") return { "event_id": event_id, "change_summary": event.get("change_summary"), "changed_sections": event.get("changed_sections") or [], "previous_hash": event.get("previous_hash"), "content_hash": event.get("content_hash"), }