
Compliance Analysis Enhancement Design

Date: 2026-06-08
Directions: A (Analysis Quality) + B (History & Reports) + C (Deep Chat)
Approach: Three independent but coordinated feature sets sharing one DB schema (method one / structured tables).


Goals

  1. A — Analysis Quality: Parallel clause processing (3-5× speed), fix highlight_terms bug (always returns empty), add LLM retry with tenacity, reserve PassThroughReranker for future cross-encoder work.
  2. B — Analysis History & Reports: Auto-save every completed analysis to PostgreSQL, history rail in UI, per-record DOCX export, delete with confirmation.
  3. C — Deep Chat: Per-finding persistent chat threads grounded in real retrieved text, LLM-generated suggestion questions, multi-turn memory.

Architecture Overview

Layering Rules (must not be violated)

api/routes/         →  thin HTTP handlers, SSE generators only
application/        →  orchestration logic (pipeline.py)
domain/ports/       →  ABCs, no implementation
infrastructure/     →  DB, docx, external calls
shared/bootstrap.py →  composition root, wires everything

New business logic goes in application/compliance/pipeline.py and domain ports. Never in services/* or workflows/*.

Shared Database Schema (B + C)

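Three tables, created together so C's FK references are valid from day one. gen_random_uuid() is built into PostgreSQL 13 and later; older versions need the pgcrypto extension.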

CREATE TABLE compliance_analyses (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_by      VARCHAR(255),
    doc_name        VARCHAR(500),
    standard_name   VARCHAR(500),
    risk_score      INTEGER,
    conclusion      TEXT,
    actions         JSONB,
    para_text       TEXT,
    highlight_terms JSONB
);

CREATE TABLE compliance_findings (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    seq         INTEGER NOT NULL,
    title       VARCHAR(500),
    description TEXT,
    status      VARCHAR(50),
    clause_ref  VARCHAR(200)
);

CREATE TABLE finding_chat_messages (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    finding_id  UUID NOT NULL REFERENCES compliance_findings(id) ON DELETE CASCADE,
    role        VARCHAR(20) NOT NULL,  -- 'user' | 'assistant'
    content     TEXT NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);
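The cascade behaviour these foreign keys rely on can be checked without a PostgreSQL instance. The sketch below swaps in SQLite's in-memory database purely for illustration: ids are plain TEXT instead of UUID, most columns are omitted, and the row values are sample data. It is not the project's migration.

```python
import sqlite3

# Simplified stand-in schema (assumption: TEXT ids, only the FK-relevant columns).
SCHEMA = """
CREATE TABLE compliance_analyses (id TEXT PRIMARY KEY, doc_name TEXT);
CREATE TABLE compliance_findings (
    id TEXT PRIMARY KEY,
    analysis_id TEXT NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    seq INTEGER NOT NULL
);
CREATE TABLE finding_chat_messages (
    id TEXT PRIMARY KEY,
    analysis_id TEXT NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    finding_id TEXT NOT NULL REFERENCES compliance_findings(id) ON DELETE CASCADE,
    role TEXT NOT NULL,
    content TEXT NOT NULL
);
"""

TABLES = ("compliance_analyses", "compliance_findings", "finding_chat_messages")


def cascade_demo() -> dict:
    """Insert one analysis with a finding and a message, delete it, return row counts."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces FKs when enabled
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO compliance_analyses VALUES ('a1', 'doc.pdf')")
    conn.execute("INSERT INTO compliance_findings VALUES ('f1', 'a1', 1)")
    conn.execute("INSERT INTO finding_chat_messages VALUES ('m1', 'a1', 'f1', 'user', 'Why?')")
    # Deleting the parent row should remove the finding and its chat message too.
    conn.execute("DELETE FROM compliance_analyses WHERE id = 'a1'")
    counts = {t: conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0] for t in TABLES}
    conn.close()
    return counts


print(cascade_demo())
```

All three counts come back as zero, which is the property delete_analysis depends on: one DELETE on compliance_analyses is enough.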

Direction A — Analysis Quality

A1: Parallel Clause Processing

Current: The route handler runs a sequential for i, clause in enumerate(clauses) loop. Each iteration awaits retrieve_for_clause() and then check_clause_compliance() through asyncio.to_thread, so clauses are processed one at a time.

Change: Extract a process_single_clause(clause, idx, ...) -> dict function in pipeline.py, then replace the loop with asyncio.gather:

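import asyncio

async def run_clauses_parallel(clauses, retrieval_svc, llm_client, standard_name, para_text):
    """Run process_single_clause for every clause concurrently in worker threads."""
    tasks = [
        asyncio.to_thread(process_single_clause, clause, i, retrieval_svc, llm_client, standard_name, para_text)
        for i, clause in enumerate(clauses)
    ]
    # gather returns results in input order; failures come back as exception objects.
    return await asyncio.gather(*tasks, return_exceptions=True)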

Results are yielded to the SSE stream in original order. Exceptions from individual clauses are caught and emitted as {type: "error", clause_index: i} events rather than crashing the whole stream.
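The ordering and error-isolation behaviour described above can be sketched in isolation. In the example below, process_single_clause is a hypothetical stand-in that fails on blank input, the "finding" event type and the clause strings are assumptions made for illustration, and the SSE frames are collected into a list rather than yielded from a real route.

```python
import asyncio
import json


def process_single_clause(clause: str, idx: int) -> dict:
    """Hypothetical stand-in for the real worker (retrieval + LLM compliance check)."""
    if not clause.strip():
        raise ValueError("empty clause")
    return {"type": "finding", "clause_index": idx, "title": clause}


async def clause_events(clauses: list[str]) -> list[str]:
    """Process all clauses concurrently and return SSE frames in original clause order."""
    results = await asyncio.gather(
        *(asyncio.to_thread(process_single_clause, c, i) for i, c in enumerate(clauses)),
        return_exceptions=True,
    )
    frames = []
    for i, result in enumerate(results):  # gather preserves input order
        if isinstance(result, BaseException):
            # One failed clause becomes an error event instead of ending the stream.
            payload = {"type": "error", "clause_index": i}
        else:
            payload = result
        frames.append(f"data: {json.dumps(payload)}\n\n")
    return frames


events = asyncio.run(clause_events(["5.1 Risk assessment", "   ", "7.2 Incident response"]))
for frame in events:
    print(frame, end="")
```

Because gather only returns once every task has finished, the first event reaches the client after the slowest clause completes. That is the trade-off for keeping the original order with this approach.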

A2: Fix highlight_terms

Root cause: synthesize_conclusion() passes the LLM response through json.loads() but the LLM often wraps output in markdown fences (```json ... ```), causing a parse failure and silent fallback to [].

Fix in pipeline.py:

import json
import re

def _extract_json(text: str) -> dict:
    """Strip markdown fences then parse JSON. Raises ValueError on failure."""
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", text.strip(), flags=re.MULTILINE)
    return json.loads(cleaned)

Apply _extract_json in synthesize_conclusion() instead of bare json.loads. Wrap with @retry (see A3) so transient parse failures get a second attempt.

A3: LLM Retry with tenacity

tenacity is already in requirements.txt but unused. Add to all LLM calls in pipeline.py:

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type((httpx.HTTPError, ValueError)),
    reraise=True,
)
def _call_llm_with_retry(client, prompt: str) -> str:
    """Call LLM and return raw text. Retries on HTTP errors and JSON parse failures."""
    ...

On final failure, the calling function catches and emits {type: "error", text: "LLM call failed after 3 attempts"} to the SSE stream.
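The control flow around the final failure can be illustrated without any dependencies. The sketch below uses a hand-written loop as a stand-in for the tenacity policy (same three-attempt limit, backoff sleeps omitted so it runs instantly); FlakyLLM, llm_step_event, and the "result" event type are hypothetical names introduced only for this example.

```python
import json
from typing import Callable

MAX_ATTEMPTS = 3


def call_with_retry(call: Callable[[], str]) -> str:
    """Stand-in for the @retry policy: up to 3 attempts, re-raise the last error."""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            return call()
        except ValueError:
            if attempt == MAX_ATTEMPTS:
                raise  # mirrors reraise=True
    raise AssertionError("unreachable")


def llm_step_event(call: Callable[[], str]) -> str:
    """Return one SSE frame: the LLM text on success, an error event on final failure."""
    try:
        payload = {"type": "result", "text": call_with_retry(call)}
    except ValueError:
        payload = {"type": "error", "text": f"LLM call failed after {MAX_ATTEMPTS} attempts"}
    return f"data: {json.dumps(payload)}\n\n"


class FlakyLLM:
    """Hypothetical fake that fails a fixed number of times before answering."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def __call__(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ValueError("malformed JSON")
        return '{"risk_score": 72}'


recovering = FlakyLLM(failures=2)
broken = FlakyLLM(failures=5)
print(llm_step_event(recovering), end="")  # succeeds on the third attempt
print(llm_step_event(broken), end="")      # gives up after three attempts
```

The caller sees exactly one frame per LLM step either way, so the stream stays well-formed whether or not the retries succeed.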

A4: PassThroughReranker (future-ready stub)

domain/retrieval/ports.py already defines a Reranker ABC. Add the no-op implementation:

New file: backend/app/infrastructure/retrieval/reranker.py

from app.domain.retrieval.ports import Reranker, RetrievedChunk

class PassThroughReranker(Reranker):
    """No-op reranker. Replace with CrossEncoderReranker when a local model is available."""

    def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
        return chunks[:top_k]

Register in shared/bootstrap.py as the default Reranker implementation.
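To show what the stub does to retrieval output, here is a self-contained sketch. The Reranker ABC and RetrievedChunk below are simplified stand-ins (the real definitions live in domain/retrieval/ports.py and their fields are not shown in this spec), and the chunk texts and scores are sample values.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class RetrievedChunk:
    """Hypothetical shape; the real class may carry different fields."""
    text: str
    score: float


class Reranker(ABC):
    @abstractmethod
    def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]: ...


class PassThroughReranker(Reranker):
    """No-op reranker: keeps retrieval order and truncates to top_k."""

    def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
        return chunks[:top_k]


chunks = [
    RetrievedChunk("clause 7.2 text", 0.91),
    RetrievedChunk("clause 5.1 text", 0.88),
    RetrievedChunk("annex B text", 0.40),
]
top = PassThroughReranker().rerank("incident response", chunks, top_k=2)
print([c.text for c in top])
```

Since callers depend only on the Reranker interface, a later cross-encoder implementation can replace the stub in bootstrap.py without touching pipeline code.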

A — Files Changed

| File | Action |
| --- | --- |
| backend/app/application/compliance/pipeline.py | Add process_single_clause, run_clauses_parallel, _extract_json, _call_llm_with_retry |
| backend/app/api/routes/compliance.py | Replace sequential loop with await run_clauses_parallel(...) |
| backend/app/infrastructure/retrieval/reranker.py | New: PassThroughReranker |
| backend/app/shared/bootstrap.py | Register PassThroughReranker |

Direction B — History & Reports

B1: Domain Port

New file: backend/app/domain/compliance/ports.py

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class FindingRecord:
    id: str
    analysis_id: str
    seq: int
    title: str
    description: str
    status: str
    clause_ref: Optional[str] = None

@dataclass
class AnalysisRecord:
    id: str
    created_at: datetime
    created_by: Optional[str]
    doc_name: str
    standard_name: str
    risk_score: int
    conclusion: str
    actions: list
    para_text: str
    highlight_terms: list
    findings: list[FindingRecord] = field(default_factory=list)

class ComplianceRepository(ABC):
    @abstractmethod
    def save_analysis(self, record: AnalysisRecord) -> str: ...
    @abstractmethod
    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]: ...
    @abstractmethod
    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]: ...
    @abstractmethod
    def delete_analysis(self, analysis_id: str) -> None: ...
    @abstractmethod
    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str: ...
    @abstractmethod
    def get_messages(self, finding_id: str) -> list[dict]: ...

B2: PostgresComplianceRepository

New file: backend/app/infrastructure/compliance/repository.py

Implements ComplianceRepository using psycopg2 (already in requirements). Connection string from settings.DATABASE_URL. Key methods:

  • save_analysis: INSERT into compliance_analyses, then bulk INSERT findings into compliance_findings, return analysis_id (UUID string).
  • list_analyses: SELECT joined against compliance_findings to compute finding_count per analysis, ORDER BY created_at DESC, supports limit/offset.
  • get_analysis: SELECT analysis + all findings by analysis_id.
  • delete_analysis: DELETE cascades to findings and chat messages via FK.
  • save_message / get_messages: INSERT/SELECT on finding_chat_messages.

Uses a small connection pool: psycopg2.pool.ThreadedConnectionPool with minconn=1, maxconn=5.

B3: Auto-save Hook

In the SSE generator in compliance.py, after the done event is assembled:

# After yielding the done event
if repo is not None:
    record = AnalysisRecord(
        id="",  # will be assigned by DB
        created_at=datetime.now(timezone.utc),  # timezone-aware for TIMESTAMPTZ; needs `from datetime import timezone`
        created_by=current_user,
        doc_name=doc_name,
        standard_name=standard_name,
        risk_score=done_payload["risk_score"],
        conclusion=done_payload["conclusion"],
        actions=done_payload["actions"],
        para_text=done_payload["para_text"],
        highlight_terms=done_payload["highlight_terms"],
        findings=[FindingRecord(...) for f in accumulated_findings],
    )
    analysis_id = await asyncio.to_thread(repo.save_analysis, record)
    # Emit an extra SSE event so frontend receives the analysis_id
    yield f"data: {json.dumps({'type': 'saved', 'analysis_id': analysis_id})}\n\n"
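The event ordering this hook produces (done first, then saved once the repository returns an id) can be exercised with a fake repository. In the sketch below, InMemoryRepo, sse, and finish_stream are hypothetical helpers, the record is a plain dict rather than an AnalysisRecord, and the generated ids are placeholders, not UUIDs.

```python
import asyncio
import json
from typing import AsyncIterator, Optional


class InMemoryRepo:
    """Hypothetical fake exposing only save_analysis, for illustration."""

    def __init__(self) -> None:
        self.saved: list[dict] = []

    def save_analysis(self, record: dict) -> str:
        self.saved.append(record)
        return f"analysis-{len(self.saved)}"


def sse(payload: dict) -> str:
    """Format one Server-Sent Events data frame."""
    return f"data: {json.dumps(payload)}\n\n"


async def finish_stream(done_payload: dict, repo: Optional[InMemoryRepo]) -> AsyncIterator[str]:
    """Yield the done event, then persist and announce the new analysis id."""
    yield sse({"type": "done", **done_payload})
    if repo is not None:
        # The blocking save runs in a worker thread so the event loop stays free.
        analysis_id = await asyncio.to_thread(repo.save_analysis, done_payload)
        yield sse({"type": "saved", "analysis_id": analysis_id})


async def collect(repo: Optional[InMemoryRepo]) -> list[str]:
    return [frame async for frame in finish_stream({"risk_score": 72}, repo)]


with_repo = asyncio.run(collect(InMemoryRepo()))
without_repo = asyncio.run(collect(None))
print(with_repo)
print(without_repo)
```

Emitting done before saving means the user sees the result without waiting on the database, and a client that never receives saved simply has no analysisId to attach history or chat to.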

B4: New API Endpoints

Added to backend/app/api/routes/compliance.py:

GET    /api/v1/compliance/history
       Query params: limit=20&offset=0
       Response: [{id, created_at, doc_name, standard_name, risk_score, finding_count}]

GET    /api/v1/compliance/history/{analysis_id}
       Response: full AnalysisRecord including findings list

DELETE /api/v1/compliance/history/{analysis_id}
       Response: 204 No Content

GET    /api/v1/compliance/history/{analysis_id}/download
       Response: DOCX file (application/vnd.openxmlformats-officedocument.wordprocessingml.document)

B5: DOCX Export

New file: backend/app/infrastructure/compliance/docx_export.py

Uses python-docx (already in requirements). Generates a structured report:

  • Cover: document name, standard, date, risk score badge
  • Executive summary: conclusion paragraph
  • Findings table: seq / title / status / clause_ref / description
  • Action items: numbered list
  • Footer: generated by AI Regulation Analysis System

Entry point:

from io import BytesIO

from docx import Document  # provided by python-docx

def generate_docx(record: AnalysisRecord) -> bytes:
    """Generate a DOCX compliance report and return as bytes."""
    doc = Document()
    # ... build document ...
    buf = BytesIO()
    doc.save(buf)
    return buf.getvalue()

B6: Frontend — History Rail

CompliancePage.tsx gains a left rail (same layout pattern as RagChat's history-pane):

┌──────────────┬─────────────────────────────────┐
│ History      │  Main Analysis Area              │
│ ──────────   │                                  │
│ 2026-06-08   │  (current analysis or loaded     │
│ doc.pdf      │   read-only historical record)   │
│ ⚠ 72  [↓][×]│                                  │
│ ──────────   │                                  │
│ 2026-06-07   │                                  │
│ csms.pdf     │                                  │
│ ✓ 15  [↓][×]│                                  │
└──────────────┴─────────────────────────────────┘
  • [↓] triggers GET /history/{id}/download and saves the DOCX file
  • [×] shows a confirmation dialog, then calls DELETE /history/{id}
  • Clicking a row loads that analysis into the main area in read-only mode
  • PageStateContext.ComplianceState gains analysisId: string | null and isReadOnly: boolean

On mount, the rail calls GET /history?limit=20 to populate the list. The list re-fetches after delete or after a new analysis completes (triggered by the saved SSE event).

B — Files Changed

| File | Action |
| --- | --- |
| backend/app/domain/compliance/ports.py | New: ComplianceRepository ABC + data classes |
| backend/app/infrastructure/compliance/repository.py | New: PostgresComplianceRepository |
| backend/app/infrastructure/compliance/docx_export.py | New: generate_docx() |
| backend/app/api/routes/compliance.py | Add history endpoints + auto-save hook |
| backend/app/shared/bootstrap.py | Register PostgresComplianceRepository |
| frontend/src/pages/Compliance/CompliancePage.tsx | Add History Rail |
| frontend/src/contexts/PageStateContext.tsx | Add analysisId, isReadOnly to ComplianceState |

Direction C — Deep Chat

C1: New Chat Endpoints

The existing /compliance/chat/{segment_id} endpoint stays available for backward compatibility but is deprecated. New clients use the finding-scoped endpoints:

POST /api/v1/compliance/analyses/{analysis_id}/findings/{finding_id}/chat
     Body: {query: string}
     Response: SSE stream — chunk / done / error events

GET  /api/v1/compliance/analyses/{analysis_id}/findings/{finding_id}/chat
     Response: [{id, role, content, created_at}]

POST /api/v1/compliance/analyses/{analysis_id}/findings/{finding_id}/suggestions
     Response: {questions: [string, string, string]}

C2: Grounded Context Construction

New function in pipeline.py:

def build_finding_context(finding: FindingRecord, analysis: AnalysisRecord) -> str:
    """
    Build a grounded system context string for a finding chat thread.
    Combines finding details with analysis metadata for LLM grounding.
    """
    return (
        f"Document: {analysis.doc_name}\n"
        f"Standard: {analysis.standard_name}\n"
        f"Finding [{finding.seq}]: {finding.title}\n"
        f"Status: {finding.status}\n"
        f"Clause reference: {finding.clause_ref or 'N/A'}\n"
        f"Description: {finding.description}\n"
        f"Overall conclusion: {analysis.conclusion}\n"
    )

This string is prepended to the system prompt for every chat call — replacing the fragile segment_context approach.
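As a usage illustration, the sketch below builds the context for one sample finding and prepends it to a system prompt. The dataclasses are trimmed to the fields the function reads, BASE_SYSTEM_PROMPT is a hypothetical placeholder, and every field value is invented sample data.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FindingRecord:  # trimmed stand-in: only the fields the context uses
    seq: int
    title: str
    description: str
    status: str
    clause_ref: Optional[str] = None


@dataclass
class AnalysisRecord:  # trimmed stand-in
    doc_name: str
    standard_name: str
    conclusion: str


def build_finding_context(finding: FindingRecord, analysis: AnalysisRecord) -> str:
    """Combine finding details with analysis metadata for LLM grounding."""
    return (
        f"Document: {analysis.doc_name}\n"
        f"Standard: {analysis.standard_name}\n"
        f"Finding [{finding.seq}]: {finding.title}\n"
        f"Status: {finding.status}\n"
        f"Clause reference: {finding.clause_ref or 'N/A'}\n"
        f"Description: {finding.description}\n"
        f"Overall conclusion: {analysis.conclusion}\n"
    )


# Hypothetical base prompt; the real wording is not part of this spec.
BASE_SYSTEM_PROMPT = "Answer using only the context above."

analysis = AnalysisRecord("csms.pdf", "Example Standard", "Several gaps remain.")
finding = FindingRecord(
    seq=2,
    title="No incident response plan",
    description="The document does not describe an incident response process.",
    status="non_compliant",
)
system_prompt = build_finding_context(finding, analysis) + "\n" + BASE_SYSTEM_PROMPT
print(system_prompt)
```

A finding without a clause reference renders as "Clause reference: N/A", so the model always sees the same set of labelled lines regardless of which optional fields are populated.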

C3: Multi-turn Context

Chat handler fetches existing messages from finding_chat_messages via repo.get_messages(finding_id) and prepends them to the LLM call as [{"role": "user"/"assistant", "content": "..."}] message history. Max history: 10 most recent messages (5 turns) to avoid token overflow.

After each LLM response, both the user message and assistant message are saved via repo.save_message().
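The history window can be sketched as a small pure function. The example assumes repo.get_messages returns dicts with role and content keys ordered oldest-first, and that the LLM client accepts a "system" role message; build_chat_messages and MAX_HISTORY_MESSAGES are hypothetical names.

```python
MAX_HISTORY_MESSAGES = 10  # 5 user/assistant turns


def build_chat_messages(system_context: str, history: list[dict], query: str) -> list[dict]:
    """Assemble the LLM message list: system context, recent history, new user query.

    Assumes `history` is ordered oldest-first, as rows sorted by created_at would be.
    """
    recent = history[-MAX_HISTORY_MESSAGES:]
    messages = [{"role": "system", "content": system_context}]
    # Copy only role and content so DB-only fields (id, created_at) are not sent.
    messages += [{"role": m["role"], "content": m["content"]} for m in recent]
    messages.append({"role": "user", "content": query})
    return messages


# Sample thread of 7 turns (14 stored messages).
history = []
for turn in range(7):
    history.append({"id": f"u{turn}", "role": "user", "content": f"question {turn}"})
    history.append({"id": f"a{turn}", "role": "assistant", "content": f"answer {turn}"})

messages = build_chat_messages("Finding [2]: ...", history, "What should we fix first?")
print(len(messages), messages[1])
```

Because messages are saved in user/assistant pairs, an even window size keeps the truncated history starting on a user message.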

C4: Suggestion Generation

New function in pipeline.py:

SUGGESTION_PROMPTS = {
    "non_compliant": "Generate 3 questions focused on remediation steps and timeline.",
    "partial": "Generate 3 questions focused on identifying the compliance gap.",
    "compliant": "Generate 3 questions focused on maintaining and evidencing compliance.",
}

def generate_suggestions(finding: FindingRecord, analysis: AnalysisRecord, llm_client) -> list[str]:
    """
    Generate 3 context-aware follow-up questions for a finding chat thread.
    Returns a list of 3 question strings. Falls back to generic questions on error.
    """
    focus = SUGGESTION_PROMPTS.get(finding.status, SUGGESTION_PROMPTS["partial"])
    context = build_finding_context(finding, analysis)
    prompt = f"{context}\n\n{focus}\nReturn JSON: {{\"questions\": [\"...\", \"...\", \"...\"]}}"
    # ... call LLM, parse JSON, return list ...
    # Fallback on error:
    return ["What are the specific requirements?", "What is the remediation timeline?", "Which regulation clause applies?"]
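The elided parse step might look like the sketch below, which validates the model's reply and falls back to the generic questions when the shape is wrong. parse_suggestions is a hypothetical helper, and the fence-stripping from A2 is left out to keep the example short.

```python
import json

FALLBACK_QUESTIONS = [
    "What are the specific requirements?",
    "What is the remediation timeline?",
    "Which regulation clause applies?",
]


def parse_suggestions(raw: str) -> list[str]:
    """Return exactly 3 question strings, or the generic fallback on any malformed reply."""
    try:
        questions = json.loads(raw)["questions"]
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, missing key, or a top-level value that is not an object.
        return list(FALLBACK_QUESTIONS)
    if not isinstance(questions, list):
        return list(FALLBACK_QUESTIONS)
    cleaned = [q.strip() for q in questions if isinstance(q, str) and q.strip()]
    return cleaned[:3] if len(cleaned) >= 3 else list(FALLBACK_QUESTIONS)


good = '{"questions": ["Who owns the fix?", "What evidence is needed?", "By when?"]}'
print(parse_suggestions(good))
print(parse_suggestions("not json at all"))
print(parse_suggestions('{"questions": ["only one"]}'))
```

Returning a fresh copy of the fallback list keeps callers from mutating the shared constant.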

C5: Frontend — Finding Chat Drawer

New component: frontend/src/pages/Compliance/FindingChatDrawer.tsx

Drawer slides in from the right (CSS: position: fixed; right: 0; width: 420px), reusing existing CSS variables (--surface, --border, --accent).

Structure:

  • Header: finding title + close button
  • Suggestions section: 3 chip buttons (only shown before first user message; hidden after)
  • Message list: scrollable, same bubble style as RagChat
  • Composer: textarea + send button, same pattern as RagChat composer

State managed in PageStateContext.ComplianceState:

  • activeFindingId: string | null — which finding's drawer is open
  • Drawer open/close controlled by activeFindingId !== null

On open:

  1. GET /analyses/{id}/findings/{fid}/chat → restore history
  2. If history is empty: POST /analyses/{id}/findings/{fid}/suggestions → show chips

Each finding card in CompliancePage.tsx gains a 💬 Chat button that sets activeFindingId.

C — Files Changed

| File | Action |
| --- | --- |
| backend/app/api/routes/compliance.py | Add 3 new finding-chat endpoints |
| backend/app/application/compliance/pipeline.py | Add build_finding_context, generate_suggestions |
| backend/app/infrastructure/compliance/repository.py | Add save_message, get_messages (already in port) |
| frontend/src/pages/Compliance/FindingChatDrawer.tsx | New component |
| frontend/src/pages/Compliance/CompliancePage.tsx | Add Chat button to finding cards, render drawer |
| frontend/src/contexts/PageStateContext.tsx | Add activeFindingId to ComplianceState |

Implementation Order

Direction A must be completed first (parallel processing changes the route handler that B's auto-save hook attaches to). B must be completed before C (C's FK references require B's tables and repository).

A (parallel + bug fixes + reranker stub)
  └→ B (schema migration + history + DOCX)
       └→ C (finding chat + suggestions)

Non-Goals

  • PDF export (DOCX only; users convert via Word/WPS)
  • Cross-encoder reranking (stub reserved, not implemented)
  • Scheduled/automatic crawling
  • User-level history isolation (all users share history — global visibility)
  • Prompt version management or A/B testing

Constraints

  • Backend comments and docstrings: English only
  • No new top-level libraries beyond those already in requirements.txt (tenacity, python-docx, psycopg2-binary are all present)
  • DOCUMENT_REPOSITORY_BACKEND=postgres → PostgresComplianceRepository; any other value → raise NotImplementedError with a clear message (no mock fallback for compliance history)
  • Git commits are made by the user, never automated
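
The DOCUMENT_REPOSITORY_BACKEND constraint above could be enforced in the composition root along these lines. This is a sketch under assumptions: build_compliance_repository is a hypothetical factory name, the environment is passed in as a mapping so the example is testable, and PostgresComplianceRepository is a placeholder class that only records its connection string.

```python
from typing import Mapping


class PostgresComplianceRepository:
    """Placeholder for the real repository; stores the DSN and nothing else."""

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn


def build_compliance_repository(env: Mapping[str, str]) -> PostgresComplianceRepository:
    """Select the compliance repository from configuration, with no mock fallback."""
    backend = env.get("DOCUMENT_REPOSITORY_BACKEND", "")
    if backend == "postgres":
        return PostgresComplianceRepository(env["DATABASE_URL"])
    raise NotImplementedError(
        "Compliance history requires DOCUMENT_REPOSITORY_BACKEND=postgres "
        f"(got {backend!r})"
    )


repo = build_compliance_repository(
    {"DOCUMENT_REPOSITORY_BACKEND": "postgres", "DATABASE_URL": "postgresql://localhost/app"}
)
print(type(repo).__name__)

try:
    build_compliance_repository({"DOCUMENT_REPOSITORY_BACKEND": "memory"})
except NotImplementedError as exc:
    print(exc)
```

Failing at startup keeps a misconfigured deployment from silently dropping analysis history.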