AIRegulation-DocAnalysis/docs/superpowers/specs/2026-06-05-perception-intelligence-design.md
2026-06-08 11:16:28 +08:00


Regulatory Signals Intelligence Enhancement — Design Spec

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Replace the 20-item hardcoded MockEventStore with real regulatory data from Chinese and international sources, add LLM-driven structured extraction, impact assessment, and semantic change diff — all accessible through a manual-trigger crawl in the frontend.

Architecture: Crawler Service (httpx + BeautifulSoup) → PostgreSQL EventStore → LLM Pipeline (extract → assess → diff) → existing PerceptionService interface. New code follows api → application → domain ports → infrastructure layering; no new files in services/* or workflows/*; shared/bootstrap.py is the composition root.

Tech Stack: httpx, BeautifulSoup4, existing EmbeddingService (text-embedding-v3, for diff), existing LLM factory (deepseek/qwen), existing KnowledgeRetrievalService (RAG), PostgreSQL (already available), existing SSE infrastructure.


1. Data Sources

| Source | URL | Method | Coverage |
|---|---|---|---|
| CATARC automotive standards | https://www.catarc.org.cn/bzzxd/qcbz/index.html | httpx + BeautifulSoup (static pages) | List of national and industry automotive standards |
| 国标委 mandatory standards | https://openstd.samr.gov.cn/bzgk/std/std_list_type?p.p1=1&p.p2=车&p.p90=circulation_date&p.p91=desc | httpx + JSON API parse | Mandatory national standards, filtered by the keyword "车" (vehicle) |
| 国标委 recommended standards | https://openstd.samr.gov.cn/bzgk/std/std_list_type?p.p1=2&p.p2=车&p.p90=circulation_date&p.p91=desc | httpx + JSON API parse | Recommended national standards, filtered by the keyword "车" (vehicle) |
| EUR-Lex | RSS + CELLAR REST API | pyeurlex / httpx | EU AI Act, automotive directives |
| UN R155/R156 | CELLAR REST API (CELEX lookup) | httpx | UN-ECE cybersecurity/OTA regulations |

Crawl is manual-trigger only — no cron/Celery Beat. An admin clicks "刷新数据源" (Refresh data sources) in the frontend UI.


2. Database Schema

New table: regulation_events

CREATE TABLE IF NOT EXISTS regulation_events (
    id              TEXT PRIMARY KEY,          -- sha256(source + standard_code)[:12]
    source          TEXT NOT NULL,             -- 'CATARC' | '国标委' | 'EUR-Lex' | 'UN-ECE'
    source_label    TEXT,                      -- Human-readable source label
    standard_code   TEXT NOT NULL,             -- e.g. "GB 18384-2025", "EU/2024/1689"
    title           TEXT NOT NULL,
    summary         TEXT,                      -- Crawled abstract or first paragraph
    full_text_url   TEXT,                      -- Original page URL
    status          TEXT,                      -- 'enacted' | 'draft' | 'consultation'
    impact_level    TEXT,                      -- 'high' | 'medium' | 'low' (LLM-assigned)
    published_at    DATE,
    effective_at    DATE,
    category        TEXT,
    tags            TEXT[],
    -- LLM structured extraction
    obligations     JSONB,       -- [{text, deontic, subject, object, condition}]
    deadlines       JSONB,       -- [{date, description}]
    scope           TEXT,        -- Applicability scope summary
    penalties       TEXT,        -- Penalty / consequence summary
    -- Change tracking
    content_hash    TEXT,        -- SHA256 of crawled full text
    previous_hash   TEXT,        -- Hash from prior crawl (NULL on first crawl)
    change_summary  TEXT,        -- LLM-generated description of changes
    changed_sections JSONB,      -- [{old_text, new_text, change_type}] where cosine<0.85
    -- Impact assessment
    affected_docs   JSONB,       -- [{doc_id, doc_name, score, key_clauses, recommendation}]
    -- Metadata
    crawled_at      TIMESTAMPTZ DEFAULT now(),
    processed_at    TIMESTAMPTZ,
    raw_storage_key TEXT         -- MinIO path for raw HTML/PDF (optional)
);

CREATE INDEX IF NOT EXISTS regulation_events_source_date
    ON regulation_events (source, published_at DESC);
CREATE INDEX IF NOT EXISTS regulation_events_impact_date
    ON regulation_events (impact_level, published_at DESC);
CREATE INDEX IF NOT EXISTS regulation_events_tags
    ON regulation_events USING gin(tags);
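
The `id` and `content_hash` columns are both derived with SHA-256. The sketch below shows one way to compute them. It assumes plain string concatenation with UTF-8 encoding and no separator, which the schema comment implies but does not state; the function names are hypothetical.

```python
import hashlib
from typing import Optional


def make_event_id(source: str, standard_code: str) -> str:
    """Return the first 12 hex characters of sha256(source + standard_code)."""
    # Assumption: plain concatenation, UTF-8 encoded, no separator.
    digest = hashlib.sha256((source + standard_code).encode("utf-8")).hexdigest()
    return digest[:12]


def make_content_hash(raw_text: str) -> str:
    """Return the full SHA-256 hex digest of the crawled text."""
    return hashlib.sha256(raw_text.encode("utf-8")).hexdigest()


def has_changed(stored_hash: Optional[str], new_text: str) -> bool:
    """True when a prior hash exists and differs from the hash of new_text."""
    return stored_hash is not None and stored_hash != make_content_hash(new_text)
```

Because the id depends only on `source` and `standard_code`, re-crawling the same standard maps to the same row, so an upsert can carry the old `content_hash` over into `previous_hash`.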

3. Backend Architecture

3.1 File Map

New files (infrastructure layer):

  • backend/app/infrastructure/perception/crawlers/catarc_crawler.py — CATARC scraper
  • backend/app/infrastructure/perception/crawlers/guobiao_crawler.py — 国标委 JSON API crawler
  • backend/app/infrastructure/perception/crawlers/eurlex_crawler.py — EUR-Lex RSS + CELLAR
  • backend/app/infrastructure/perception/crawlers/base.py — Abstract base class
  • backend/app/infrastructure/perception/postgres_event_store.py — PostgresEventStore (replaces MockEventStore)
  • backend/app/infrastructure/perception/llm_pipeline.py — Extract / assess / diff pipeline
  • backend/app/infrastructure/perception/base_event_store.py — BaseEventStore abstract interface (see 3.2)

New files (application layer):

  • backend/app/application/perception/crawl_service.py — Orchestrates crawlers + LLM pipeline, exposes run_crawl(sources) + progress generator

Modified files:

  • backend/app/api/routes/perception.py — Add POST /crawl, GET /crawl/status (SSE), POST /events/{id}/process, GET /events/{id}/diff
  • backend/app/shared/bootstrap.py — Wire PostgresEventStore + CrawlService + LlmPipeline when DOCUMENT_REPOSITORY_BACKEND=postgres; fallback to MockEventStore when json
  • backend/app/config/settings.py — Add perception_crawl_timeout_seconds, perception_max_events_per_source, perception_diff_similarity_threshold

Unchanged files:

  • backend/app/application/perception/services.py — PerceptionService interface unchanged; only _store swap
  • backend/app/infrastructure/perception/mock_event_store.py — Kept for json backend mode

3.2 Domain Port (Abstract Interface)

# backend/app/infrastructure/perception/base_event_store.py
from abc import ABC, abstractmethod

class BaseEventStore(ABC):
    @abstractmethod
    def all(self) -> list[dict]: ...
    @abstractmethod
    def get(self, event_id: str) -> dict | None: ...
    @abstractmethod
    def filter(self, source=None, impact_level=None, limit=50) -> list[dict]: ...
    @abstractmethod
    def stats(self) -> dict: ...
    @abstractmethod
    def upsert(self, event: dict) -> None: ...      # new — needed for crawl writes
    @abstractmethod
    def get_by_standard_code(self, code: str) -> dict | None: ...  # for change detection

MockEventStore and PostgresEventStore both implement this interface.
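
To illustrate the contract, here is a minimal dict-backed sketch with the same method names. It is standalone so it runs on its own; in the project it would subclass BaseEventStore. The shape returned by `stats()` and the merge behaviour of `upsert()` are assumptions, since the spec does not define either.

```python
from collections import Counter
from typing import Optional


class InMemoryEventStore:
    """Dict-backed store mirroring the BaseEventStore method names."""

    def __init__(self) -> None:
        self._events: dict[str, dict] = {}

    def all(self) -> list[dict]:
        return list(self._events.values())

    def get(self, event_id: str) -> Optional[dict]:
        return self._events.get(event_id)

    def filter(self, source=None, impact_level=None, limit=50) -> list[dict]:
        rows = [
            e for e in self._events.values()
            if (source is None or e.get("source") == source)
            and (impact_level is None or e.get("impact_level") == impact_level)
        ]
        return rows[:limit]

    def stats(self) -> dict:
        # Assumption: the spec does not define the stats shape.
        return {
            "total": len(self._events),
            "by_source": dict(Counter(e.get("source") for e in self._events.values())),
        }

    def upsert(self, event: dict) -> None:
        # Assumption: merge, so an enrichment pass keeps previously stored fields.
        merged = {**self._events.get(event["id"], {}), **event}
        self._events[event["id"]] = merged

    def get_by_standard_code(self, code: str) -> Optional[dict]:
        return next(
            (e for e in self._events.values() if e.get("standard_code") == code),
            None,
        )
```

A store like this can also serve as a fast fixture for CrawlService tests that should not touch PostgreSQL.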

3.3 Crawler Base Contract

# backend/app/infrastructure/perception/crawlers/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class RawEvent:
    source: str
    source_label: str
    standard_code: str
    title: str
    summary: str
    full_text_url: str
    status: str           # 'enacted' | 'draft' | 'consultation'
    published_at: str     # YYYY-MM-DD string
    effective_at: str | None
    category: str
    tags: list[str]
    raw_text: str         # full crawled text for hashing + LLM

class BaseCrawler(ABC):
    @abstractmethod
    def fetch(self, limit: int = 50) -> list[RawEvent]: ...
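
One way to exercise this contract without network access is a static test double. The sketch below uses a trimmed copy of RawEvent so that it runs on its own. `StaticCrawler` is a hypothetical name, and validating `published_at` inside `fetch()` is an assumption rather than something the spec requires.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import date
from typing import Optional


@dataclass
class RawEvent:  # trimmed copy of the contract above, for a runnable example
    source: str
    standard_code: str
    title: str
    published_at: str            # YYYY-MM-DD string
    effective_at: Optional[str] = None
    raw_text: str = ""
    tags: list[str] = field(default_factory=list)


class BaseCrawler(ABC):
    @abstractmethod
    def fetch(self, limit: int = 50) -> list[RawEvent]: ...


class StaticCrawler(BaseCrawler):
    """Hypothetical test double: serves pre-built events instead of HTTP."""

    def __init__(self, events: list[RawEvent]) -> None:
        self._events = events

    def fetch(self, limit: int = 50) -> list[RawEvent]:
        # Validate dates early so malformed rows never reach the store.
        for ev in self._events[:limit]:
            date.fromisoformat(ev.published_at)  # raises ValueError if malformed
        return self._events[:limit]
```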

3.4 LLM Pipeline

# backend/app/infrastructure/perception/llm_pipeline.py

class LlmPipeline:
    """Runs three sequential LLM steps on a regulation event."""

    def extract_structure(self, event: dict) -> dict:
        """Step 1: Extract obligations, deadlines, scope, penalties, impact_level.
        
        Returns dict with keys: obligations, deadlines, scope, penalties, impact_level.
        Uses JSON-mode or structured prompt; model retries once on parse failure.
        """

    def assess_impact(self, event: dict, retrieval_service) -> list[dict]:
        """Step 2: RAG-based impact on existing knowledge base documents.
        
        Query = standard_code + title + first obligation texts.
        Returns list of {doc_id, doc_name, score, key_clauses, recommendation}.
        """

    def compute_diff(self, old_text: str, new_text: str) -> dict:
        """Step 3: Semantic diff between old and new regulation text.
        
        Splits both texts by paragraph. Calls existing EmbeddingService (text-embedding-v3
        via EMBEDDING_BASE_URL) to embed each paragraph, then computes cosine similarity.
        Paragraphs with cosine < perception_diff_similarity_threshold (default 0.85)
        are sent to the LLM for change_type classification:
          'tightened' | 'relaxed' | 'added' | 'removed'
        Returns {changed_sections: [...], change_summary: str}.
        Only called when content_hash differs from previous_hash.
        """

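The paragraph-level comparison in compute_diff can be sketched in pure Python as follows. The `embed` callable stands in for the existing EmbeddingService and is hypothetical. Pairing paragraphs by position is also an assumption: the spec does not say how old and new paragraphs are aligned, and positional pairing will misreport changes when a paragraph is inserted mid-document.

```python
import math
from typing import Callable, Sequence

Vector = Sequence[float]


def cosine(a: Vector, b: Vector) -> float:
    """Cosine similarity; returns 0.0 when either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def changed_paragraphs(
    old_text: str,
    new_text: str,
    embed: Callable[[str], Vector],
    threshold: float = 0.85,
) -> list:
    """Pair paragraphs by position and return those below the threshold."""
    old = [p.strip() for p in old_text.split("\n\n") if p.strip()]
    new = [p.strip() for p in new_text.split("\n\n") if p.strip()]
    changes = []
    for i in range(max(len(old), len(new))):
        o = old[i] if i < len(old) else None
        n = new[i] if i < len(new) else None
        if o is None:
            changes.append({"old_text": None, "new_text": n, "change_type": "added"})
        elif n is None:
            changes.append({"old_text": o, "new_text": None, "change_type": "removed"})
        elif cosine(embed(o), embed(n)) < threshold:
            # 'tightened' vs 'relaxed' is left to the LLM classification step.
            changes.append({"old_text": o, "new_text": n, "change_type": None})
    return changes
```

Only the rows with `change_type` set to None would need an LLM call, which keeps token usage proportional to the number of changed paragraphs rather than to the document length.
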
3.5 CrawlService

# backend/app/application/perception/crawl_service.py
from collections.abc import Generator

class CrawlService:
    def __init__(self, crawlers, event_store, llm_pipeline, retrieval_service): ...

    def run_crawl(self, sources: list[str] | None = None) -> Generator[dict, None, None]:
        """Manual-trigger crawl. Yields progress SSE dicts:
          {event: 'progress', data: {source, fetched, new, updated, stage}}
          {event: 'done', data: {total_new, total_updated, duration_ms}}
          {event: 'error', data: {source, message}}
        
        For each crawler:
          1. fetch() RawEvents
          2. hash check vs stored event → skip if unchanged
          3. upsert raw event to DB
          4. run LLM pipeline (extract → assess → diff)
          5. upsert enriched event to DB
          6. yield progress
        """

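The control flow described in the run_crawl docstring can be sketched as a plain generator. This is a simplified illustration, not the service itself: crawlers are modelled as a dict of source name to a zero-argument callable, the store as a dict keyed by standard_code, and `enrich` as a callable standing in for the LLM pipeline. Continuing with the next source after an error, and the `stage` value, are assumptions.

```python
import hashlib
import time
from typing import Callable, Iterator


def run_crawl(crawlers: dict, store: dict, enrich: Callable[[dict], dict]) -> Iterator[dict]:
    """Yield progress dicts while crawling; `store` maps standard_code -> record."""
    started = time.monotonic()
    total_new = total_updated = 0
    for source, fetch in crawlers.items():
        try:
            events = fetch()                                  # step 1
        except Exception as exc:  # assumption: one failing source does not abort the rest
            yield {"event": "error", "data": {"source": source, "message": str(exc)}}
            continue
        new = updated = 0
        for ev in events:
            digest = hashlib.sha256(ev["raw_text"].encode("utf-8")).hexdigest()
            prior = store.get(ev["standard_code"])
            if prior and prior["content_hash"] == digest:
                continue                                      # step 2: unchanged, skip
            record = {**ev, "content_hash": digest,
                      "previous_hash": prior["content_hash"] if prior else None}
            store[ev["standard_code"]] = record               # step 3: raw upsert
            record.update(enrich(record))                     # steps 4-5: enrich in place
            if prior:
                updated += 1
            else:
                new += 1
        total_new += new
        total_updated += updated
        yield {"event": "progress",                           # step 6
               "data": {"source": source, "fetched": len(events),
                        "new": new, "updated": updated, "stage": "done"}}
    duration_ms = int((time.monotonic() - started) * 1000)
    yield {"event": "done", "data": {"total_new": total_new,
                                     "total_updated": total_updated,
                                     "duration_ms": duration_ms}}
```

Writing the raw record before enrichment means a failed LLM call still leaves the crawled event in the store, where it can be reprocessed individually later.
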
4. API Endpoints

Existing (unchanged interface, new store backend)

  • GET /api/v1/perception/stats
  • GET /api/v1/perception/events
  • GET /api/v1/perception/events/{id}
  • POST /api/v1/perception/events/{id}/analyze (streaming)

New endpoints

POST /api/v1/perception/crawl
  Body: { sources?: ["CATARC", "国标委", "EUR-Lex", "UN-ECE"] }
  Response: text/event-stream (SSE)
  Auth: requires current_user (admin/legal role)
  Streams progress events until done or error.

POST /api/v1/perception/events/{id}/process
  Trigger LLM pipeline for a single already-crawled event.
  Response: { status: "ok", processed_at: "..." }
  Auth: requires current_user

GET /api/v1/perception/events/{id}/diff
  Returns: { changed_sections: [...], change_summary: str, previous_hash: str }
  Returns 404 if no diff available (first crawl or no change detected).
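
Since POST /crawl responds with text/event-stream, each progress dict has to be serialized as a Server-Sent Events frame. The helper below is a minimal sketch of that step using only the standard library; `format_sse` is a hypothetical name, and the project's existing SSE infrastructure may already provide an equivalent.

```python
import json


def format_sse(message: dict) -> str:
    """Serialize {'event': ..., 'data': ...} as one Server-Sent Events frame."""
    # json.dumps without indent emits a single line, so one "data:" line suffices.
    payload = json.dumps(message["data"], ensure_ascii=False)
    # A frame is an "event:" line and a "data:" line terminated by a blank line.
    return f"event: {message['event']}\ndata: {payload}\n\n"
```

With `ensure_ascii=False`, source names such as 国标委 are sent as UTF-8 rather than as escape sequences, which keeps the stream readable in browser dev tools.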

5. Frontend Changes

5.1 New: Crawl Control Bar (top of PerceptionPage)

Above the stats-bar, add a <CrawlBar> component:

  • "刷新数据源" (Refresh data sources) button — triggers POST /crawl (all sources)
  • Inline progress display: shows SSE progress events as a mini status line
    • e.g. "CATARC: 抓取中… | 国标委: 12 条新增 | EUR-Lex: 等待中" (CATARC: fetching… | 国标委: 12 new | EUR-Lex: waiting)
  • On completion: shows "更新完成 — 新增 N 条,更新 M 条" (Update complete: N new, M updated)
  • Disabled while crawl is in progress (prevents double-trigger)

5.2 Signal Card Enhancement

Existing cards get two new indicators:

  • NEW badge — shown when crawled_at is within last 24h (green dot)
  • CHANGED badge — shown when previous_hash is non-null, differs from content_hash, and change_summary exists

5.3 Right Panel — Structured Tab

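Right detail panel adds a tab bar: 概览 (Overview) | 义务条款 (Obligations) | 影响评估 (Impact Assessment) | 变更对比 (Change Diff)

义务条款 (Obligations) tab:

  • Table columns: 义务描述 (obligation) | 主体 (subject) | 对象 (object) | 截止日期 (deadline)
  • Tags for deontic type: 强制 (mandatory) / 禁止 (prohibited) / 允许 (permitted)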
  • Shows obligations[] + deadlines[] from DB

影响评估 (Impact Assessment) tab:

  • Replaces hardcoded MOCK_DOCS with real affected_docs[] from DB
  • Each row: document name, similarity score (%), key clause excerpt, LLM recommendation
  • "Run fresh assessment" button → triggers POST /events/{id}/process

变更对比 (Change Diff) tab:

  • Only visible when change_summary is non-null
  • Top: change_summary text (LLM prose)
  • Below: diff table with old/new paragraph pairs, change_type badge per row
  • Hidden (tab disabled) on first-crawl events with no prior version

5.4 Existing behavior preserved

  • analyze streaming (AI analysis) unchanged
  • Search/filter (source, impact) unchanged — now hits real DB data
  • Stats bar — now reflects real counts from PostgreSQL

6. Settings Additions

# backend/app/config/settings.py additions
perception_crawl_timeout_seconds: int = Field(default=120, ...)
perception_max_events_per_source: int = Field(default=100, ...)
perception_diff_similarity_threshold: float = Field(default=0.85, ...)
# .env additions
PERCEPTION_CRAWL_TIMEOUT_SECONDS=120
PERCEPTION_MAX_EVENTS_PER_SOURCE=100
PERCEPTION_DIFF_SIMILARITY_THRESHOLD=0.85

7. Dependencies

# requirements.txt additions
httpx>=0.27.0              # already likely present; confirm
beautifulsoup4>=4.12.0     # HTML parsing for CATARC
lxml>=5.0.0                # BeautifulSoup parser backend
# sentence-transformers NOT added — diff uses existing text-embedding-v3 API (EMBEDDING_BASE_URL)

No new infrastructure required (PostgreSQL + MinIO + Milvus already available).


8. Backward Compatibility

  • DOCUMENT_REPOSITORY_BACKEND=json → bootstrap.py uses MockEventStore (unchanged behavior)
  • DOCUMENT_REPOSITORY_BACKEND=postgres → uses PostgresEventStore
  • Migration: run CREATE TABLE SQL on first startup (idempotent CREATE TABLE IF NOT EXISTS)
  • Existing 20 mock events are not seeded to PostgreSQL; PostgreSQL starts empty until first crawl
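
The backend switch above could be wired roughly as follows. This is a sketch under stated assumptions: the two store classes are stand-ins for the real ones, `build_event_store` is a hypothetical helper name, `DATABASE_URL` is a hypothetical variable name, and defaulting to json when the variable is unset is an assumption.

```python
import os


class MockEventStore:        # stand-in for the existing mock store
    pass


class PostgresEventStore:    # stand-in for the new PostgreSQL-backed store
    def __init__(self, dsn: str) -> None:
        self.dsn = dsn


def build_event_store(env=os.environ):
    """Pick the event store from DOCUMENT_REPOSITORY_BACKEND."""
    # Assumption: an unset variable falls back to the json (mock) backend.
    backend = env.get("DOCUMENT_REPOSITORY_BACKEND", "json").lower()
    if backend == "postgres":
        # DATABASE_URL is a hypothetical variable name for this sketch.
        return PostgresEventStore(env["DATABASE_URL"])
    if backend == "json":
        return MockEventStore()
    raise ValueError(f"unsupported DOCUMENT_REPOSITORY_BACKEND: {backend!r}")
```

Failing loudly on an unknown value avoids silently serving mock data in an environment that was meant to use PostgreSQL.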

9. Out of Scope (this phase)

  • Automatic/scheduled crawling (Celery Beat) — manual trigger only
  • Playwright-based JS-rendered pages — all target sites work with httpx
  • Knowledge Graph (Neo4j / LightRAG) — future phase
  • Email/Slack webhook notifications — future phase
  • User-facing diff history (versioning beyond one prior snapshot) — future phase