# Compliance Analysis Enhancement Design
**Date:** 2026-06-08
**Directions:** A (Analysis Quality) + B (History & Reports) + C (Deep Chat)
**Approach:** Three independent but coordinated feature sets sharing one DB schema (option 1: structured tables).

---
## Goals
1. **A — Analysis Quality:** Parallel clause processing (3-5× speed), fix `highlight_terms` bug (always returns empty), add LLM retry with tenacity, reserve `PassThroughReranker` for future cross-encoder work.
2. **B — Analysis History & Reports:** Auto-save every completed analysis to PostgreSQL, history rail in UI, per-record DOCX export, delete with confirmation.
3. **C — Deep Chat:** Per-finding persistent chat threads grounded in real retrieved text, LLM-generated suggestion questions, multi-turn memory.
---
## Architecture Overview
### Layering Rules (must not be violated)
```
api/routes/ → thin HTTP handlers, SSE generators only
application/ → orchestration logic (pipeline.py)
domain/ports/ → ABCs, no implementation
infrastructure/ → DB, docx, external calls
shared/bootstrap.py → composition root, wires everything
```
New business logic goes in `application/compliance/pipeline.py` and domain ports. Never in `services/*` or `workflows/*`.
### Shared Database Schema (B + C)
Three tables, created together so C's FK references are valid from day one:
```sql
CREATE TABLE compliance_analyses (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_by VARCHAR(255),
    doc_name VARCHAR(500),
    standard_name VARCHAR(500),
    risk_score INTEGER,
    conclusion TEXT,
    actions JSONB,
    para_text TEXT,
    highlight_terms JSONB
);

CREATE TABLE compliance_findings (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    seq INTEGER NOT NULL,
    title VARCHAR(500),
    description TEXT,
    status VARCHAR(50),
    clause_ref VARCHAR(200)
);

CREATE TABLE finding_chat_messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    finding_id UUID NOT NULL REFERENCES compliance_findings(id) ON DELETE CASCADE,
    role VARCHAR(20) NOT NULL, -- 'user' | 'assistant'
    content TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
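The cascade behaviour this schema relies on can be checked quickly without a PostgreSQL instance. The sketch below is not the production schema: it swaps in SQLite (from the Python standard library) with a trimmed-down copy of the three tables, and the function name `cascade_demo` is hypothetical. It only illustrates that deleting an analysis row removes its findings and chat messages through the foreign keys.

```python
import sqlite3


def cascade_demo() -> tuple[int, int]:
    """Delete one analysis and report how many child rows survive."""
    conn = sqlite3.connect(":memory:")
    # SQLite enforces foreign keys only when this pragma is enabled.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(
        """
        CREATE TABLE compliance_analyses (id TEXT PRIMARY KEY);
        CREATE TABLE compliance_findings (
            id TEXT PRIMARY KEY,
            analysis_id TEXT NOT NULL
                REFERENCES compliance_analyses(id) ON DELETE CASCADE
        );
        CREATE TABLE finding_chat_messages (
            id TEXT PRIMARY KEY,
            analysis_id TEXT NOT NULL
                REFERENCES compliance_analyses(id) ON DELETE CASCADE,
            finding_id TEXT NOT NULL
                REFERENCES compliance_findings(id) ON DELETE CASCADE,
            role TEXT NOT NULL,
            content TEXT NOT NULL
        );
        """
    )
    conn.execute("INSERT INTO compliance_analyses (id) VALUES ('a1')")
    conn.execute("INSERT INTO compliance_findings (id, analysis_id) VALUES ('f1', 'a1')")
    conn.execute(
        "INSERT INTO finding_chat_messages (id, analysis_id, finding_id, role, content) "
        "VALUES ('m1', 'a1', 'f1', 'user', 'Why is this non-compliant?')"
    )
    # Deleting the parent row should cascade to both child tables.
    conn.execute("DELETE FROM compliance_analyses WHERE id = 'a1'")
    findings = conn.execute("SELECT COUNT(*) FROM compliance_findings").fetchone()[0]
    messages = conn.execute("SELECT COUNT(*) FROM finding_chat_messages").fetchone()[0]
    conn.close()
    return findings, messages
```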
---
## Direction A — Analysis Quality
### A1: Parallel Clause Processing
**Current:** Route handler has a sequential `for i, clause in enumerate(clauses)` loop. Each iteration calls `retrieve_for_clause()` then `check_clause_compliance()` synchronously via `asyncio.to_thread`.
**Change:** Extract a `process_single_clause(clause, idx, ...) -> dict` function in `pipeline.py`, then replace the loop with `asyncio.gather`:
```python
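import asyncio

async def run_clauses_parallel(clauses, retrieval_svc, llm_client, standard_name, para_text):
    """Run process_single_clause for every clause concurrently in worker threads."""
    tasks = [
        asyncio.to_thread(process_single_clause, clause, i, retrieval_svc, llm_client, standard_name, para_text)
        for i, clause in enumerate(clauses)
    ]
    # gather() preserves input order; with return_exceptions=True a failed clause
    # yields its exception object instead of aborting the whole batch.
    return await asyncio.gather(*tasks, return_exceptions=True)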
```
Results are yielded to the SSE stream in original order. Exceptions from individual clauses are caught and emitted as `{type: "error", clause_index: i}` events rather than crashing the whole stream.
### A2: Fix highlight_terms
**Root cause:** `synthesize_conclusion()` passes the LLM response through `json.loads()` but the LLM often wraps output in markdown fences (` ```json ... ``` `), causing a parse failure and silent fallback to `[]`.
**Fix in `pipeline.py`:**
```python
import json
import re

def _extract_json(text: str) -> dict:
    """Strip markdown fences then parse JSON. Raises ValueError on failure."""
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", text.strip(), flags=re.MULTILINE)
    return json.loads(cleaned)
```
Apply `_extract_json` in `synthesize_conclusion()` instead of bare `json.loads`. Re-parsing the same text cannot succeed, so the `@retry` wrapper (see A3) must cover the LLM call and the parse together; a parse failure then triggers a fresh LLM response.
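To make the expected behaviour concrete, here is a small self-contained check of the same fence-stripping regex on three inputs: a fenced response, a bare JSON response, and a response with leading prose. The sample strings are invented for illustration, and the function is renamed `extract_json` only so the snippet stands alone.

```python
import json
import re


def extract_json(text: str) -> dict:
    """Same fence-stripping approach as `_extract_json` in this section."""
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", text.strip(), flags=re.MULTILINE)
    return json.loads(cleaned)


# Invented sample LLM outputs.
fenced = '```json\n{"highlight_terms": ["encryption", "audit log"]}\n```'
bare = '{"highlight_terms": []}'
with_prose = 'Here is the JSON:\n```json\n{"highlight_terms": []}\n```'

fenced_result = extract_json(fenced)
bare_result = extract_json(bare)

try:
    extract_json(with_prose)
    prose_failed = False
except ValueError:
    # json.JSONDecodeError subclasses ValueError, so the retry predicate in A3 sees it.
    prose_failed = True
```

The third case still fails because the regex removes only the fences, not surrounding prose; that failure surfaces as a `ValueError`, which is what the retry logic is meant to catch.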
### A3: LLM Retry with tenacity
`tenacity` is already in `requirements.txt` but unused. Add to all LLM calls in `pipeline.py`:
```python
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type((httpx.HTTPError, ValueError)),
    reraise=True,
)
def _call_llm_with_retry(client, prompt: str) -> str:
    """Call LLM and return raw text. Retries on HTTP errors and JSON parse failures."""
    ...
```
On final failure, the calling function catches and emits `{type: "error", text: "LLM call failed after 3 attempts"}` to the SSE stream.
### A4: PassThroughReranker (future-ready stub)
`domain/retrieval/ports.py` already defines a `Reranker` ABC. Add the no-op implementation:
**New file:** `backend/app/infrastructure/retrieval/reranker.py`
```python
from app.domain.retrieval.ports import Reranker, RetrievedChunk

class PassThroughReranker(Reranker):
    """No-op reranker. Replace with CrossEncoderReranker when a local model is available."""

    def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
        return chunks[:top_k]
```
Register in `shared/bootstrap.py` as the default `Reranker` implementation.
### A — Files Changed
| File | Action |
|------|--------|
| `backend/app/application/compliance/pipeline.py` | Add `process_single_clause`, `run_clauses_parallel`, `_extract_json`, `_call_llm_with_retry` |
| `backend/app/api/routes/compliance.py` | Replace sequential loop with `await run_clauses_parallel(...)` |
| `backend/app/infrastructure/retrieval/reranker.py` | New — `PassThroughReranker` |
| `backend/app/shared/bootstrap.py` | Register `PassThroughReranker` |
---
## Direction B — History & Reports
### B1: Domain Port
**New file:** `backend/app/domain/compliance/ports.py`
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class FindingRecord:
    id: str
    analysis_id: str
    seq: int
    title: str
    description: str
    status: str
    clause_ref: Optional[str] = None

@dataclass
class AnalysisRecord:
    id: str
    created_at: datetime
    created_by: Optional[str]
    doc_name: str
    standard_name: str
    risk_score: int
    conclusion: str
    actions: list
    para_text: str
    highlight_terms: list
    findings: list[FindingRecord] = field(default_factory=list)

class ComplianceRepository(ABC):
    @abstractmethod
    def save_analysis(self, record: AnalysisRecord) -> str: ...

    @abstractmethod
    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]: ...

    @abstractmethod
    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]: ...

    @abstractmethod
    def delete_analysis(self, analysis_id: str) -> None: ...

    @abstractmethod
    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str: ...

    @abstractmethod
    def get_messages(self, finding_id: str) -> list[dict]: ...
```
### B2: PostgresComplianceRepository
**New file:** `backend/app/infrastructure/compliance/repository.py`
Implements `ComplianceRepository` using `psycopg2` (already in requirements). Connection string from `settings.DATABASE_URL`. Key methods:
- `save_analysis`: INSERT into `compliance_analyses`, then bulk INSERT findings into `compliance_findings`, return `analysis_id` (UUID string).
- `list_analyses`: SELECT with JOIN on findings count, ORDER BY `created_at DESC`, supports limit/offset.
- `get_analysis`: SELECT analysis + all findings by `analysis_id`.
- `delete_analysis`: DELETE cascades to findings and chat messages via FK.
- `save_message` / `get_messages`: INSERT/SELECT on `finding_chat_messages`.
Uses a connection pool (simple `psycopg2.pool.ThreadedConnectionPool`, min=1, max=5).
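As an illustration of the `list_analyses` description above (join for the finding count, newest first, limit/offset), the query might look like the sketch below. The constant `LIST_ANALYSES_SQL` and the helper `list_analyses_query` are hypothetical names, and the column selection is an assumption based on the history response in B4. The `%s` placeholders follow psycopg2's parameter style, so values are never interpolated into the SQL string.

```python
LIST_ANALYSES_SQL = """
SELECT a.id, a.created_at, a.doc_name, a.standard_name, a.risk_score,
       COUNT(f.id) AS finding_count
FROM compliance_analyses AS a
LEFT JOIN compliance_findings AS f ON f.analysis_id = a.id
GROUP BY a.id
ORDER BY a.created_at DESC
LIMIT %s OFFSET %s
"""


def list_analyses_query(limit: int = 50, offset: int = 0) -> tuple[str, tuple[int, int]]:
    """Return the SQL text and bound parameters for one history page."""
    if limit < 1 or offset < 0:
        raise ValueError("limit must be >= 1 and offset must be >= 0")
    return LIST_ANALYSES_SQL, (limit, offset)
```

A repository method could then call `cursor.execute(*list_analyses_query(20, 0))`. The `LEFT JOIN` keeps analyses that have no findings, reporting a count of 0 for them.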
### B3: Auto-save Hook
In the SSE generator in `compliance.py`, after the `done` event is assembled:
```python
# After yielding the done event
# (requires: from datetime import datetime, timezone)
if repo is not None:
    record = AnalysisRecord(
        id="",  # will be assigned by DB
        # Timezone-aware UTC; datetime.utcnow() is naive and deprecated since Python 3.12
        created_at=datetime.now(timezone.utc),
        created_by=current_user,
        doc_name=doc_name,
        standard_name=standard_name,
        risk_score=done_payload["risk_score"],
        conclusion=done_payload["conclusion"],
        actions=done_payload["actions"],
        para_text=done_payload["para_text"],
        highlight_terms=done_payload["highlight_terms"],
        findings=[FindingRecord(...) for f in accumulated_findings],
    )
    # save_analysis is blocking (psycopg2), so run it off the event loop
    analysis_id = await asyncio.to_thread(repo.save_analysis, record)
    # Emit an extra SSE event so frontend receives the analysis_id
    yield f"data: {json.dumps({'type': 'saved', 'analysis_id': analysis_id})}\n\n"
```
### B4: New API Endpoints
Added to `backend/app/api/routes/compliance.py`:
```
GET /api/v1/compliance/history
    Query params: limit=20&offset=0
    Response: [{id, created_at, doc_name, standard_name, risk_score, finding_count}]

GET /api/v1/compliance/history/{analysis_id}
    Response: full AnalysisRecord including findings list

DELETE /api/v1/compliance/history/{analysis_id}
    Response: 204 No Content

GET /api/v1/compliance/history/{analysis_id}/download
    Response: DOCX file (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
```
### B5: DOCX Export
**New file:** `backend/app/infrastructure/compliance/docx_export.py`
Uses `python-docx` (already in requirements). Generates a structured report:
- Cover: document name, standard, date, risk score badge
- Executive summary: conclusion paragraph
- Findings table: seq / title / status / clause_ref / description
- Action items: numbered list
- Footer: generated by AI Regulation Analysis System
```python
from io import BytesIO

from docx import Document

def generate_docx(record: AnalysisRecord) -> bytes:
    """Generate a DOCX compliance report and return as bytes."""
    doc = Document()
    # ... build document ...
    buf = BytesIO()
    doc.save(buf)  # python-docx can save to a file-like object
    return buf.getvalue()
```
### B6: Frontend — History Rail
`CompliancePage.tsx` gains a left rail (same layout pattern as RagChat's `history-pane`):
```
┌──────────────┬─────────────────────────────────┐
│ History      │ Main Analysis Area              │
│ ──────────   │                                 │
│ 2026-06-08   │ (current analysis or loaded     │
│ doc.pdf      │ read-only historical record)    │
│ ⚠ 72 [↓][×]  │                                 │
│ ──────────   │                                 │
│ 2026-06-07   │                                 │
│ csms.pdf     │                                 │
│ ✓ 15 [↓][×]  │                                 │
└──────────────┴─────────────────────────────────┘
```
- `[↓]` triggers `GET /history/{id}/download` and saves the DOCX file
- `[×]` shows a confirmation dialog, then calls `DELETE /history/{id}`
- Clicking a row loads that analysis into the main area in read-only mode
- `PageStateContext.ComplianceState` gains `analysisId: string | null` and `isReadOnly: boolean`
On mount, the rail calls `GET /history?limit=20` to populate the list. The list re-fetches after delete or after a new analysis completes (triggered by the `saved` SSE event).
### B — Files Changed
| File | Action |
|------|--------|
| `backend/app/domain/compliance/ports.py` | New — `ComplianceRepository` ABC + data classes |
| `backend/app/infrastructure/compliance/repository.py` | New — `PostgresComplianceRepository` |
| `backend/app/infrastructure/compliance/docx_export.py` | New — `generate_docx()` |
| `backend/app/api/routes/compliance.py` | Add history endpoints + auto-save hook |
| `backend/app/shared/bootstrap.py` | Register `PostgresComplianceRepository` |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Add History Rail |
| `frontend/src/contexts/PageStateContext.tsx` | Add `analysisId`, `isReadOnly` to `ComplianceState` |
---
## Direction C — Deep Chat
### C1: New Chat Endpoints
Supersede the existing `/compliance/chat/{segment_id}` endpoint with finding-scoped endpoints. The old endpoint stays available for backward compatibility but is deprecated:
```
POST /api/v1/compliance/analyses/{analysis_id}/findings/{finding_id}/chat
    Body: {query: string}
    Response: SSE stream (chunk / done / error events)

GET /api/v1/compliance/analyses/{analysis_id}/findings/{finding_id}/chat
    Response: [{id, role, content, created_at}]

POST /api/v1/compliance/analyses/{analysis_id}/findings/{finding_id}/suggestions
    Response: {questions: [string, string, string]}
```
### C2: Grounded Context Construction
New function in `pipeline.py`:
```python
def build_finding_context(finding: FindingRecord, analysis: AnalysisRecord) -> str:
    """
    Build a grounded system context string for a finding chat thread.
    Combines finding details with analysis metadata for LLM grounding.
    """
    return (
        f"Document: {analysis.doc_name}\n"
        f"Standard: {analysis.standard_name}\n"
        f"Finding [{finding.seq}]: {finding.title}\n"
        f"Status: {finding.status}\n"
        f"Clause reference: {finding.clause_ref or 'N/A'}\n"
        f"Description: {finding.description}\n"
        f"Overall conclusion: {analysis.conclusion}\n"
    )
```
This string is prepended to the system prompt for every chat call — replacing the fragile `segment_context` approach.
### C3: Multi-turn Context
Chat handler fetches existing messages from `finding_chat_messages` via `repo.get_messages(finding_id)` and prepends them to the LLM call as `[{"role": "user"/"assistant", "content": "..."}]` message history. Max history: 10 most recent messages (5 turns) to avoid token overflow.
After each LLM response, both the user message and assistant message are saved via `repo.save_message()`.
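The history windowing described above can be sketched as a small pure function. This is an illustration under assumptions, not the handler itself: the name `build_chat_messages` is hypothetical, `history` is assumed to be the oldest-first list of dicts returned by `repo.get_messages(finding_id)`, and the `system`/`user`/`assistant` message layout is assumed to match what the LLM client accepts.

```python
MAX_HISTORY_MESSAGES = 10  # 5 user/assistant turns


def build_chat_messages(
    system_prompt: str,
    finding_context: str,
    history: list[dict],
    query: str,
) -> list[dict]:
    """Assemble the message list for one chat call on a finding thread."""
    # Keep only the most recent messages to bound prompt size.
    recent = history[-MAX_HISTORY_MESSAGES:]
    # The grounded finding context is prepended to the system prompt.
    messages = [{"role": "system", "content": f"{finding_context}\n{system_prompt}"}]
    # Drop storage-only fields (id, created_at) before sending to the LLM.
    messages.extend({"role": m["role"], "content": m["content"]} for m in recent)
    messages.append({"role": "user", "content": query})
    return messages
```

With 14 stored messages, the call sends 12 entries: the system message, the last 10 stored messages, and the new user query.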
### C4: Suggestion Generation
New function in `pipeline.py`:
```python
SUGGESTION_PROMPTS = {
    "non_compliant": "Generate 3 questions focused on remediation steps and timeline.",
    "partial": "Generate 3 questions focused on identifying the compliance gap.",
    "compliant": "Generate 3 questions focused on maintaining and evidencing compliance.",
}

def generate_suggestions(finding: FindingRecord, analysis: AnalysisRecord, llm_client) -> list[str]:
    """
    Generate 3 context-aware follow-up questions for a finding chat thread.
    Returns a list of 3 question strings. Falls back to generic questions on error.
    """
    focus = SUGGESTION_PROMPTS.get(finding.status, SUGGESTION_PROMPTS["partial"])
    context = build_finding_context(finding, analysis)
    prompt = f"{context}\n\n{focus}\nReturn JSON: {{\"questions\": [\"...\", \"...\", \"...\"]}}"
    # ... call LLM, parse JSON, return list ...
    # Fallback on error:
    return ["What are the specific requirements?", "What is the remediation timeline?", "Which regulation clause applies?"]
```
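The elided "parse JSON, return list" step with its fallback could look roughly like the sketch below. `parse_suggestions` and `FALLBACK_QUESTIONS` are hypothetical names; the fallback strings are the ones from `generate_suggestions`. The input is assumed to be the LLM text after any markdown fences have been stripped.

```python
import json

FALLBACK_QUESTIONS = [
    "What are the specific requirements?",
    "What is the remediation timeline?",
    "Which regulation clause applies?",
]


def parse_suggestions(raw: str) -> list[str]:
    """Return exactly 3 questions from the LLM output, or the generic fallback."""
    try:
        questions = json.loads(raw)["questions"]
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, missing key, or a non-object top level.
        return list(FALLBACK_QUESTIONS)
    if not isinstance(questions, list):
        return list(FALLBACK_QUESTIONS)
    # Keep only non-empty strings, trimmed.
    cleaned = [q.strip() for q in questions if isinstance(q, str) and q.strip()]
    if len(cleaned) < 3:
        return list(FALLBACK_QUESTIONS)
    return cleaned[:3]
```

Returning a copy of the fallback list keeps callers from mutating the module-level constant.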
### C5: Frontend — Finding Chat Drawer
New component: `frontend/src/pages/Compliance/FindingChatDrawer.tsx`
Drawer slides in from the right (CSS: `position: fixed; right: 0; width: 420px`), reusing existing CSS variables (`--surface`, `--border`, `--accent`).
Structure:
- Header: finding title + close button
- Suggestions section: 3 chip buttons (only shown before first user message; hidden after)
- Message list: scrollable, same bubble style as RagChat
- Composer: textarea + send button, same pattern as RagChat composer
State managed in `PageStateContext.ComplianceState`:
- `activeFindingId: string | null` — which finding's drawer is open
- Drawer open/close controlled by `activeFindingId !== null`
On open:
1. `GET /analyses/{id}/findings/{fid}/chat` → restore history
2. If history is empty: `POST /analyses/{id}/findings/{fid}/suggestions` → show chips
Each finding card in `CompliancePage.tsx` gains a `💬 Chat` button that sets `activeFindingId`.
### C — Files Changed
| File | Action |
|------|--------|
| `backend/app/api/routes/compliance.py` | Add 3 new finding-chat endpoints |
| `backend/app/application/compliance/pipeline.py` | Add `build_finding_context`, `generate_suggestions` |
| `backend/app/infrastructure/compliance/repository.py` | Add `save_message`, `get_messages` (already in port) |
| `frontend/src/pages/Compliance/FindingChatDrawer.tsx` | New component |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Add Chat button to finding cards, render drawer |
| `frontend/src/contexts/PageStateContext.tsx` | Add `activeFindingId` to `ComplianceState` |
---
## Implementation Order
Direction A must be completed first (parallel processing changes the route handler that B's auto-save hook attaches to). B must be completed before C (C's FK references require B's tables and repository).
```
A (parallel + bug fixes + reranker stub)
└→ B (schema migration + history + DOCX)
└→ C (finding chat + suggestions)
```
---
## Non-Goals
- PDF export (DOCX only; users convert via Word/WPS)
- Cross-encoder reranking (stub reserved, not implemented)
- Scheduled/automatic crawling
- User-level history isolation (all users share history — global visibility)
- Prompt version management or A/B testing
---
## Constraints
- Backend comments and docstrings: English only
- No new top-level libraries beyond those already in `requirements.txt` (`tenacity`, `python-docx`, `psycopg2-binary` are all present)
- `DOCUMENT_REPOSITORY_BACKEND=postgres` → `PostgresComplianceRepository`; any other value → raise `NotImplementedError` with a clear message (no mock fallback for compliance history)
- Git commits are made by the user, never automated
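
The backend-selection constraint above (postgres, or fail loudly) might be enforced in the composition root along these lines. This is a sketch under assumptions: `make_compliance_repository` is a hypothetical helper, and `postgres_factory` stands in for whatever callable `shared/bootstrap.py` uses to construct `PostgresComplianceRepository`.

```python
from typing import Callable, Optional


def make_compliance_repository(
    backend: Optional[str],
    postgres_factory: Callable[[], object],
) -> object:
    """Build the compliance repository, refusing any non-postgres backend."""
    if backend == "postgres":
        return postgres_factory()
    # No mock fallback: compliance history must never be silently dropped.
    raise NotImplementedError(
        "Compliance history requires DOCUMENT_REPOSITORY_BACKEND=postgres, "
        f"got {backend!r}"
    )
```

Failing at wiring time makes a misconfigured deployment visible at startup instead of after the first analysis has run unsaved.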