# Regulatory Signals Intelligence Enhancement: Design Spec

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace the 20-item hardcoded MockEventStore with real regulatory data from Chinese and international sources, and add LLM-driven structured extraction, impact assessment, and semantic change diff, all accessible through a manual-trigger crawl in the frontend.

**Architecture:** Crawler Service (httpx + BeautifulSoup) → PostgreSQL EventStore → LLM Pipeline (extract → assess → diff) → existing PerceptionService interface. New code follows `api → application → domain ports → infrastructure` layering; no new files in `services/*` or `workflows/*`; `shared/bootstrap.py` is the composition root.

**Tech Stack:** httpx, BeautifulSoup4, existing EmbeddingService (text-embedding-v3, for diff), existing LLM factory (deepseek/qwen), existing KnowledgeRetrievalService (RAG), PostgreSQL (already available), existing SSE infrastructure.

---

## 1. Data Sources

| Source | URL | Method | Coverage |
|--------|-----|--------|----------|
| CATARC automotive standards | `https://www.catarc.org.cn/bzzxd/qcbz/index.html` | httpx + BeautifulSoup (static pages) | List of national and industry automotive standards |
| 国标委 (SAC) mandatory standards | `https://openstd.samr.gov.cn/bzgk/std/std_list_type?p.p1=1&p.p2=车&p.p90=circulation_date&p.p91=desc` | httpx + JSON API parse | Mandatory national standards, filtered by "车" (vehicle) |
| 国标委 (SAC) recommended standards | `https://openstd.samr.gov.cn/bzgk/std/std_list_type?p.p1=2&p.p2=车&p.p90=circulation_date&p.p91=desc` | httpx + JSON API parse | Recommended national standards, filtered by "车" (vehicle) |
| EUR-Lex | RSS + CELLAR REST API | pyeurlex / httpx | EU AI Act, automotive directives |
| UN R155/R156 | CELLAR REST API (CELEX lookup) | httpx | UN-ECE cybersecurity/OTA regulations |

Crawl is **manual-trigger only**: no cron/Celery Beat. An admin clicks "刷新数据源" ("Refresh data sources") in the frontend UI.

---

## 2. Database Schema

### New table: `regulation_events`

```sql
CREATE TABLE IF NOT EXISTS regulation_events (
    id               TEXT PRIMARY KEY,   -- sha256(source + standard_code)[:12]
    source           TEXT NOT NULL,      -- 'CATARC' | '国标委' | 'EUR-Lex' | 'UN-ECE'
    source_label     TEXT,               -- Human-readable source label
    standard_code    TEXT NOT NULL,      -- e.g. "GB 18384-2025", "EU/2024/1689"
    title            TEXT NOT NULL,
    summary          TEXT,               -- Crawled abstract or first paragraph
    full_text_url    TEXT,               -- Original page URL
    status           TEXT,               -- 'enacted' | 'draft' | 'consultation'
    impact_level     TEXT,               -- 'high' | 'medium' | 'low' (LLM-assigned)
    published_at     DATE,
    effective_at     DATE,
    category         TEXT,
    tags             TEXT[],

    -- LLM structured extraction
    obligations      JSONB,              -- [{text, deontic, subject, object, condition}]
    deadlines        JSONB,              -- [{date, description}]
    scope            TEXT,               -- Applicability scope summary
    penalties        TEXT,               -- Penalty / consequence summary

    -- Change tracking
    content_hash     TEXT,               -- SHA256 of crawled full text
    previous_hash    TEXT,               -- Hash from prior crawl (NULL on first crawl)
    change_summary   TEXT,               -- LLM-generated description of changes
    changed_sections JSONB,              -- [{old_text, new_text, change_type}] where cosine<0.85

    -- Impact assessment
    affected_docs    JSONB,              -- [{doc_id, doc_name, score, key_clauses, recommendation}]

    -- Metadata
    crawled_at       TIMESTAMPTZ DEFAULT now(),
    processed_at     TIMESTAMPTZ,
    raw_storage_key  TEXT                -- MinIO path for raw HTML/PDF (optional)
);

CREATE INDEX IF NOT EXISTS regulation_events_source_date
    ON regulation_events (source, published_at DESC);
CREATE INDEX IF NOT EXISTS regulation_events_impact_date
    ON regulation_events (impact_level, published_at DESC);
CREATE INDEX IF NOT EXISTS regulation_events_tags
    ON regulation_events USING gin(tags);
```

---

## 3. Backend Architecture

### 3.1 File Map

**New files (infrastructure layer):**

- `backend/app/infrastructure/perception/crawlers/catarc_crawler.py`: CATARC scraper
- `backend/app/infrastructure/perception/crawlers/guobiao_crawler.py`: 国标委 JSON API crawler
- `backend/app/infrastructure/perception/crawlers/eurlex_crawler.py`: EUR-Lex RSS + CELLAR
- `backend/app/infrastructure/perception/crawlers/base.py`: Abstract base class
- `backend/app/infrastructure/perception/base_event_store.py`: `BaseEventStore` abstract interface (see 3.2)
- `backend/app/infrastructure/perception/postgres_event_store.py`: PostgresEventStore (replaces MockEventStore)
- `backend/app/infrastructure/perception/llm_pipeline.py`: Extract / assess / diff pipeline

**New files (application layer):**

- `backend/app/application/perception/crawl_service.py`: Orchestrates crawlers + LLM pipeline, exposes `run_crawl(sources)` + progress generator

**Modified files:**

- `backend/app/api/routes/perception.py`: Add `POST /crawl`, `GET /crawl/status` (SSE), `POST /events/{id}/process`, `GET /events/{id}/diff`
- `backend/app/shared/bootstrap.py`: Wire `PostgresEventStore` + `CrawlService` + `LlmPipeline` when `DOCUMENT_REPOSITORY_BACKEND=postgres`; fall back to `MockEventStore` when it is `json`
- `backend/app/config/settings.py`: Add `perception_crawl_timeout_seconds`, `perception_max_events_per_source`, `perception_diff_similarity_threshold`

**Unchanged files:**

- `backend/app/application/perception/services.py`: `PerceptionService` interface unchanged; only the `_store` it wraps is swapped
- `backend/app/infrastructure/perception/mock_event_store.py`: Kept for `json` backend mode

### 3.2 Domain Port (Abstract Interface)

```python
# backend/app/infrastructure/perception/base_event_store.py
from abc import ABC, abstractmethod


class BaseEventStore(ABC):
    @abstractmethod
    def all(self) -> list[dict]: ...

    @abstractmethod
    def get(self, event_id: str) -> dict | None: ...

    @abstractmethod
    def filter(self, source=None, impact_level=None, limit=50) -> list[dict]: ...

    @abstractmethod
    def stats(self) -> dict: ...

    @abstractmethod
    def upsert(self, event: dict) -> None: ...  # new: needed for crawl writes

    @abstractmethod
    def get_by_standard_code(self, code: str) -> dict | None: ...  # for change detection
```

`MockEventStore` and `PostgresEventStore` both implement this interface.

### 3.3 Crawler Base Contract

```python
# backend/app/infrastructure/perception/crawlers/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class RawEvent:
    source: str
    source_label: str
    standard_code: str
    title: str
    summary: str
    full_text_url: str
    status: str               # 'enacted' | 'draft' | 'consultation'
    published_at: str         # YYYY-MM-DD string
    effective_at: str | None
    category: str
    tags: list[str]
    raw_text: str             # full crawled text for hashing + LLM


class BaseCrawler(ABC):
    @abstractmethod
    def fetch(self, limit: int = 50) -> list[RawEvent]: ...
```

### 3.4 LLM Pipeline

```python
# backend/app/infrastructure/perception/llm_pipeline.py
class LlmPipeline:
    """Runs three sequential LLM steps on a regulation event."""

    def extract_structure(self, event: dict) -> dict:
        """Step 1: Extract obligations, deadlines, scope, penalties, impact_level.

        Returns dict with keys: obligations, deadlines, scope, penalties, impact_level.
        Uses JSON mode or a structured prompt; retries once on parse failure.
        """

    def assess_impact(self, event: dict, retrieval_service) -> list[dict]:
        """Step 2: RAG-based impact on existing knowledge base documents.

        Query = standard_code + title + first obligation texts.
        Returns list of {doc_id, doc_name, score, key_clauses, recommendation}.
        """

    def compute_diff(self, old_text: str, new_text: str) -> dict:
        """Step 3: Semantic diff between old and new regulation text.

        Splits both texts by paragraph. Calls the existing EmbeddingService
        (text-embedding-v3 via EMBEDDING_BASE_URL) to embed each paragraph,
        then computes cosine similarity. Changed paragraphs (cosine < 0.85)
        are sent to the LLM for change_type classification:
        'tightened' | 'relaxed' | 'added' | 'removed'.
        Returns {changed_sections: [...], change_summary: str}.
        Only called when content_hash differs from previous_hash.
        """
```

### 3.5 CrawlService

```python
# backend/app/application/perception/crawl_service.py
from collections.abc import Generator


class CrawlService:
    def __init__(self, crawlers, event_store, llm_pipeline, retrieval_service): ...

    def run_crawl(self, sources: list[str] | None = None) -> Generator[dict, None, None]:
        """Manual-trigger crawl. Yields progress SSE dicts:

        {event: 'progress', data: {source, fetched, new, updated, stage}}
        {event: 'done', data: {total_new, total_updated, duration_ms}}
        {event: 'error', data: {source, message}}

        For each crawler:
        1. fetch() RawEvents
        2. hash check vs stored event → skip if unchanged
        3. upsert raw event to DB
        4. run LLM pipeline (extract → assess → diff)
        5. upsert enriched event to DB
        6. yield progress
        """
```

---

## 4. API Endpoints

### Existing (unchanged interface, new store backend)

- `GET /api/v1/perception/stats`
- `GET /api/v1/perception/events`
- `GET /api/v1/perception/events/{id}`
- `POST /api/v1/perception/events/{id}/analyze` (streaming)

### New endpoints

```
POST /api/v1/perception/crawl
  Body: { sources?: ["CATARC", "国标委", "EUR-Lex", "UN-ECE"] }
  Response: text/event-stream (SSE)
  Auth: requires current_user (admin/legal role)
  Streams progress events until done or error.

POST /api/v1/perception/events/{id}/process
  Trigger LLM pipeline for a single already-crawled event.
  Response: { status: "ok", processed_at: "..." }
  Auth: requires current_user

GET /api/v1/perception/events/{id}/diff
  Returns: { changed_sections: [...], change_summary: str, previous_hash: str }
  Returns 404 if no diff available (first crawl or no change detected).
```

---

## 5. Frontend Changes

### 5.1 New: Crawl Control Bar (top of PerceptionPage)

Above the stats-bar, add a crawl control bar component:

- "刷新数据源" ("Refresh data sources") button: triggers `POST /crawl` (all sources)
- Inline progress display: shows SSE progress events as a mini status line
  - e.g. "CATARC: 抓取中… | 国标委: 12 条新增 | EUR-Lex: 等待中" ("CATARC: fetching… | 国标委: 12 new | EUR-Lex: waiting")
- On completion: shows "更新完成 — 新增 N 条,更新 M 条" ("Update complete: N new, M updated")
- Disabled while a crawl is in progress (prevents double-trigger)

### 5.2 Signal Card Enhancement

Existing cards get two new indicators:

- **NEW badge**: shown when `crawled_at` is within the last 24h (green dot)
- **CHANGED badge**: shown when `previous_hash != content_hash` and `change_summary` exists

### 5.3 Right Panel: Structured Tab

The right detail panel adds a tab bar: **概览 | 义务条款 | 影响评估 | 变更对比** (Overview | Obligations | Impact Assessment | Change Diff)

**义务条款 (Obligations) tab:**

- Table: 义务描述 | 主体 | 对象 | 截止日期 (obligation description | subject | object | deadline)
- Tags for deontic type: 强制 / 禁止 / 允许 (mandatory / prohibited / permitted)
- Shows `obligations[]` + `deadlines[]` from DB

**影响评估 (Impact Assessment) tab:**

- Replaces hardcoded MOCK_DOCS with real `affected_docs[]` from DB
- Each row: document name, similarity score (%), key clause excerpt, LLM recommendation
- "Run fresh assessment" button → triggers `POST /events/{id}/process`

**变更对比 (Change Diff) tab:**

- Enabled only when `change_summary` is non-null; disabled on first-crawl events with no prior version
- Top: `change_summary` text (LLM prose)
- Below: diff table with old/new paragraph pairs, one `change_type` badge per row

### 5.4 Existing behavior preserved

- `analyze` streaming (AI analysis): unchanged
- Search/filter (source, impact): unchanged, now hits real DB data
- Stats bar: now reflects real counts from PostgreSQL

---

## 6. Settings Additions

```python
# backend/app/config/settings.py additions
perception_crawl_timeout_seconds: int = Field(default=120, ...)
perception_max_events_per_source: int = Field(default=100, ...)
perception_diff_similarity_threshold: float = Field(default=0.85, ...)
```

```env
# .env additions
PERCEPTION_CRAWL_TIMEOUT_SECONDS=120
PERCEPTION_MAX_EVENTS_PER_SOURCE=100
PERCEPTION_DIFF_SIMILARITY_THRESHOLD=0.85
```

---

## 7. Dependencies

```
# requirements.txt additions
httpx>=0.27.0            # already likely present; confirm
beautifulsoup4>=4.12.0   # HTML parsing for CATARC
lxml>=5.0.0              # BeautifulSoup parser backend
# sentence-transformers NOT added — diff uses existing text-embedding-v3 API (EMBEDDING_BASE_URL)
```

No new infrastructure required (PostgreSQL + MinIO + Milvus already available).

---

## 8. Backward Compatibility

- `DOCUMENT_REPOSITORY_BACKEND=json` → `bootstrap.py` uses `MockEventStore` (unchanged behavior)
- `DOCUMENT_REPOSITORY_BACKEND=postgres` → uses `PostgresEventStore`
- Migration: run the `CREATE TABLE` SQL on first startup (idempotent `CREATE TABLE IF NOT EXISTS`)
- The existing 20 mock events are not seeded to PostgreSQL; PostgreSQL starts empty until the first crawl

---

## 9. Out of Scope (this phase)

- Automatic/scheduled crawling (Celery Beat): manual trigger only
- Playwright-based JS-rendered pages: all target sites work with httpx
- Knowledge Graph (Neo4j / LightRAG): future phase
- Email/Slack webhook notifications: future phase
- User-facing diff history (versioning beyond one prior snapshot): future phase
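
---

To illustrate the ID scheme from section 2 (`sha256(source + standard_code)[:12]`) and the hash check in step 2 of `run_crawl` (section 3.5), here is a minimal sketch. It is not the project's implementation: the helper names `make_event_id`, `content_hash`, and `classify_change` are hypothetical, and the sketch assumes UTF-8 encoding, plain concatenation of `source` and `standard_code` with no separator, and that `[:12]` refers to the first 12 hex characters of the digest.

```python
import hashlib
from typing import Optional


def make_event_id(source: str, standard_code: str) -> str:
    """Hypothetical helper: first 12 hex chars of sha256(source + standard_code)."""
    digest = hashlib.sha256((source + standard_code).encode("utf-8")).hexdigest()
    return digest[:12]


def content_hash(raw_text: str) -> str:
    """SHA-256 hex digest of the crawled full text (UTF-8 is an assumption)."""
    return hashlib.sha256(raw_text.encode("utf-8")).hexdigest()


def classify_change(stored: Optional[dict], raw_text: str) -> tuple:
    """Return (state, hash_fields) where state is 'new', 'unchanged' or 'updated'.

    `stored` is the row previously saved for this standard, or None on first crawl.
    """
    new_hash = content_hash(raw_text)
    if stored is None:
        # First crawl: previous_hash stays NULL, as in the schema comment.
        return "new", {"content_hash": new_hash, "previous_hash": None}
    if stored.get("content_hash") == new_hash:
        # Same text as last time: nothing to write, skip the LLM pipeline.
        return "unchanged", {}
    # Text changed: keep the old hash so the diff step can be triggered.
    return "updated", {
        "content_hash": new_hash,
        "previous_hash": stored.get("content_hash"),
    }
```

Because the ID depends only on `source` and `standard_code`, re-crawling the same standard always maps to the same row, which is what lets `upsert` overwrite it while `previous_hash` carries the prior snapshot's hash forward.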
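
---

The paragraph-level comparison described for `compute_diff` (section 3.4) could look roughly like the sketch below. Everything in it is illustrative: `find_changed_sections` and `toy_embed` are hypothetical names, and the bag-of-words `toy_embed` merely stands in for the existing EmbeddingService, whose interface this spec does not show. The sketch also assumes that paragraphs are separated by blank lines and that old and new paragraphs are paired by position; the spec does not state an alignment strategy. The LLM `change_type` classification is left out, so positionally matched but dissimilar paragraphs get `change_type: None`.

```python
import math
from collections import Counter
from itertools import zip_longest


def toy_embed(text: str) -> dict:
    """Stand-in embedding (word counts). A real pipeline would call EmbeddingService."""
    return dict(Counter(text.lower().split()))


def cosine(a: dict, b: dict) -> float:
    """Cosine similarity between two sparse vectors stored as token -> weight dicts."""
    dot = sum(w * b.get(t, 0.0) for t, w in a.items())
    norm_a = math.sqrt(sum(w * w for w in a.values()))
    norm_b = math.sqrt(sum(w * w for w in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def split_paragraphs(text: str) -> list:
    """Split on blank lines and drop empty paragraphs (assumed paragraph format)."""
    return [p.strip() for p in text.split("\n\n") if p.strip()]


def find_changed_sections(old_text, new_text, embed=toy_embed, threshold=0.85):
    """Pair paragraphs by position and keep pairs whose similarity is below threshold."""
    changed = []
    pairs = zip_longest(split_paragraphs(old_text), split_paragraphs(new_text))
    for old_p, new_p in pairs:
        if old_p is None:
            changed.append({"old_text": None, "new_text": new_p, "change_type": "added"})
        elif new_p is None:
            changed.append({"old_text": old_p, "new_text": None, "change_type": "removed"})
        elif cosine(embed(old_p), embed(new_p)) < threshold:
            # change_type would be filled in by the LLM classification step.
            changed.append({"old_text": old_p, "new_text": new_p, "change_type": None})
    return changed
```

Positional pairing is the simplest option but misaligns everything after an inserted paragraph; a best-match pairing over all paragraph embeddings would be more robust at higher cost. The default `threshold` mirrors `perception_diff_similarity_threshold` from section 6.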
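
---

The progress protocol of `run_crawl` (section 3.5) can be sketched as a plain generator. This is a simplified stand-in under stated assumptions, not the planned `CrawlService`: `InMemoryStore` is a hypothetical replacement for `PostgresEventStore`, crawlers are modelled as zero-argument callables returning dicts that already carry `content_hash`, `enrich` stands in for the whole LLM pipeline, the `stage` value `"done"` is a placeholder, and continuing with the remaining sources after one source fails is an assumption the spec does not spell out.

```python
import time
from typing import Callable, Iterator, Optional


class InMemoryStore:
    """Hypothetical stand-in for PostgresEventStore, keyed by standard_code."""

    def __init__(self) -> None:
        self.rows = {}

    def get_by_standard_code(self, code: str) -> Optional[dict]:
        return self.rows.get(code)

    def upsert(self, event: dict) -> None:
        self.rows[event["standard_code"]] = dict(event)


def run_crawl(crawlers: dict, store: InMemoryStore, enrich: Callable) -> Iterator[dict]:
    """Yield one progress or error dict per source, then a final done dict."""
    started = time.monotonic()
    total_new = total_updated = 0
    for source, fetch in crawlers.items():
        try:
            events = fetch()  # step 1: fetch raw events
        except Exception as exc:
            # Assumption: a failing source is reported and the crawl moves on.
            yield {"event": "error", "data": {"source": source, "message": str(exc)}}
            continue
        new = updated = 0
        for event in events:
            stored = store.get_by_standard_code(event["standard_code"])
            if stored and stored["content_hash"] == event["content_hash"]:
                continue  # step 2: unchanged since the last crawl, skip
            if stored:
                event = {**event, "previous_hash": stored["content_hash"]}
                updated += 1
            else:
                new += 1
            store.upsert(event)          # step 3: raw event
            store.upsert(enrich(event))  # steps 4-5: pipeline output
        total_new += new
        total_updated += updated
        yield {"event": "progress", "data": {
            "source": source, "fetched": len(events),
            "new": new, "updated": updated, "stage": "done"}}
    duration_ms = int((time.monotonic() - started) * 1000)
    yield {"event": "done", "data": {
        "total_new": total_new, "total_updated": total_updated,
        "duration_ms": duration_ms}}
```

An SSE route can iterate this generator and serialize each dict as one `event:` / `data:` frame, so the frontend status line updates as each source finishes rather than only at the end.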