# Compliance Analysis Enhancement Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Enhance the Compliance Analysis module with parallel clause processing + bug fixes (A), persistent analysis history + DOCX export (B), and per-finding persistent chat threads with LLM-generated suggestions (C).

**Architecture:** Direction A refactors the SSE pipeline in `app/api/routes/compliance.py` to run clauses in parallel via `asyncio.gather` and fixes `highlight_terms` and LLM retry. Direction B adds a PostgreSQL-backed `ComplianceRepository` (new domain port + infrastructure) and three REST endpoints plus a frontend History Rail. Direction C adds per-finding chat endpoints grounded in real analysis data and a React Drawer component. Implementation order: A → B → C (each direction is a prerequisite for the next).

**Tech Stack:** FastAPI (SSE), psycopg2 (ThreadedConnectionPool), python-docx, tenacity (retry), React 18, TypeScript, existing `PageStateContext` pattern.

---
## File Map
### Direction A — Analysis Quality
| File | Action |
|------|--------|
| `backend/app/application/compliance/pipeline.py` | Add `process_single_clause()`, `_call_llm_with_retry()` wrapper, add `tenacity` retry to `synthesize_conclusion` and `check_clause_compliance` |
| `backend/app/api/routes/compliance.py` | Replace sequential for-loop with `asyncio.gather` on `process_single_clause` |
| `backend/app/infrastructure/vectorstore/pass_through_reranker.py` | New — `PassThroughReranker` stub |
| `backend/tests/compliance/test_pipeline.py` | New — unit tests for parallel processing + highlight_terms fix |
### Direction B — History & Reports
| File | Action |
|------|--------|
| `backend/app/domain/compliance/__init__.py` | New — empty |
| `backend/app/domain/compliance/ports.py` | New — `FindingRecord`, `AnalysisRecord`, `ComplianceRepository` ABC |
| `backend/app/infrastructure/compliance/__init__.py` | New — empty |
| `backend/app/infrastructure/compliance/repository.py` | New — `PostgresComplianceRepository` |
| `backend/app/infrastructure/compliance/docx_export.py` | New — `generate_docx()` |
| `backend/app/api/routes/compliance.py` | Add history endpoints + auto-save hook in `analyze_stream` |
| `backend/app/shared/bootstrap.py` | Add `get_compliance_repository()` factory |
| `frontend/src/pages/Compliance/HistoryRail.tsx` | New — History Rail component |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Import + render `HistoryRail`, handle `saved` SSE event, store `analysisId` |
| `frontend/src/contexts/PageStateContext.tsx` | Add `analysisId`, `isReadOnly` to `ComplianceState` |
| `backend/tests/compliance/test_repository.py` | New — unit tests for repository (with mock psycopg2) |
### Direction C — Deep Chat
| File | Action |
|------|--------|
| `backend/app/application/compliance/pipeline.py` | Add `build_finding_context()`, `generate_suggestions()` |
| `backend/app/api/routes/compliance.py` | Add 3 finding-chat endpoints, deprecate old chat endpoint |
| `frontend/src/pages/Compliance/FindingChatDrawer.tsx` | New — per-finding chat drawer |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Add Chat button to finding cards, render `FindingChatDrawer` |
| `frontend/src/contexts/PageStateContext.tsx` | Add `activeFindingId` to `ComplianceState` |
| `backend/tests/compliance/test_pipeline.py` | Extend — test `build_finding_context` + `generate_suggestions` |
---
## Direction A Tasks
### Task 1: PassThroughReranker stub
**Files:**
- Create: `backend/app/infrastructure/vectorstore/pass_through_reranker.py`
- Test: `backend/tests/compliance/test_pipeline.py`
- [ ] **Step 1: Create test file and write failing test**
```python
# backend/tests/compliance/test_pipeline.py
import pytest
from app.infrastructure.vectorstore.pass_through_reranker import PassThroughReranker
from app.domain.retrieval.models import RetrievedChunk


def _make_chunk(score: float) -> RetrievedChunk:
    return RetrievedChunk(
        doc_id="d1",
        doc_title="Test Doc",
        section_title="S1",
        text="some text",
        score=score,
        page_start=1,
    )


def test_pass_through_returns_top_k():
    reranker = PassThroughReranker()
    chunks = [_make_chunk(0.9), _make_chunk(0.8), _make_chunk(0.7)]
    result = reranker.rerank(query="test", chunks=chunks, top_k=2)
    assert len(result) == 2
    assert result[0].score == 0.9


def test_pass_through_returns_all_when_top_k_exceeds():
    reranker = PassThroughReranker()
    chunks = [_make_chunk(0.5)]
    result = reranker.rerank(query="test", chunks=chunks, top_k=10)
    assert len(result) == 1
```
- [ ] **Step 2: Run test to verify it fails**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_pass_through_returns_top_k -v
```
Expected: `ModuleNotFoundError` or `ImportError`, since `pass_through_reranker` does not exist yet.
- [ ] **Step 3: Create `tests/compliance/__init__.py`**
```python
# backend/tests/compliance/__init__.py
```
- [ ] **Step 4: Create the reranker**
```python
# backend/app/infrastructure/vectorstore/pass_through_reranker.py
"""No-op reranker stub.

Returns the original candidate list sliced to top_k.
Replace with CrossEncoderReranker when a local cross-encoder model is available.
"""
from __future__ import annotations

from app.domain.retrieval.models import RetrievedChunk
from app.domain.retrieval.ports import Reranker


class PassThroughReranker(Reranker):
    """Pass-through reranker that preserves original retrieval order.

    Acts as a placeholder for future cross-encoder reranking (e.g. ms-marco-MiniLM).
    Wire via bootstrap.get_compliance_reranker() when ready to swap.
    """

    def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
        """Return the first top_k chunks without reordering."""
        return chunks[:top_k]
```
- [ ] **Step 5: Run tests to verify they pass**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_pass_through_returns_top_k tests/compliance/test_pipeline.py::test_pass_through_returns_all_when_top_k_exceeds -v
```
Expected: 2 passed.

---
### Task 2: Parallel clause processing + LLM retry
**Files:**
- Modify: `backend/app/application/compliance/pipeline.py`
- Modify: `backend/app/api/routes/compliance.py`
- Test: `backend/tests/compliance/test_pipeline.py`
- [ ] **Step 1: Write failing tests**

Add these tests to `backend/tests/compliance/test_pipeline.py`:
```python
import asyncio
from unittest.mock import MagicMock, patch


def _make_mock_client(content: str = '{"status":"ok","title":"T","desc":"D","clause_ref":"A1"}'):
    client = MagicMock()
    response = MagicMock()
    response.is_success = True
    response.content = content
    client.chat.return_value = response
    return client


def _make_mock_retrieval():
    svc = MagicMock()
    svc.retrieve.return_value = []
    return svc


def test_process_single_clause_returns_finding():
    from app.application.compliance.pipeline import process_single_clause

    client = _make_mock_client()
    svc = _make_mock_retrieval()
    result = process_single_clause("test clause", 0, svc, client, "EU AI Act", "para")
    assert result["finding"] is not None
    assert result["index"] == 0
    assert result["chunks"] == []


def test_run_clauses_parallel_runs_all():
    from app.application.compliance.pipeline import run_clauses_parallel

    client = _make_mock_client()
    svc = _make_mock_retrieval()
    clauses = ["clause one", "clause two", "clause three"]
    results = asyncio.run(run_clauses_parallel(clauses, svc, client, "EU AI Act", "para"))
    assert len(results) == 3
    assert all(r["index"] == i for i, r in enumerate(results))
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_process_single_clause_returns_finding -v
```
Expected: `ImportError: cannot import name 'process_single_clause'`
- [ ] **Step 3: Add `process_single_clause` and `run_clauses_parallel` to `pipeline.py`**

Add after line 109 (after `retrieve_for_clause`), before `check_clause_compliance`:
```python
# Module-level imports (asyncio, httpx, tenacity) go at the top of the file; see below.


def process_single_clause(
    clause: str,
    index: int,
    retrieval_service: "KnowledgeRetrievalService",
    client: "BaseLLMClient",
    standard_name: str,
    para_text: str,
    top_k: int = 5,
    domains: str | None = None,
) -> dict:
    """Process one clause: retrieve relevant regulations, then check compliance.

    Returns a dict with keys: index, chunks, finding (None on LLM failure).
    Designed to run inside asyncio.to_thread() for parallel execution.
    standard_name and para_text are accepted for prompt context but unused here.
    """
    chunks = retrieve_for_clause(clause, retrieval_service, top_k, domains)
    finding = check_clause_compliance(clause, chunks, client)
    return {"index": index, "chunks": chunks, "finding": finding}


async def run_clauses_parallel(
    clauses: list[str],
    retrieval_service: "KnowledgeRetrievalService",
    client: "BaseLLMClient",
    standard_name: str,
    para_text: str,
    top_k: int = 5,
    domains: str | None = None,
) -> list[dict]:
    """Run all clauses through retrieve + gap check in parallel.

    Results are returned in the original clause order even though processing
    is concurrent. Exceptions in individual clauses are caught and returned as
    dicts with finding=None so the stream continues for the remaining clauses.
    """
    tasks = [
        asyncio.to_thread(
            process_single_clause,
            clause, i, retrieval_service, client, standard_name, para_text, top_k, domains,
        )
        for i, clause in enumerate(clauses)
    ]
    raw = await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    for i, r in enumerate(raw):
        if isinstance(r, Exception):
            logger.warning("Clause {} processing failed: {}", i, r)
            results.append({"index": i, "chunks": [], "finding": None})
        else:
            results.append(r)
    return results
```
Also add the imports at the top of `pipeline.py` (after existing imports):
```python
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
```
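The ordering guarantee in the `run_clauses_parallel` docstring comes from `asyncio.gather`, which returns results in the order its awaitables were passed, not in completion order. The stdlib-only sketch below shows this together with the `return_exceptions=True` fallback. `slow_square` is a hypothetical blocking worker standing in for retrieve + gap check; later items sleep less, so they finish first.

```python
import asyncio
import time


def slow_square(x: int) -> int:
    """Hypothetical blocking worker; x == 2 simulates an LLM failure."""
    time.sleep(0.05 / (x + 1))  # later items finish earlier
    if x == 2:
        raise RuntimeError("simulated LLM failure")
    return x * x


async def run_all(items: list[int]) -> list[dict]:
    # Each blocking call runs in the event loop's default thread pool.
    tasks = [asyncio.to_thread(slow_square, x) for x in items]
    # gather() preserves argument order; return_exceptions=True returns a
    # failure as a value instead of propagating it out of gather().
    raw = await asyncio.gather(*tasks, return_exceptions=True)
    return [
        {"index": i, "value": None if isinstance(r, Exception) else r}
        for i, r in enumerate(raw)
    ]


if __name__ == "__main__":
    print(asyncio.run(run_all([0, 1, 2, 3])))
    # → [{'index': 0, 'value': 0}, {'index': 1, 'value': 1}, {'index': 2, 'value': None}, {'index': 3, 'value': 9}]
```

`asyncio.to_thread` submits work to the loop's default `ThreadPoolExecutor`. That executor's default size is `min(32, os.cpu_count() + 4)`, so long clause lists queue behind that limit. This plan does not specify a concurrency cap. If the LLM provider rate-limits, wrapping each call in an `asyncio.Semaphore` is one option.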
- [ ] **Step 4: Wrap LLM calls in `check_clause_compliance` with retry**

Inside `check_clause_compliance`, define a nested retry wrapper around the direct `client.chat(...)` call (line 137). It must be nested so that it can close over `client`:
```python
# Defined inside check_clause_compliance so it closes over `client`.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type((httpx.HTTPError, ValueError)),
    reraise=True,
)
def _llm_check(prompt_messages: list[dict]):
    """Call the LLM once and return its response; raise on non-success so tenacity retries."""
    resp = client.chat(prompt_messages, max_tokens=500)
    if not resp.is_success:
        raise ValueError("LLM returned non-success status for gap check")
    return resp
```
Then in `check_clause_compliance`, replace:
```python
response = client.chat([{"role": "user", "content": prompt}], max_tokens=500)
if not response.is_success:
    return None
```
with:
```python
try:
    response = _llm_check([{"role": "user", "content": prompt}])
except Exception as exc:
    logger.warning("check_clause_compliance LLM call failed after retries: {}", exc)
    return None
```
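The retry policy configured for `_llm_check` does three things: it makes three attempts, it waits exponentially between them starting at 1 s and capped at 4 s, and it re-raises the original exception after the last attempt. The stdlib sketch below mirrors that policy so the timing can be reasoned about without tenacity. It is a hand-rolled illustration, not tenacity's implementation, and `call_with_backoff` is a hypothetical name. The wait formula follows what tenacity documents for `wait_exponential`: `multiplier * 2 ** (attempt - 1)`, clamped to `[min, max]`. With these settings the two waits should be 1 s and 2 s. The `sleep` parameter is injectable so that tests do not actually wait.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = 3,
    multiplier: float = 1.0,
    min_wait: float = 1.0,
    max_wait: float = 4.0,
    retry_on: tuple[type[BaseException], ...] = (ValueError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn until it succeeds or `attempts` tries are used up.

    Only exceptions listed in `retry_on` trigger a retry; after the final
    attempt the last exception is re-raised unchanged (like reraise=True).
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            # Exponential wait, clamped to [min_wait, max_wait].
            wait = multiplier * 2 ** (attempt - 1)
            sleep(max(min_wait, min(wait, max_wait)))
    raise ValueError("attempts must be >= 1")
```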
Apply the same pattern to `synthesize_conclusion` (line 189): wrap its `client.chat(...)` call in a nested retry function. If the final attempt fails, catch the exception and return `fallback`.

**Also fix the `highlight_terms` prompt bug.** The current prompt uses `["Key terms to highlight, max 10 terms"]` as the example value, so the LLM echoes that literal string instead of extracting real terms. In `synthesize_conclusion`, replace that prompt line with:
```python
' "highlight_terms": ["term1", "term2"], // up to 10 key technical/legal terms actually present in the text\n'
```
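The replacement prompt still shows `term1` and `term2` as example values, so the model may echo placeholders anyway. A defensive post-filter on the parsed `highlight_terms` is worth considering. `clean_highlight_terms` is a hypothetical helper, not part of `pipeline.py`. It keeps only unique string terms that appear in the analysed text (case-insensitively) and drops known placeholder values:

```python
# Hypothetical placeholder values, compared case-insensitively.
PLACEHOLDERS = {"term1", "term2", "key terms to highlight, max 10 terms"}


def clean_highlight_terms(terms: object, text: str, limit: int = 10) -> list[str]:
    """Return up to `limit` unique terms that occur in `text`, dropping placeholders."""
    if not isinstance(terms, list):
        return []  # malformed LLM output
    haystack = text.casefold()
    seen: set[str] = set()
    cleaned: list[str] = []
    for term in terms:
        if not isinstance(term, str):
            continue
        stripped = term.strip()
        key = stripped.casefold()
        if not stripped or key in PLACEHOLDERS or key in seen or key not in haystack:
            continue
        seen.add(key)
        cleaned.append(stripped)
        if len(cleaned) == limit:
            break
    return cleaned
```

One possible placement is just before the `done` event is emitted, for example `conclusion_data["highlight_terms"] = clean_highlight_terms(conclusion_data.get("highlight_terms"), para_text)`. Substring matching also accepts short terms that occur inside longer words.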
- [ ] **Step 5: Replace sequential loop in `compliance.py` with parallel version**

In `backend/app/api/routes/compliance.py`, replace the Stage 3 block (lines 138-168):
```python
# ── Stage 3: retrieve + gap check per clause ──────────────────
findings: list[dict] = []
for i, clause in enumerate(clauses):
    yield _sse({
        "type": "stage",
        "stage": "analyzing",
        "label": f"Analyzing clause {i + 1}/{len(clauses)}…",
    })
    await asyncio.sleep(0)
    chunks = await asyncio.to_thread(
        retrieve_for_clause, clause, retrieval_service, 5, domains or None
    )
    # Emit source events
    for chunk in chunks[:3]:
        yield _sse({...})
        await asyncio.sleep(0)
    finding = await asyncio.to_thread(check_clause_compliance, clause, chunks, client)
    if finding:
        findings.append(finding)
        yield _sse({"type": "finding", **finding})
        await asyncio.sleep(0)
```
With:
```python
# ── Stage 3: retrieve + gap check (parallel across all clauses) ────────────
# run_clauses_parallel is imported at the top of generate(); see below.
findings: list[dict] = []
yield _sse({
    "type": "stage",
    "stage": "analyzing",
    "label": f"Analyzing {len(clauses)} clauses in parallel…",
})
await asyncio.sleep(0)
clause_results = await run_clauses_parallel(
    clauses, retrieval_service, client,
    standard_name=title or "",
    para_text=para_text,
    top_k=5,
    domains=domains or None,
)
# Results arrive in clause order, so events are emitted in that order too.
for res in clause_results:
    chunks = res["chunks"]
    finding = res["finding"]
    # Emit source events for this clause
    for chunk in chunks[:3]:
        yield _sse({
            "type": "source",
            "standard": getattr(chunk, "doc_title", "") or getattr(chunk, "doc_name", ""),
            "clause": getattr(chunk, "section_title", "") or "",
            "score": round(float(getattr(chunk, "score", 0)), 3),
            "status": "retrieved",
            "full_content": (getattr(chunk, "text", "") or "")[:300],
        })
    if finding:
        findings.append(finding)
        yield _sse({"type": "finding", **finding})
        await asyncio.sleep(0)
```
Also update the imports at the top of the `generate()` inner function — add `run_clauses_parallel` to the `from app.application.compliance.pipeline import (...)` block and remove individual imports of `retrieve_for_clause`, `check_clause_compliance`.
- [ ] **Step 6: Run all Direction A tests**
```
cd backend
python -m pytest tests/compliance/ -v
```
Expected: all pass.
- [ ] **Step 7: Smoke test the SSE endpoint manually**
```bash
curl -X POST http://localhost:8000/api/v1/compliance/analyze-stream \
-H "Authorization: Bearer <token>" \
-F "text=The system shall implement CSMS as per ISO 21434 requirements. All training data must be documented and version controlled." \
--no-buffer
```
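Checking the stream by eye is error-prone, so a small parser can encode the expectations below. The sketch assumes that `_sse()` (not shown in this plan) frames each event in the standard SSE wire format: one or more `data:` lines carrying JSON, followed by a blank line. `parse_sse` and `check_stream` are hypothetical names. To use them, save the curl output (for example with `> stream.txt`) and run `check_stream(parse_sse(open("stream.txt", encoding="utf-8").read()))`.

```python
import json


def parse_sse(raw: str) -> list[dict]:
    """Split an SSE body into events and JSON-decode each event's data payload.

    Only 'data:' fields are read; other fields and ':' comment lines are ignored.
    """
    events: list[dict] = []
    for frame in raw.replace("\r\n", "\n").split("\n\n"):
        data_lines = []
        for line in frame.split("\n"):
            if line.startswith("data:"):
                value = line[len("data:"):]
                # The SSE format strips exactly one leading space after the colon.
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events


def check_stream(events: list[dict]) -> None:
    """Raise AssertionError unless the stream matches the Step 7 expectations."""
    analyzing = [
        e for e in events
        if e.get("type") == "stage" and e.get("stage") == "analyzing"
    ]
    assert len(analyzing) == 1, f"expected 1 analyzing stage event, got {len(analyzing)}"
    assert any(e.get("type") == "finding" for e in events), "no finding events"
    done = [e for e in events if e.get("type") == "done"]
    assert done, "no done event"
    assert done[-1].get("highlight_terms"), "done event has empty highlight_terms"
```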
Expected: an SSE stream with exactly one `stage` event whose `stage` field is `analyzing` (not one per clause), followed by interleaved `source` and `finding` events, then a `done` event with a non-empty `highlight_terms` array.

---
## Direction B Tasks
### Task 3: Domain port + data classes
**Files:**
- Create: `backend/app/domain/compliance/__init__.py`
- Create: `backend/app/domain/compliance/ports.py`
- Test: `backend/tests/compliance/test_repository.py`
- [ ] **Step 1: Write failing test**
```python
# backend/tests/compliance/test_repository.py
from datetime import datetime

from app.domain.compliance.ports import (
    AnalysisRecord,
    FindingRecord,
    ComplianceRepository,
)


def test_analysis_record_construction():
    record = AnalysisRecord(
        id="",
        created_at=datetime.utcnow(),
        created_by="user1",
        doc_name="test.pdf",
        standard_name="EU AI Act",
        risk_score=72,
        conclusion="Some gaps found.",
        actions=[{"label": "Fix", "value": "Update docs"}],
        para_text="The system shall...",
        highlight_terms=["CSMS", "ISO 21434"],
        findings=[
            FindingRecord(
                id="",
                analysis_id="",
                seq=0,
                title="Missing CSMS",
                description="No CSMS certification found.",
                status="risk",
                clause_ref="Art.9.1",
            )
        ],
    )
    assert record.doc_name == "test.pdf"
    assert len(record.findings) == 1
    assert record.findings[0].status == "risk"


def test_compliance_repository_is_abstract():
    import inspect

    assert inspect.isabstract(ComplianceRepository)
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_repository.py -v
```
Expected: `ModuleNotFoundError: No module named 'app.domain.compliance'`
- [ ] **Step 3: Create domain module**
```python
# backend/app/domain/compliance/__init__.py
```
- [ ] **Step 4: Create `ports.py`**
```python
# backend/app/domain/compliance/ports.py
"""Domain ports for compliance history persistence."""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class FindingRecord:
    """Single finding row linked to an analysis."""

    id: str
    analysis_id: str
    seq: int
    title: str
    description: str
    status: str  # "ok" | "warn" | "risk"
    clause_ref: Optional[str] = None


@dataclass
class AnalysisRecord:
    """Full compliance analysis record with nested findings."""

    id: str  # UUID string; empty string means not yet persisted
    created_at: datetime
    created_by: Optional[str]
    doc_name: str
    standard_name: str
    risk_score: int
    conclusion: str
    actions: list  # list[dict]: serialised action items
    para_text: str
    highlight_terms: list  # list[str]
    findings: list[FindingRecord] = field(default_factory=list)


class ComplianceRepository(ABC):
    """Port for persisting and retrieving compliance analysis records."""

    @abstractmethod
    def save_analysis(self, record: AnalysisRecord) -> str:
        """Persist a new analysis record and return the assigned UUID string."""

    @abstractmethod
    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]:
        """Return analyses ordered by created_at DESC, without nested findings."""

    @abstractmethod
    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]:
        """Return a single analysis with all nested findings, or None."""

    @abstractmethod
    def delete_analysis(self, analysis_id: str) -> None:
        """Delete an analysis and all related findings and chat messages (cascade)."""

    @abstractmethod
    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str:
        """Persist a chat message and return its UUID string."""

    @abstractmethod
    def get_messages(self, finding_id: str) -> list[dict]:
        """Return chat messages for a finding ordered by created_at ASC.

        Each dict has keys: id, role, content, created_at (ISO string).
        """
```
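Route tests that should not need PostgreSQL could use a dict-backed fake of this port. The sketch below is hypothetical and not part of the file map. To stay runnable on its own, it duck-types the records instead of importing `app.domain.compliance.ports`; in the project it would subclass `ComplianceRepository` and use the real `AnalysisRecord` / `FindingRecord` dataclasses. It emulates the behaviour the port documents: generated UUIDs, newest-first listing without findings, oldest-first messages, and cascading deletes.

```python
import uuid
from dataclasses import replace
from datetime import datetime, timezone
from typing import Any, Optional


class InMemoryComplianceRepository:
    """Hypothetical test double with the same methods as ComplianceRepository."""

    def __init__(self) -> None:
        self._analyses: dict[str, Any] = {}
        self._messages: list[dict] = []

    def save_analysis(self, record: Any) -> str:
        analysis_id = str(uuid.uuid4())
        # Assign ids the way the Postgres DEFAULT gen_random_uuid() columns would.
        findings = [
            replace(f, id=str(uuid.uuid4()), analysis_id=analysis_id)
            for f in record.findings
        ]
        self._analyses[analysis_id] = replace(record, id=analysis_id, findings=findings)
        return analysis_id

    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[Any]:
        ordered = sorted(self._analyses.values(), key=lambda r: r.created_at, reverse=True)
        # Like the SQL version, listed records carry no nested findings.
        return [replace(r, findings=[]) for r in ordered[offset:offset + limit]]

    def get_analysis(self, analysis_id: str) -> Optional[Any]:
        return self._analyses.get(analysis_id)

    def delete_analysis(self, analysis_id: str) -> None:
        # Emulates ON DELETE CASCADE: findings are nested, messages are filtered out.
        self._analyses.pop(analysis_id, None)
        self._messages = [m for m in self._messages if m["analysis_id"] != analysis_id]

    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str:
        message_id = str(uuid.uuid4())
        self._messages.append({
            "id": message_id,
            "analysis_id": analysis_id,
            "finding_id": finding_id,
            "role": role,
            "content": content,
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        return message_id

    def get_messages(self, finding_id: str) -> list[dict]:
        # Insertion order stands in for ORDER BY created_at ASC.
        return [
            {k: m[k] for k in ("id", "role", "content", "created_at")}
            for m in self._messages
            if m["finding_id"] == finding_id
        ]
```

Unlike PostgreSQL, this fake does not enforce foreign keys, so `save_message` accepts ids that were never saved. Tests that depend on that constraint still need the real database.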
- [ ] **Step 5: Run tests to verify they pass**
```
cd backend
python -m pytest tests/compliance/test_repository.py -v
```
Expected: 2 passed.

---
### Task 4: PostgresComplianceRepository
**Files:**
- Create: `backend/app/infrastructure/compliance/__init__.py`
- Create: `backend/app/infrastructure/compliance/repository.py`
- Test: `backend/tests/compliance/test_repository.py`
- [ ] **Step 1: Write failing test**

Add to `backend/tests/compliance/test_repository.py`:
```python
from unittest.mock import MagicMock, patch, call
from datetime import datetime

from app.domain.compliance.ports import AnalysisRecord, FindingRecord


def _mock_pool():
    """Return a mock psycopg2 ThreadedConnectionPool."""
    conn = MagicMock()
    cursor = MagicMock()
    cursor.__enter__ = MagicMock(return_value=cursor)
    cursor.__exit__ = MagicMock(return_value=False)
    conn.cursor.return_value = cursor
    pool = MagicMock()
    pool.getconn.return_value = conn
    return pool, conn, cursor


@patch("app.infrastructure.compliance.repository.psycopg2.pool.ThreadedConnectionPool")
@patch("app.infrastructure.compliance.repository.psycopg2.extras")
def test_save_analysis_returns_uuid(mock_extras, mock_pool_cls):
    from app.infrastructure.compliance.repository import PostgresComplianceRepository

    pool, conn, cursor = _mock_pool()
    mock_pool_cls.return_value = pool
    cursor.fetchone.return_value = {"id": "abc-123"}
    repo = PostgresComplianceRepository(
        host="localhost", port=5432, user="u", password="p", dbname="db"
    )
    record = AnalysisRecord(
        id="", created_at=datetime.utcnow(), created_by="user1",
        doc_name="doc.pdf", standard_name="EU AI Act",
        risk_score=50, conclusion="OK", actions=[], para_text="p",
        highlight_terms=[], findings=[],
    )
    result = repo.save_analysis(record)
    assert result == "abc-123"
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_repository.py::test_save_analysis_returns_uuid -v
```
Expected: `ModuleNotFoundError: No module named 'app.infrastructure.compliance'`
- [ ] **Step 3: Create infrastructure module**
```python
# backend/app/infrastructure/compliance/__init__.py
```
- [ ] **Step 4: Implement `repository.py`**
```python
# backend/app/infrastructure/compliance/repository.py
"""PostgreSQL-backed compliance analysis repository.

Follows the same psycopg2 pattern as PostgresDocumentRepository:
ThreadedConnectionPool + RealDictCursor for reads, _ensure_schema on init.
"""
from __future__ import annotations

import json
from contextlib import contextmanager
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras
import psycopg2.pool
from loguru import logger

from app.domain.compliance.ports import (
    AnalysisRecord,
    ComplianceRepository,
    FindingRecord,
)


class PostgresComplianceRepository(ComplianceRepository):
    """Stores compliance analyses, findings, and finding chat messages in PostgreSQL."""

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        dbname: str,
        minconn: int = 1,
        maxconn: int = 5,
    ) -> None:
        self._pool = psycopg2.pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=host,
            port=port,
            user=user,
            password=password,
            dbname=dbname,
        )
        self._ensure_schema()

    @contextmanager
    def _conn(self):
        conn = self._pool.getconn()
        try:
            yield conn
        finally:
            self._pool.putconn(conn)

    def _ensure_schema(self) -> None:
        """Create tables if they do not exist."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS compliance_analyses (
                        id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
                        created_by      VARCHAR(255),
                        doc_name        VARCHAR(500),
                        standard_name   VARCHAR(500),
                        risk_score      INTEGER,
                        conclusion      TEXT,
                        actions         JSONB,
                        para_text       TEXT,
                        highlight_terms JSONB
                    );
                """)
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS compliance_findings (
                        id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
                        seq         INTEGER NOT NULL,
                        title       VARCHAR(500),
                        description TEXT,
                        status      VARCHAR(50),
                        clause_ref  VARCHAR(200)
                    );
                """)
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS finding_chat_messages (
                        id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
                        finding_id  UUID NOT NULL REFERENCES compliance_findings(id) ON DELETE CASCADE,
                        role        VARCHAR(20) NOT NULL,
                        content     TEXT NOT NULL,
                        created_at  TIMESTAMPTZ NOT NULL DEFAULT now()
                    );
                """)
            conn.commit()

    def save_analysis(self, record: AnalysisRecord) -> str:
        """Insert analysis + findings; return the new analysis UUID."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    INSERT INTO compliance_analyses
                        (created_by, doc_name, standard_name, risk_score,
                         conclusion, actions, para_text, highlight_terms)
                    VALUES
                        (%(created_by)s, %(doc_name)s, %(standard_name)s, %(risk_score)s,
                         %(conclusion)s, %(actions)s, %(para_text)s, %(highlight_terms)s)
                    RETURNING id
                    """,
                    {
                        "created_by": record.created_by,
                        "doc_name": record.doc_name,
                        "standard_name": record.standard_name,
                        "risk_score": record.risk_score,
                        "conclusion": record.conclusion,
                        "actions": json.dumps(record.actions, ensure_ascii=False),
                        "para_text": record.para_text,
                        "highlight_terms": json.dumps(record.highlight_terms, ensure_ascii=False),
                    },
                )
                row = cur.fetchone()
                analysis_id = str(row["id"])
            # Insert findings
            if record.findings:
                with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                    for f in record.findings:
                        cur.execute(
                            """
                            INSERT INTO compliance_findings
                                (analysis_id, seq, title, description, status, clause_ref)
                            VALUES
                                (%(analysis_id)s, %(seq)s, %(title)s, %(desc)s, %(status)s, %(clause_ref)s)
                            """,
                            {
                                "analysis_id": analysis_id,
                                "seq": f.seq,
                                "title": f.title,
                                "desc": f.description,
                                "status": f.status,
                                "clause_ref": f.clause_ref,
                            },
                        )
            conn.commit()
        return analysis_id

    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]:
        """Return analyses without nested findings, ordered newest first."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT id, created_at, created_by, doc_name, standard_name,
                           risk_score, conclusion, actions, para_text, highlight_terms
                    FROM compliance_analyses
                    ORDER BY created_at DESC
                    LIMIT %(limit)s OFFSET %(offset)s
                    """,
                    {"limit": limit, "offset": offset},
                )
                rows = cur.fetchall()
        return [self._row_to_record(dict(r)) for r in rows]

    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]:
        """Return analysis with nested findings list."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    "SELECT * FROM compliance_analyses WHERE id = %(id)s",
                    {"id": analysis_id},
                )
                row = cur.fetchone()
                if not row:
                    return None
                record = self._row_to_record(dict(row))
                cur.execute(
                    """
                    SELECT id, analysis_id, seq, title, description, status, clause_ref
                    FROM compliance_findings
                    WHERE analysis_id = %(id)s
                    ORDER BY seq
                    """,
                    {"id": analysis_id},
                )
                findings = [
                    FindingRecord(
                        id=str(r["id"]),
                        analysis_id=str(r["analysis_id"]),
                        seq=r["seq"],
                        title=r["title"] or "",
                        description=r["description"] or "",
                        status=r["status"] or "ok",
                        clause_ref=r["clause_ref"],
                    )
                    for r in cur.fetchall()
                ]
        record.findings = findings
        return record

    def delete_analysis(self, analysis_id: str) -> None:
        """Delete analysis; findings and chat messages cascade automatically."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "DELETE FROM compliance_analyses WHERE id = %(id)s",
                    {"id": analysis_id},
                )
            conn.commit()

    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str:
        """Persist a chat message; return its UUID."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    INSERT INTO finding_chat_messages
                        (analysis_id, finding_id, role, content)
                    VALUES
                        (%(analysis_id)s, %(finding_id)s, %(role)s, %(content)s)
                    RETURNING id
                    """,
                    {
                        "analysis_id": analysis_id,
                        "finding_id": finding_id,
                        "role": role,
                        "content": content,
                    },
                )
                row = cur.fetchone()
            conn.commit()
        return str(row["id"])

    def get_messages(self, finding_id: str) -> list[dict]:
        """Return messages for a finding, oldest first."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT id, role, content, created_at
                    FROM finding_chat_messages
                    WHERE finding_id = %(finding_id)s
                    ORDER BY created_at ASC
                    """,
                    {"finding_id": finding_id},
                )
                rows = cur.fetchall()
        return [
            {
                "id": str(r["id"]),
                "role": r["role"],
                "content": r["content"],
                "created_at": r["created_at"].isoformat() if r["created_at"] else "",
            }
            for r in rows
        ]

    def _row_to_record(self, row: dict) -> AnalysisRecord:
        """Convert a RealDictCursor row to an AnalysisRecord (no findings)."""
        actions = row.get("actions") or []
        if isinstance(actions, str):
            actions = json.loads(actions)
        highlight_terms = row.get("highlight_terms") or []
        if isinstance(highlight_terms, str):
            highlight_terms = json.loads(highlight_terms)
        return AnalysisRecord(
            id=str(row["id"]),
            created_at=row["created_at"] if isinstance(row["created_at"], datetime) else datetime.utcnow(),
            created_by=row.get("created_by"),
            doc_name=row.get("doc_name") or "",
            standard_name=row.get("standard_name") or "",
            risk_score=int(row.get("risk_score") or 0),
            conclusion=row.get("conclusion") or "",
            actions=actions,
            para_text=row.get("para_text") or "",
            highlight_terms=highlight_terms,
            findings=[],
        )
```
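`delete_analysis` issues a single `DELETE` and relies on the `ON DELETE CASCADE` foreign keys from `_ensure_schema` to remove findings and chat messages. That behaviour can be checked without a PostgreSQL server by using the stdlib `sqlite3` module as a stand-in. This is an illustration under simplifying assumptions:

- The schema is trimmed to the key columns.
- Ids are plain text rather than `gen_random_uuid()` UUIDs. That function is built into core PostgreSQL only from version 13; older servers need the `pgcrypto` extension.
- SQLite enforces foreign keys only after `PRAGMA foreign_keys = ON`, whereas PostgreSQL always enforces them.

```python
import sqlite3

# Trimmed stand-in for the three tables created by _ensure_schema().
SCHEMA = """
CREATE TABLE compliance_analyses (
    id       TEXT PRIMARY KEY,
    doc_name TEXT
);
CREATE TABLE compliance_findings (
    id          TEXT PRIMARY KEY,
    analysis_id TEXT NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    seq         INTEGER NOT NULL
);
CREATE TABLE finding_chat_messages (
    id          TEXT PRIMARY KEY,
    analysis_id TEXT NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
    finding_id  TEXT NOT NULL REFERENCES compliance_findings(id) ON DELETE CASCADE,
    role        TEXT NOT NULL,
    content     TEXT NOT NULL
);
"""

TABLES = ("compliance_analyses", "compliance_findings", "finding_chat_messages")


def row_counts_after_delete() -> dict[str, int]:
    """Insert one analysis, finding and message, delete the analysis, count rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # off by default in SQLite
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO compliance_analyses VALUES ('a1', 'doc.pdf')")
    conn.execute("INSERT INTO compliance_findings VALUES ('f1', 'a1', 0)")
    conn.execute("INSERT INTO finding_chat_messages VALUES ('m1', 'a1', 'f1', 'user', 'Why?')")
    conn.execute("DELETE FROM compliance_analyses WHERE id = 'a1'")
    # Table names come from the fixed TABLES tuple, not user input.
    counts = {t: conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0] for t in TABLES}
    conn.close()
    return counts
```

All three counts should be zero. Dropping the `PRAGMA` line would leave the finding and message rows orphaned in SQLite, which is exactly the situation the PostgreSQL foreign keys prevent.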
- [ ] **Step 5: Run tests**
```
cd backend
python -m pytest tests/compliance/test_repository.py -v
```
Expected: all pass.

---
### Task 5: DOCX export
**Files:**
- Create: `backend/app/infrastructure/compliance/docx_export.py`
- Test: `backend/tests/compliance/test_repository.py`
- [ ] **Step 1: Write failing test**

Add to `backend/tests/compliance/test_repository.py`:
```python
from datetime import datetime

from app.domain.compliance.ports import AnalysisRecord, FindingRecord


def test_generate_docx_returns_bytes():
    from app.infrastructure.compliance.docx_export import generate_docx

    record = AnalysisRecord(
        id="test-id", created_at=datetime(2026, 6, 8), created_by="user1",
        doc_name="test.pdf", standard_name="EU AI Act",
        risk_score=72, conclusion="Several gaps found.",
        actions=[{"label": "Fix", "value": "Update CSMS docs"}],
        para_text="The system shall implement CSMS.",
        highlight_terms=["CSMS"],
        findings=[
            FindingRecord(
                id="f1", analysis_id="test-id", seq=0,
                title="Missing CSMS", description="No CSMS cert.",
                status="risk", clause_ref="Art.9.1",
            )
        ],
    )
    data = generate_docx(record)
    assert isinstance(data, bytes)
    assert len(data) > 1000  # DOCX is at minimum a ZIP with ~1 KB overhead
    # Verify it's a valid ZIP (DOCX = ZIP container)
    import zipfile, io
    assert zipfile.is_zipfile(io.BytesIO(data))
```
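The ZIP check only proves that the bytes form a container. A hypothetical helper, `docx_paragraph_texts`, would let the test assert on content as well. It reads `word/document.xml` with the stdlib `zipfile` and `xml.etree.ElementTree` modules and joins the `w:t` text of each `w:p` paragraph. Table cells are included, because their text also lives in `w:p` elements.

```python
import io
import zipfile
import xml.etree.ElementTree as ET

# WordprocessingML main namespace, in ElementTree's {uri}tag form.
W_NS = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"


def docx_paragraph_texts(data: bytes) -> list[str]:
    """Return the text of every paragraph in a DOCX body, in document order."""
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        root = ET.fromstring(zf.read("word/document.xml"))
    return [
        "".join(t.text or "" for t in p.iter(f"{W_NS}t"))
        for p in root.iter(f"{W_NS}p")
    ]
```

For example, `test_generate_docx_returns_bytes` could add `assert "Missing CSMS" in docx_paragraph_texts(data)`. Line breaks inside a run, which python-docx writes as `w:br` elements, are dropped by this helper. As a result, a finding's description and its `[clause_ref]` come back concatenated.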
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_repository.py::test_generate_docx_returns_bytes -v
```
Expected: `ModuleNotFoundError: No module named 'app.infrastructure.compliance.docx_export'`
- [ ] **Step 3: Implement `docx_export.py`**
```python
# backend/app/infrastructure/compliance/docx_export.py
"""DOCX report generator for compliance analysis results.

Uses python-docx (already in requirements.txt). Returns raw bytes so the
caller can stream the response without writing to disk.
"""
from __future__ import annotations

from datetime import datetime
from io import BytesIO

from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Pt, RGBColor

from app.domain.compliance.ports import AnalysisRecord

_STATUS_LABEL = {"ok": "Compliant", "warn": "Warning", "risk": "Non-Compliant"}
_STATUS_COLOR = {
    "ok": RGBColor(0x22, 0x8B, 0x22),    # forest green
    "warn": RGBColor(0xFF, 0x8C, 0x00),  # dark orange
    "risk": RGBColor(0xDC, 0x14, 0x3C),  # crimson
}


def generate_docx(record: AnalysisRecord) -> bytes:
    """Generate a compliance report DOCX and return its raw bytes.

    Structure:
    - Cover: document name, standard, date, risk score
    - Executive summary (conclusion)
    - Findings table
    - Recommended actions
    - Footer note
    """
    doc = Document()

    # ── Cover ──────────────────────────────────────────────────────────────────
    title_para = doc.add_heading("Compliance Analysis Report", level=0)
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    doc.add_paragraph("")
    meta_table = doc.add_table(rows=4, cols=2)
    meta_table.style = "Table Grid"
    labels = ["Document", "Standard", "Date", "Risk Score"]
    values = [
        record.doc_name,
        record.standard_name,
        record.created_at.strftime("%Y-%m-%d %H:%M UTC") if record.created_at else "",
        f"{record.risk_score} / 100",
    ]
    for i, (label, value) in enumerate(zip(labels, values)):
        meta_table.cell(i, 0).text = label
        meta_table.cell(i, 1).text = value

    # ── Executive Summary ──────────────────────────────────────────────────────
    doc.add_heading("Executive Summary", level=1)
    doc.add_paragraph(record.conclusion)

    # ── Findings ───────────────────────────────────────────────────────────────
    doc.add_heading("Findings", level=1)
    if record.findings:
        table = doc.add_table(rows=1, cols=4)
        table.style = "Table Grid"
        hdr = table.rows[0].cells
        for i, h in enumerate(["#", "Status", "Title", "Description / Clause"]):
            hdr[i].text = h
            for run in hdr[i].paragraphs[0].runs:
                run.bold = True
        for f in record.findings:
            row = table.add_row().cells
            row[0].text = str(f.seq + 1)
            row[1].text = _STATUS_LABEL.get(f.status, f.status)
            # Colour the status cell using the otherwise-unused _STATUS_COLOR map.
            color = _STATUS_COLOR.get(f.status)
            if color is not None:
                for run in row[1].paragraphs[0].runs:
                    run.font.color.rgb = color
            row[2].text = f.title
            desc = f.description
            if f.clause_ref:
                desc += f"\n[{f.clause_ref}]"
            row[3].text = desc
    else:
        doc.add_paragraph("No findings recorded.")

    # ── Recommended Actions ────────────────────────────────────────────────────
    doc.add_heading("Recommended Actions", level=1)
    for action in record.actions:
        label = action.get("label", "Action")
        value = action.get("value", "")
        # "List Number" numbers items itself; a manual "1." prefix would double it.
        doc.add_paragraph(f"{label}: {value}", style="List Number")

    # ── Footer note ────────────────────────────────────────────────────────────
    doc.add_paragraph("")
    footer = doc.add_paragraph(
        f"Generated by AI Regulation Analysis System — {datetime.utcnow().strftime('%Y-%m-%d')}"
    )
    footer.alignment = WD_ALIGN_PARAGRAPH.CENTER
    for run in footer.runs:
        run.font.size = Pt(8)
        run.font.color.rgb = RGBColor(0x88, 0x88, 0x88)

    buf = BytesIO()
    doc.save(buf)
    return buf.getvalue()
```
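If `test_generate_docx_returns_bytes` only asserts the return type, a slightly stronger check is possible with the standard library alone. A `.docx` file is an OOXML package, i.e. a ZIP archive whose main part conventionally lives at `word/document.xml`, so the bytes can be sanity-checked without re-opening them in python-docx. The helper name `looks_like_docx` is hypothetical and not part of the codebase:

```python
import io
import zipfile


def looks_like_docx(data: bytes) -> bool:
    """Return True if `data` is a ZIP package that contains word/document.xml."""
    if not data.startswith(b"PK"):  # ZIP local-file-header magic bytes
        return False
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            return "word/document.xml" in zf.namelist()
    except zipfile.BadZipFile:
        return False
```

In the test this becomes `assert looks_like_docx(generate_docx(record))`.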
- [ ] **Step 4: Run test**
```
cd backend
python -m pytest tests/compliance/test_repository.py::test_generate_docx_returns_bytes -v
```
Expected: PASS.

---
### Task 6: Bootstrap factory + history API endpoints
**Files:**
- Modify: `backend/app/shared/bootstrap.py`
- Modify: `backend/app/api/routes/compliance.py`
- [ ] **Step 1: Add `get_compliance_repository()` to `bootstrap.py`**
At the top of `bootstrap.py`, add imports (after existing infrastructure imports):
```python
from app.domain.compliance.ports import ComplianceRepository
from app.infrastructure.compliance.repository import PostgresComplianceRepository
```
After the `get_event_store()` factory (around line 310), add:
```python
@lru_cache
def get_compliance_repository() -> ComplianceRepository:
    """Return the compliance analysis repository.

    Requires document_repository_backend=postgres and valid postgres_* settings.
    Raises NotImplementedError for any other backend value.
    """
    if settings.document_repository_backend != "postgres":
        raise NotImplementedError(
            "ComplianceRepository requires document_repository_backend=postgres, "
            f"got '{settings.document_repository_backend}'. "
            "Set DOCUMENT_REPOSITORY_BACKEND=postgres in your .env file."
        )
    return PostgresComplianceRepository(
        host=settings.postgres_host,
        port=settings.postgres_port,
        user=settings.postgres_user,
        password=settings.postgres_password,
        dbname=settings.postgres_db,
    )
```
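`@lru_cache` on a zero-argument factory makes it a per-process singleton, so every route shares one repository (and therefore one connection pool). `functools.lru_cache` does not cache exceptions, so with a non-postgres backend every call raises `NotImplementedError` again, which is what lets each route catch it per request. The sketch below demonstrates both behaviours with a stand-in factory; `FakeSettings` and `make_repo` are hypothetical:

```python
from dataclasses import dataclass
from functools import lru_cache


@dataclass
class FakeSettings:
    # Stand-in for the real settings object; only the field the factory reads
    document_repository_backend: str = "postgres"


settings = FakeSettings()
calls = {"count": 0}


@lru_cache
def make_repo() -> object:
    calls["count"] += 1
    if settings.document_repository_backend != "postgres":
        raise NotImplementedError("postgres backend required")
    return object()  # stands in for PostgresComplianceRepository(...)
```

In tests that switch backends, `get_compliance_repository.cache_clear()` drops the cached instance so the next call re-reads the settings.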
- [ ] **Step 2: Add auto-save hook in `analyze_stream`**
In `backend/app/api/routes/compliance.py`, after the `yield _sse({"type": "done", **conclusion_data})` line (line 177), add:
```python
# Auto-save the analysis to the database
try:
    from datetime import datetime

    from app.domain.compliance.ports import AnalysisRecord, FindingRecord
    from app.shared.bootstrap import get_compliance_repository

    repo = get_compliance_repository()
    finding_records = [
        FindingRecord(
            id="",
            analysis_id="",
            seq=i,
            title=f.get("title", ""),
            description=f.get("desc", ""),
            status=f.get("status", "ok"),
            clause_ref=f.get("clause_ref"),
        )
        for i, f in enumerate(findings)
    ]
    record = AnalysisRecord(
        id="",
        created_at=datetime.utcnow(),
        created_by=getattr(current_user, "username", None),
        doc_name=file_name or title or "Pasted text",
        standard_name=title or "",
        risk_score=conclusion_data.get("risk_score", 0),
        conclusion=conclusion_data.get("conclusion", ""),
        actions=conclusion_data.get("actions", []),
        para_text=conclusion_data.get("para_text", ""),
        highlight_terms=conclusion_data.get("highlight_terms", []),
        findings=finding_records,
    )
    analysis_id = await asyncio.to_thread(repo.save_analysis, record)
    yield _sse({"type": "saved", "analysis_id": analysis_id})
except NotImplementedError:
    pass  # No postgres backend configured; skip saving
except Exception as exc:
    logger.warning("Failed to auto-save compliance analysis: {}", exc)
```
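The hook renames keys while mapping findings: SSE finding events carry `desc`, while `FindingRecord` stores `description`, and `seq` is simply the finding's position in the stream (the DOCX export prints it as `seq + 1`). Pulling that mapping into a pure function makes it unit-testable without a database. A sketch, with a local dataclass standing in for the real `FindingRecord` in `app.domain.compliance.ports`; `to_finding_records` is a hypothetical helper:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FindingRecord:
    # Stand-in mirroring the fields the auto-save hook fills in
    id: str
    analysis_id: str
    seq: int
    title: str
    description: str
    status: str
    clause_ref: Optional[str] = None


def to_finding_records(findings: list[dict]) -> list[FindingRecord]:
    """Map SSE finding dicts to records; ids stay blank for the repository to assign."""
    return [
        FindingRecord(
            id="",
            analysis_id="",
            seq=i,
            title=f.get("title", ""),
            description=f.get("desc", ""),  # SSE key "desc" maps to "description"
            status=f.get("status", "ok"),
            clause_ref=f.get("clause_ref"),
        )
        for i, f in enumerate(findings)
    ]
```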
- [ ] **Step 3: Add history endpoints to `compliance.py`**
Add these routes after the existing `compliance_chat` route:
```python
@router.get("/history")
async def list_history(
    limit: int = 20,
    offset: int = 0,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return a paginated list of saved compliance analyses (newest first)."""
    from app.shared.bootstrap import get_compliance_repository

    try:
        repo = get_compliance_repository()
        records = await asyncio.to_thread(repo.list_analyses, limit, offset)
        return [
            {
                "id": r.id,
                "created_at": r.created_at.isoformat(),
                "created_by": r.created_by,
                "doc_name": r.doc_name,
                "standard_name": r.standard_name,
                "risk_score": r.risk_score,
                "finding_count": len(r.findings),
            }
            for r in records
        ]
    except NotImplementedError:
        return []


@router.get("/history/{analysis_id}")
async def get_history_item(
    analysis_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return the full analysis record, including findings."""
    from app.shared.bootstrap import get_compliance_repository
    from fastapi import HTTPException

    repo = get_compliance_repository()
    record = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not record:
        raise HTTPException(status_code=404, detail="Analysis not found")
    return {
        "id": record.id,
        "created_at": record.created_at.isoformat(),
        "created_by": record.created_by,
        "doc_name": record.doc_name,
        "standard_name": record.standard_name,
        "risk_score": record.risk_score,
        "conclusion": record.conclusion,
        "actions": record.actions,
        "para_text": record.para_text,
        "highlight_terms": record.highlight_terms,
        "findings": [
            {
                "id": f.id,
                "seq": f.seq,
                "title": f.title,
                "description": f.description,
                "status": f.status,
                "clause_ref": f.clause_ref,
            }
            for f in record.findings
        ],
    }


@router.delete("/history/{analysis_id}", status_code=204)
async def delete_history_item(
    analysis_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Delete a saved analysis (the cascade removes its findings and chat messages)."""
    from app.shared.bootstrap import get_compliance_repository

    repo = get_compliance_repository()
    await asyncio.to_thread(repo.delete_analysis, analysis_id)


@router.get("/history/{analysis_id}/download")
async def download_history_docx(
    analysis_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return a DOCX compliance report for the given analysis."""
    from urllib.parse import quote

    from app.shared.bootstrap import get_compliance_repository
    from app.infrastructure.compliance.docx_export import generate_docx
    from fastapi import HTTPException
    from fastapi.responses import Response

    repo = get_compliance_repository()
    record = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not record:
        raise HTTPException(status_code=404, detail="Analysis not found")
    docx_bytes = await asyncio.to_thread(generate_docx, record)
    safe_name = (record.doc_name or "report").replace(" ", "_")[:50]
    filename = f"compliance_{safe_name}_{record.created_at.strftime('%Y%m%d')}.docx"
    # Starlette encodes header values as Latin-1, so a non-ASCII doc_name would raise here.
    # Send an ASCII fallback in filename= and the full UTF-8 name in filename* (RFC 6266).
    ascii_name = filename.encode("ascii", "ignore").decode().replace('"', "")
    return Response(
        content=docx_bytes,
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        headers={
            "Content-Disposition": (
                f'attachment; filename="{ascii_name}"; '
                f"filename*=UTF-8''{quote(filename, safe='')}"
            )
        },
    )
```
- [ ] **Step 4: Smoke test endpoints**
Start the backend and run:
```bash
# List history (empty initially)
curl http://localhost:8000/api/v1/compliance/history \
-H "Authorization: Bearer <token>"
# Expected: []
# Trigger an analysis and verify the response includes a "saved" SSE event
curl -X POST http://localhost:8000/api/v1/compliance/analyze-stream \
-H "Authorization: Bearer <token>" \
-F "text=System shall implement CSMS per ISO 21434." \
--no-buffer | grep '"type": "saved"'
# Expected: data: {"type": "saved", "analysis_id": "<uuid>"}
```
---
### Task 7: Frontend — PageStateContext update + HistoryRail component
**Files:**
- Modify: `frontend/src/contexts/PageStateContext.tsx`
- Create: `frontend/src/pages/Compliance/HistoryRail.tsx`
- Modify: `frontend/src/pages/Compliance/CompliancePage.tsx`
- [ ] **Step 1: Update `ComplianceState` in `PageStateContext.tsx`**
Find the `ComplianceState` interface (around line 78) and add two fields:
```typescript
export interface ComplianceState {
  status: ComplianceStatus;
  stageLabel: string;
  stageKey: string;
  meta: ComplianceMeta | null;
  sources: ComplianceSourceEvent[];
  findings: ComplianceFindingEvent[];
  done: ComplianceDonePayload | null;
  errorText: string;
  // Direction B additions:
  analysisId: string | null;
  isReadOnly: boolean;
}
```
Update `COMPLIANCE_INIT` to include the new fields:
```typescript
const COMPLIANCE_INIT: ComplianceState = {
  status: 'idle',
  stageLabel: '',
  stageKey: '',
  meta: null,
  sources: [],
  findings: [],
  done: null,
  errorText: '',
  analysisId: null,
  isReadOnly: false,
};
```
- [ ] **Step 2: Create `HistoryRail.tsx`**
```tsx
// frontend/src/pages/Compliance/HistoryRail.tsx
import { useEffect, useState, useCallback } from 'react';
import { Download, Trash2 } from 'lucide-react';

const TOKEN_KEY = 'auth_token';

function authHeader(): Record<string, string> {
  const t = localStorage.getItem(TOKEN_KEY);
  return t ? { Authorization: `Bearer ${t}` } : {};
}

interface HistoryItem {
  id: string;
  created_at: string;
  doc_name: string;
  standard_name: string;
  risk_score: number;
  finding_count: number;
}

interface Props {
  refreshTrigger: number; // increment to force a re-fetch (e.g. after a new analysis is saved)
  onSelect: (id: string) => void;
  selectedId: string | null;
}

function riskClass(score: number): string {
  if (score >= 70) return 'risk-high';
  if (score >= 40) return 'risk-medium';
  return 'risk-low';
}

export function HistoryRail({ refreshTrigger, onSelect, selectedId }: Props) {
  const [items, setItems] = useState<HistoryItem[]>([]);
  const [deletingId, setDeletingId] = useState<string | null>(null);

  const fetchHistory = useCallback(() => {
    fetch('/api/v1/compliance/history?limit=30', { headers: authHeader() })
      .then(r => r.json())
      .then(data => {
        if (Array.isArray(data)) setItems(data);
      })
      .catch(() => { /* backend may not have postgres configured */ });
  }, []);

  useEffect(() => { fetchHistory(); }, [fetchHistory, refreshTrigger]);

  function handleDownload(e: React.MouseEvent, item: HistoryItem) {
    e.stopPropagation();
    // A plain <a href> cannot send the Bearer token, so fetch the file and save it via a blob URL
    fetch(`/api/v1/compliance/history/${item.id}/download`, { headers: authHeader() })
      .then(r => {
        if (!r.ok) throw new Error(`Download failed: HTTP ${r.status}`);
        return r.blob();
      })
      .then(blob => {
        const blobUrl = URL.createObjectURL(blob);
        const link = document.createElement('a');
        link.href = blobUrl;
        link.download = `compliance-${item.doc_name.slice(0, 30)}.docx`;
        link.click();
        // Revoke a little later; revoking synchronously can cancel the download in some browsers
        setTimeout(() => URL.revokeObjectURL(blobUrl), 1000);
      })
      .catch(err => console.warn(err));
  }

  function handleDelete(e: React.MouseEvent, item: HistoryItem) {
    e.stopPropagation();
    if (!window.confirm(`Delete analysis for "${item.doc_name}"?`)) return;
    setDeletingId(item.id);
    fetch(`/api/v1/compliance/history/${item.id}`, {
      method: 'DELETE',
      headers: authHeader(),
    })
      .then(r => {
        // Only drop the row once the server confirms the delete
        if (!r.ok) throw new Error(`Delete failed: HTTP ${r.status}`);
        setItems(prev => prev.filter(i => i.id !== item.id));
      })
      .catch(err => console.warn(err))
      .finally(() => setDeletingId(null));
  }

  function formatDate(iso: string): string {
    const d = new Date(iso);
    // new Date() does not throw on bad input; it returns an Invalid Date instead
    if (Number.isNaN(d.getTime())) return iso.slice(0, 10);
    return d.toLocaleDateString(undefined, { month: 'short', day: 'numeric' });
  }

  if (items.length === 0) {
    return (
      <div className="history-pane" style={{ minWidth: 200, maxWidth: 220 }}>
        <div className="history-header">History</div>
        <p style={{ padding: '12px 16px', fontSize: 12, color: 'var(--muted)', lineHeight: 1.5 }}>
          Completed analyses appear here.
        </p>
      </div>
    );
  }

  return (
    <div className="history-pane" style={{ minWidth: 200, maxWidth: 220, overflowY: 'auto' }}>
      <div className="history-header">History</div>
      {items.map(item => (
        <div
          key={item.id}
          className={`quick-item${selectedId === item.id ? ' active' : ''}`}
          onClick={() => onSelect(item.id)}
          style={{ cursor: 'pointer' }}
        >
          <div style={{ fontSize: 11, color: 'var(--muted)', marginBottom: 2 }}>
            {formatDate(item.created_at)}
          </div>
          <div style={{ fontSize: 12, fontWeight: 500, marginBottom: 4, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
            {item.doc_name || 'Untitled'}
          </div>
          <div style={{ display: 'flex', alignItems: 'center', gap: 6 }}>
            <span className={`risk-badge ${riskClass(item.risk_score)}`} style={{ fontSize: 10 }}>
              {item.risk_score}
            </span>
            <span style={{ fontSize: 10, color: 'var(--muted)', flex: 1 }}>
              {item.finding_count} finding{item.finding_count !== 1 ? 's' : ''}
            </span>
            <button
              className="btn icon-btn"
              title="Download DOCX"
              onClick={e => handleDownload(e, item)}
              style={{ padding: '2px 4px' }}
            >
              <Download size={11} />
            </button>
            <button
              className="btn icon-btn danger"
              title="Delete"
              disabled={deletingId === item.id}
              onClick={e => handleDelete(e, item)}
              style={{ padding: '2px 4px' }}
            >
              <Trash2 size={11} />
            </button>
          </div>
        </div>
      ))}
    </div>
  );
}
```
- [ ] **Step 3: Integrate `HistoryRail` into `CompliancePage.tsx`**
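At the top of `CompliancePage.tsx`, add the import below. The snippets in this step also use `useState`, `useEffect`, and `useRef` from `react` and an `authHeader()` helper; if the page does not already have them, import the hooks and copy `authHeader()` from `HistoryRail.tsx`.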
```tsx
import { HistoryRail } from './HistoryRail';
```
Inside the `CompliancePage` component, add state:
```tsx
const [historyRefresh, setHistoryRefresh] = useState(0);
```
The backend stream now emits a `saved` event right after `done`. Handle it in `useComplianceAnalysis.ts` by adding a branch to the SSE parsing block, after the `done` handler:
```typescript
} else if (j.type === 'saved') {
  setState(s => ({ ...s, analysisId: j.analysis_id ?? null }));
}
```
Back in `CompliancePage.tsx`, watch for `state.analysisId` changes and trigger history re-fetch:
```tsx
const prevAnalysisIdRef = useRef<string | null>(null);
useEffect(() => {
  if (state.analysisId && state.analysisId !== prevAnalysisIdRef.current) {
    prevAnalysisIdRef.current = state.analysisId;
    setHistoryRefresh(n => n + 1);
  }
}, [state.analysisId]);
```
Add a `handleSelectHistory` function that loads a historical record:
```tsx
async function handleSelectHistory(id: string) {
  const res = await fetch(`/api/v1/compliance/history/${id}`, { headers: authHeader() });
  if (!res.ok) return;
  const data = await res.json();
  // Reconstruct state from the historical record (read-only view)
  setComplianceState({
    status: 'done',
    stageLabel: 'Complete',
    stageKey: 'concluding',
    meta: { title: data.doc_name, sourceType: 'doc', startedAt: data.created_at },
    sources: [],
    findings: (data.findings || []).map((f: any) => ({
      title: f.title,
      desc: f.description,
      status: f.status,
      clause_ref: f.clause_ref,
    })),
    done: {
      conclusion: data.conclusion,
      actions: data.actions,
      risk_score: data.risk_score,
      highlight_terms: data.highlight_terms,
      para_text: data.para_text,
    },
    errorText: '',
    analysisId: data.id,
    isReadOnly: true,
  });
}
```
Note: `setComplianceState` is available from `usePageState()` — add it to the destructuring alongside `complianceState`.
In the JSX return, wrap the existing 3-column workspace with a flex row that includes `HistoryRail` on the left:
```tsx
<div style={{ display: 'flex', flex: 1, overflow: 'hidden' }}>
  <HistoryRail
    refreshTrigger={historyRefresh}
    onSelect={handleSelectHistory}
    selectedId={state.analysisId}
  />
  <div style={{ flex: 1, overflow: 'hidden' }}>
    {/* existing 3-column workspace JSX here */}
  </div>
</div>
```
- [ ] **Step 4: TypeScript check**
```
cd frontend
npx tsc --noEmit
```
Expected: 0 errors.

---
## Direction C Tasks
### Task 8: `build_finding_context` + `generate_suggestions` in pipeline
**Files:**
- Modify: `backend/app/application/compliance/pipeline.py`
- Test: `backend/tests/compliance/test_pipeline.py`
- [ ] **Step 1: Write failing tests**
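Add to `backend/tests/compliance/test_pipeline.py` (these tests use `MagicMock` and the `_make_mock_client` helper, which are assumed to be defined already in that file by the earlier pipeline tests):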
```python
from datetime import datetime

from app.domain.compliance.ports import AnalysisRecord, FindingRecord


def _sample_analysis() -> AnalysisRecord:
    return AnalysisRecord(
        id="a1", created_at=datetime(2026, 6, 8), created_by="u",
        doc_name="doc.pdf", standard_name="EU AI Act",
        risk_score=72, conclusion="Gaps found.", actions=[], para_text="para",
        highlight_terms=[], findings=[],
    )


def _sample_finding(status: str = "risk") -> FindingRecord:
    return FindingRecord(
        id="f1", analysis_id="a1", seq=0,
        title="Missing CSMS", description="No CSMS certification.",
        status=status, clause_ref="Art.9.1",
    )


def test_build_finding_context_contains_required_fields():
    from app.application.compliance.pipeline import build_finding_context

    ctx = build_finding_context(_sample_finding(), _sample_analysis())
    assert "doc.pdf" in ctx
    assert "EU AI Act" in ctx
    assert "Missing CSMS" in ctx
    assert "Art.9.1" in ctx


def test_generate_suggestions_returns_three_questions():
    from app.application.compliance.pipeline import generate_suggestions

    client = _make_mock_client('{"questions": ["Q1?", "Q2?", "Q3?"]}')
    questions = generate_suggestions(_sample_finding("risk"), _sample_analysis(), client)
    assert len(questions) == 3
    assert all(isinstance(q, str) for q in questions)


def test_generate_suggestions_falls_back_on_error():
    from app.application.compliance.pipeline import generate_suggestions

    bad_client = MagicMock()
    bad_resp = MagicMock()
    bad_resp.is_success = False
    bad_client.chat.return_value = bad_resp
    questions = generate_suggestions(_sample_finding(), _sample_analysis(), bad_client)
    assert len(questions) == 3  # fallback always returns 3
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_build_finding_context_contains_required_fields -v
```
Expected: `ImportError: cannot import name 'build_finding_context'`
- [ ] **Step 3: Implement both functions in `pipeline.py`**
Add at the end of `pipeline.py`:
```python
from app.domain.compliance.ports import AnalysisRecord, FindingRecord

_SUGGESTION_FOCUS = {
    "risk": "Focus on remediation steps, required certifications, and timeline to resolve.",
    "warn": "Focus on identifying the specific compliance gap and how to close it.",
    "ok": "Focus on maintaining compliance evidence and monitoring future changes.",
}

_SUGGESTION_FALLBACK = {
    "risk": [
        "What specific certifications or documents are required to remediate this finding?",
        "What is the typical remediation timeline for this type of non-compliance?",
        "Which regulation clause defines the exact requirement?",
    ],
    "warn": [
        "What is the exact gap between the current state and the requirement?",
        "What evidence would demonstrate partial compliance?",
        "Which regulation clause applies to this warning?",
    ],
    "ok": [
        "What documentation should be maintained to evidence this compliance?",
        "How should this area be monitored as regulations evolve?",
        "Are there related clauses that may affect this compliant area?",
    ],
}


def build_finding_context(finding: "FindingRecord", analysis: "AnalysisRecord") -> str:
    """Build a grounded system context string for a finding chat thread.

    Combines finding details with analysis metadata so the LLM has full
    context without relying on the frontend to pass segment_context.
    """
    return (
        f"Document: {analysis.doc_name}\n"
        f"Standard: {analysis.standard_name}\n"
        f"Finding [{finding.seq + 1}]: {finding.title}\n"
        f"Status: {finding.status}\n"
        f"Clause reference: {finding.clause_ref or 'N/A'}\n"
        f"Description: {finding.description}\n"
        f"Overall conclusion: {analysis.conclusion}\n"
    )


def generate_suggestions(
    finding: "FindingRecord",
    analysis: "AnalysisRecord",
    client: "BaseLLMClient",
) -> list[str]:
    """Generate 3 context-aware follow-up questions for a finding chat thread.

    Returns exactly 3 question strings. Falls back to static templates on any
    LLM call or parsing error.
    """
    # Copy so callers can never mutate the module-level templates
    fallback = list(_SUGGESTION_FALLBACK.get(finding.status, _SUGGESTION_FALLBACK["warn"]))
    context = build_finding_context(finding, analysis)
    focus = _SUGGESTION_FOCUS.get(finding.status, _SUGGESTION_FOCUS["warn"])
    prompt = (
        f"{context}\n\n"
        f"Task: {focus}\n"
        "Generate exactly 3 concise follow-up questions a compliance analyst would ask.\n"
        'Return JSON: {"questions": ["question 1", "question 2", "question 3"]}\n'
        "Return ONLY the JSON object."
    )
    try:
        response = client.chat([{"role": "user", "content": prompt}], max_tokens=300)
    except Exception as exc:  # provider/network errors must not break the endpoint
        logger.warning("generate_suggestions LLM call failed: {}", exc)
        return fallback
    if not response.is_success:
        return fallback
    try:
        result = _extract_json(response.content)
        questions = result.get("questions", [])
        if isinstance(questions, list) and len(questions) >= 3:
            return [str(q) for q in questions[:3]]
    except (ValueError, TypeError, AttributeError) as exc:
        logger.warning("generate_suggestions JSON parse failed: {}", exc)
    return fallback
```
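`generate_suggestions` relies on the existing `_extract_json` helper, whose exact behaviour is not shown in this plan. If a standalone version is useful (for example, to unit-test the parsing path in isolation), one approach is to scan for each `{` and try `json.JSONDecoder.raw_decode` from there, which tolerates prose or markdown fences around the object. The names `extract_first_json_object` and `pick_three_questions` are hypothetical:

```python
import json


def extract_first_json_object(text: str) -> dict:
    """Decode the first JSON object embedded in `text`, ignoring surrounding prose."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            pass  # not valid JSON from here; try the next "{"
        start = text.find("{", start + 1)
    raise ValueError("no JSON object found")


def pick_three_questions(raw: str, fallback: list[str]) -> list[str]:
    """Return the first three questions from an LLM reply, or a copy of `fallback`."""
    try:
        questions = extract_first_json_object(raw).get("questions", [])
    except ValueError:
        return list(fallback)
    if isinstance(questions, list) and len(questions) >= 3:
        return [str(q) for q in questions[:3]]
    return list(fallback)
```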
- [ ] **Step 4: Run all pipeline tests**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py -v
```
Expected: all pass.

---
### Task 9: Finding chat API endpoints
**Files:**
- Modify: `backend/app/api/routes/compliance.py`
- [ ] **Step 1: Add three finding-chat endpoints**
Add these routes after the history download endpoint:
```python
@router.get("/analyses/{analysis_id}/findings/{finding_id}/chat")
async def get_finding_chat_history(
    analysis_id: str,
    finding_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return persisted chat messages for a finding thread, oldest first."""
    from app.shared.bootstrap import get_compliance_repository

    try:
        repo = get_compliance_repository()
        messages = await asyncio.to_thread(repo.get_messages, finding_id)
        return messages
    except NotImplementedError:
        return []


@router.post("/analyses/{analysis_id}/findings/{finding_id}/suggestions")
async def get_finding_suggestions(
    analysis_id: str,
    finding_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Generate 3 LLM-powered follow-up question suggestions for a finding."""
    from app.application.compliance.pipeline import generate_suggestions
    from app.shared.bootstrap import get_compliance_repository
    from app.services.llm.llm_factory import get_llm_client
    from fastapi import HTTPException

    repo = get_compliance_repository()
    analysis = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not analysis:
        raise HTTPException(status_code=404, detail="Analysis not found")
    finding = next((f for f in analysis.findings if f.id == finding_id), None)
    if not finding:
        raise HTTPException(status_code=404, detail="Finding not found")
    client = get_llm_client(provider=settings.llm_provider, model=settings.llm_model)
    questions = await asyncio.to_thread(generate_suggestions, finding, analysis, client)
    return {"questions": questions}


@router.post("/analyses/{analysis_id}/findings/{finding_id}/chat")
async def finding_chat(
    analysis_id: str,
    finding_id: str,
    request: ComplianceChatRequest,
    current_user: UserClaims = Depends(get_current_user),
):
    """Stream a grounded chat response for a specific finding.

    Loads the finding and analysis from the DB to build grounded context, and
    persists both the user message and the assistant response to
    finding_chat_messages.
    """
    from app.application.compliance.pipeline import build_finding_context
    from app.shared.bootstrap import get_compliance_repository
    from fastapi import HTTPException

    repo = get_compliance_repository()
    analysis = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not analysis:
        raise HTTPException(status_code=404, detail="Analysis not found")
    finding = next((f for f in analysis.findings if f.id == finding_id), None)
    if not finding:
        raise HTTPException(status_code=404, detail="Finding not found")

    # Persist the user message
    await asyncio.to_thread(
        repo.save_message, analysis_id, finding_id, "user", request.query
    )

    # Last 10 messages (5 turns); the final entry is the user message saved above
    history = await asyncio.to_thread(repo.get_messages, finding_id)
    prior_turns = "\n".join(
        f"{m['role']}: {m['content']}" for m in history[-10:][:-1]
    )

    # stream_chat() below takes a single query string, so earlier turns are
    # folded into the prompt text together with the grounded finding context.
    system_context = build_finding_context(finding, analysis)
    full_query = f"[Compliance Finding Context]\n{system_context}\n\n"
    if prior_turns:
        full_query += f"[Previous conversation]\n{prior_turns}\n\n"
    full_query += f"User question: {request.query}"

    assistant_buffer: list[str] = []

    async def generate() -> AsyncGenerator[str, None]:
        try:
            _, event_stream = get_agent_conversation_service().stream_chat(
                query=full_query,
                top_k=5,
                prompt_template="compliance_qa",
            )
            for event in event_stream:
                event_type = event.get("event", "")
                if event_type == "content":
                    text = event.get("data", "")
                    if text:
                        assistant_buffer.append(text)
                        yield _sse({"type": "chunk", "text": text})
                elif event_type == "done":
                    yield _sse({"type": "done"})
                await asyncio.sleep(0)
        except Exception as exc:
            logger.exception("finding_chat stream error")
            yield _sse({"type": "error", "text": str(exc)})
        finally:
            # Persist the assistant response (possibly partial) once streaming stops
            full_response = "".join(assistant_buffer)
            if full_response:
                try:
                    await asyncio.to_thread(
                        repo.save_message, analysis_id, finding_id, "assistant", full_response
                    )
                except Exception as exc:
                    logger.warning("Failed to persist assistant message: {}", exc)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
    )
```
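The `finally` block is meant to save the assistant reply whether the stream finishes normally, raises, or is closed early. The same accumulate-then-persist pattern is sketched below as a plain synchronous generator (hypothetical `relay_and_persist`, with a `save` callback standing in for `repo.save_message`); it shows that closing the generator mid-stream still persists the partial text. In the async route, an `await` inside `finally` can itself be interrupted if the request is cancelled on client disconnect, so it is worth verifying that partial replies are actually stored in that case.

```python
from typing import Callable, Iterable, Iterator


def relay_and_persist(chunks: Iterable[str], save: Callable[[str], None]) -> Iterator[str]:
    """Yield non-empty chunks and persist their concatenation exactly once at the end.

    The finally block runs on normal exhaustion, on an exception, and when the
    consumer closes the generator early (close() raises GeneratorExit at the yield).
    """
    buffer: list[str] = []
    try:
        for text in chunks:
            if text:
                buffer.append(text)
                yield text
    finally:
        full = "".join(buffer)
        if full:  # nothing is saved for an empty reply
            save(full)
```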
- [ ] **Step 2: Smoke test**
```bash
# Get suggestions for a finding (replace IDs with real values from a previous analysis)
curl -X POST \
"http://localhost:8000/api/v1/compliance/analyses/<analysis_id>/findings/<finding_id>/suggestions" \
-H "Authorization: Bearer <token>"
# Expected: {"questions": ["...", "...", "..."]}
# Get chat history (empty initially)
curl "http://localhost:8000/api/v1/compliance/analyses/<analysis_id>/findings/<finding_id>/chat" \
-H "Authorization: Bearer <token>"
# Expected: []
```
---
### Task 10: FindingChatDrawer component
**Files:**
- Create: `frontend/src/pages/Compliance/FindingChatDrawer.tsx`
- Modify: `frontend/src/pages/Compliance/CompliancePage.tsx`
- Modify: `frontend/src/contexts/PageStateContext.tsx`
- [ ] **Step 1: Add `activeFindingId` to `PageStateContext.tsx`**
In the `ComplianceState` interface (already updated in Task 7), add:
```typescript
activeFindingId: string | null; // finding UUID from DB (not index)
```
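In `COMPLIANCE_INIT`, add the line below. Also add `activeFindingId: null` to the object passed to `setComplianceState` in `handleSelectHistory` (Task 7); if `setComplianceState` expects a full `ComplianceState`, `npx tsc --noEmit` will otherwise report a missing property.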
```typescript
activeFindingId: null,
```
- [ ] **Step 2: Create `FindingChatDrawer.tsx`**
```tsx
// frontend/src/pages/Compliance/FindingChatDrawer.tsx
import { useEffect, useRef, useState } from 'react';
import { X, Send } from 'lucide-react';

const TOKEN_KEY = 'auth_token';

function authHeader(): Record<string, string> {
  const t = localStorage.getItem(TOKEN_KEY);
  return t ? { Authorization: `Bearer ${t}` } : {};
}

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

interface Props {
  analysisId: string;
  findingId: string;
  finding: { title: string; desc: string; status: string; clause_ref?: string };
  onClose: () => void;
}

export function FindingChatDrawer({ analysisId, findingId, finding, onClose }: Props) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [suggestions, setSuggestions] = useState<string[]>([]);
  const [input, setInput] = useState('');
  const [loading, setLoading] = useState(false);
  const [loadingHistory, setLoadingHistory] = useState(true);
  const abortRef = useRef<AbortController | null>(null);
  const bottomRef = useRef<HTMLDivElement>(null);

  // Load history + suggestions on open
  useEffect(() => {
    setLoadingHistory(true);
    // Fetch persisted history; a non-2xx response is treated as an empty thread
    fetch(`/api/v1/compliance/analyses/${analysisId}/findings/${findingId}/chat`, {
      headers: authHeader(),
    })
      .then(r => (r.ok ? r.json() : []))
      .then((data: unknown) => {
        const history = Array.isArray(data) ? (data as Message[]) : [];
        setMessages(history.map(m => ({ id: m.id, role: m.role, content: m.content })));
        setLoadingHistory(false);
        // Only fetch suggestions if there is no history yet
        if (!history.length) {
          fetch(
            `/api/v1/compliance/analyses/${analysisId}/findings/${findingId}/suggestions`,
            { method: 'POST', headers: authHeader() }
          )
            .then(r => r.json())
            .then(d => { if (Array.isArray(d?.questions)) setSuggestions(d.questions); })
            .catch(() => {});
        }
      })
      .catch(() => setLoadingHistory(false));
    return () => { abortRef.current?.abort(); };
  }, [analysisId, findingId]);

  // Auto-scroll to bottom
  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  async function send(text?: string) {
    const q = (text ?? input).trim();
    if (!q || loading) return;
    setInput('');
    setSuggestions([]); // hide chips after the first message
    const assistantId = `ast-${Date.now()}`;
    setMessages(prev => [
      ...prev,
      { id: `usr-${Date.now()}`, role: 'user', content: q },
      { id: assistantId, role: 'assistant', content: '' },
    ]);
    setLoading(true);
    const ctrl = new AbortController();
    abortRef.current = ctrl;
    // Update the text of the placeholder assistant bubble
    const setAssistantText = (update: (prev: string) => string) =>
      setMessages(prev =>
        prev.map(m => (m.id === assistantId ? { ...m, content: update(m.content) } : m))
      );
    try {
      const res = await fetch(
        `/api/v1/compliance/analyses/${analysisId}/findings/${findingId}/chat`,
        {
          method: 'POST',
          headers: { 'Content-Type': 'application/json', ...authHeader() },
          body: JSON.stringify({ query: q }),
          signal: ctrl.signal,
        }
      );
      if (!res.ok || !res.body) {
        // Error responses (e.g. 404 "Finding not found") are JSON, not an SSE stream
        setAssistantText(() => `Request failed (HTTP ${res.status}).`);
        return;
      }
      const reader = res.body.getReader();
      const dec = new TextDecoder();
      let buf = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buf += dec.decode(value, { stream: true });
        // Complete events end with a blank line; keep the trailing partial block in buf
        const blocks = buf.split('\n\n');
        buf = blocks.pop() ?? '';
        for (const block of blocks) {
          const dl = block.split('\n').find(l => l.startsWith('data: '));
          if (!dl) continue;
          try {
            const j = JSON.parse(dl.slice(6));
            if (j.type === 'chunk' && j.text) {
              setAssistantText(prev => prev + j.text);
            } else if (j.type === 'error') {
              setAssistantText(prev => prev || `Error: ${j.text ?? 'unknown error'}`);
            }
          } catch { /* skip malformed payloads */ }
        }
      }
    } catch (e: unknown) {
      if (e instanceof Error && e.name !== 'AbortError') {
        setAssistantText(() => 'Error reaching server.');
      }
    } finally {
      setLoading(false);
    }
  }

  const STATUS_COLOR: Record<string, string> = {
    risk: 'var(--danger, #dc143c)',
    warn: 'var(--warning, #ff8c00)',
    ok: 'var(--success, #228b22)',
  };

  return (
    <div
      style={{
        position: 'fixed', right: 0, top: 0, bottom: 0, width: 420,
        background: 'var(--surface)', borderLeft: '1px solid var(--border)',
        display: 'flex', flexDirection: 'column', zIndex: 200,
        boxShadow: '-4px 0 16px rgba(0,0,0,0.12)',
      }}
    >
      {/* Header */}
      <div style={{
        padding: '14px 16px', borderBottom: '1px solid var(--border)',
        display: 'flex', alignItems: 'flex-start', gap: 10,
      }}>
        <div style={{ flex: 1, minWidth: 0 }}>
          <div style={{ fontSize: 11, color: STATUS_COLOR[finding.status] || 'var(--muted)', fontWeight: 600, marginBottom: 2 }}>
            {finding.status.toUpperCase()}
            {finding.clause_ref && <span style={{ fontWeight: 400, marginLeft: 6, color: 'var(--muted)' }}>{finding.clause_ref}</span>}
          </div>
          <div style={{ fontSize: 13, fontWeight: 600, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
            {finding.title}
          </div>
          <div style={{ fontSize: 11, color: 'var(--muted)', marginTop: 2, lineHeight: 1.4 }}>
            {finding.desc.length > 100 ? finding.desc.slice(0, 100) + '…' : finding.desc}
          </div>
        </div>
        <button className="btn icon-btn" onClick={onClose} style={{ flexShrink: 0 }}>
          <X size={14} />
        </button>
      </div>

      {/* Suggestion chips (shown only before the first user message) */}
      {suggestions.length > 0 && (
        <div style={{ padding: '10px 16px', borderBottom: '1px solid var(--border)' }}>
          <div style={{ fontSize: 11, color: 'var(--muted)', marginBottom: 6 }}>Suggested questions</div>
          <div style={{ display: 'flex', flexDirection: 'column', gap: 6 }}>
            {suggestions.map((q, i) => (
              <button
                key={i}
                className="chip"
                style={{ textAlign: 'left', whiteSpace: 'normal', height: 'auto', padding: '6px 10px' }}
                onClick={() => send(q)}
              >
                {q}
              </button>
            ))}
          </div>
        </div>
      )}

      {/* Messages */}
      <div style={{ flex: 1, overflowY: 'auto', padding: '12px 16px', display: 'flex', flexDirection: 'column', gap: 10 }}>
        {loadingHistory && (
          <p style={{ fontSize: 12, color: 'var(--muted)', textAlign: 'center' }}>Loading history…</p>
        )}
        {messages.map(msg => (
          <div key={msg.id} className={`message msg-${msg.role}`} style={{ maxWidth: '100%' }}>
            {msg.role === 'assistant' && <div className="msg-avatar">AI</div>}
            <div className="msg-bubble" style={{ fontSize: 13, whiteSpace: 'pre-wrap' }}>
              {msg.content || (loading ? '…' : '')}
            </div>
            {msg.role === 'user' && <div className="msg-avatar user-av">You</div>}
          </div>
        ))}
        <div ref={bottomRef} />
      </div>

      {/* Composer */}
      <div style={{ padding: '10px 16px', borderTop: '1px solid var(--border)', display: 'flex', gap: 8 }}>
        <textarea
          className="composer-input"
          placeholder="Ask about this finding…"
          value={input}
          rows={2}
          style={{ flex: 1, fontSize: 13 }}
          onChange={e => setInput(e.target.value)}
          onKeyDown={e => { if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); send(); } }}
        />
        <button
          className="btn primary"
          disabled={!input.trim() || loading}
          onClick={() => send()}
          style={{ alignSelf: 'flex-end' }}
        >
          <Send size={14} />
        </button>
      </div>
    </div>
  );
}
```
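The drawer's read loop splits the accumulated text on blank lines (`\n\n`) and keeps the trailing, possibly incomplete block in `buf` until more bytes arrive, so a JSON payload split across network chunks is never parsed half-way. The same buffering is sketched in Python below, for instance for consuming the `/chat` or `analyze-stream` endpoints from a test script. `SSEBuffer` is a hypothetical name, and like the drawer it reads only the first `data:` line of each event:

```python
import json


class SSEBuffer:
    """Accumulate raw text chunks and return parsed `data:` payloads per complete event."""

    def __init__(self) -> None:
        self._buf = ""

    def feed(self, chunk: str) -> list[dict]:
        self._buf += chunk
        # Events end with a blank line; the last piece may still be incomplete
        *blocks, self._buf = self._buf.split("\n\n")
        events = []
        for block in blocks:
            line = next((ln for ln in block.split("\n") if ln.startswith("data: ")), None)
            if line is None:
                continue  # comments / keep-alives carry no data line
            try:
                events.append(json.loads(line[len("data: "):]))
            except json.JSONDecodeError:
                continue  # skip malformed payloads, as the drawer does
        return events
```

Checking a stream for the auto-save event then reduces to `any(e.get("type") == "saved" for e in events)`.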
- [ ] **Step 3: Wire `FindingChatDrawer` into `CompliancePage.tsx`**
Add import:
```tsx
import { FindingChatDrawer } from './FindingChatDrawer';
```
In `CompliancePage.tsx`, find the findings list map (around lines 380-420) and the existing `Chat` button that calls `chat.openFor(i, f)`. Replace that button, and the inline chat panel driven by `useFindingChat()`, with a "💬 Chat" button that stores the finding's index. The streamed findings in `state.findings` carry no database IDs, so the drawer resolves the finding's UUID from the saved record identified by `state.analysisId`:
```tsx
// Add at the top of CompliancePage component body:
const [drawerFindingIdx, setDrawerFindingIdx] = useState<number | null>(null);
// Inside the findings list map, find the existing chat button and replace with:
<button
className="btn sm"
onClick={() => setDrawerFindingIdx(i)}
style={{ marginTop: 6 }}
>
💬 Chat
</button>
```
At the bottom of the `CompliancePage` JSX return (before the closing `</div>`), add the drawer:
```tsx
{drawerFindingIdx !== null && state.analysisId && (
// FindingChatDrawer needs the finding's DB UUID, which the streamed findings lack;
// the wrapper below resolves it from the saved analysis record by index.
<_FindingChatDrawerWrapper
analysisId={state.analysisId}
findingIndex={drawerFindingIdx}
finding={state.findings[drawerFindingIdx]}
onClose={() => setDrawerFindingIdx(null)}
/>
)}
```
Because finding UUIDs exist only in the database record, add a helper wrapper to the same file that fetches the saved analysis and looks up the selected finding's UUID by its `seq`:
```tsx
function _FindingChatDrawerWrapper({
analysisId, findingIndex, finding, onClose,
}: {
analysisId: string;
findingIndex: number;
finding: ComplianceFindingEvent;
onClose: () => void;
}) {
const [findingId, setFindingId] = useState<string | null>(null);
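useEffect(() => {
// Ignore responses that arrive after the selection changes or the wrapper unmounts,
// so a slow request cannot open the drawer on the wrong finding.
let cancelled = false;
setFindingId(null);
fetch(`/api/v1/compliance/history/${analysisId}`, { headers: authHeader() })
.then(r => {
if (!r.ok) throw new Error(`HTTP ${r.status}`);
return r.json();
})
.then(data => {
const match = (data.findings || []).find((f: any) => f.seq === findingIndex);
if (!cancelled && match?.id) setFindingId(match.id);
})
.catch(() => {
// Leave findingId null: the drawer simply does not open on failure.
});
return () => { cancelled = true; };
}, [analysisId, findingIndex]);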
if (!findingId) return null;
return (
<FindingChatDrawer
analysisId={analysisId}
findingId={findingId}
finding={finding}
onClose={onClose}
/>
);
}
```
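Pulling the lookup out of the wrapper as a pure function makes the index-to-UUID mapping testable without rendering React. A minimal sketch, assuming (as the wrapper does) that each stored finding carries a `seq` equal to its position in `state.findings`; `resolveFindingId`, `StoredFinding`, and `StoredAnalysis` are hypothetical names.

```typescript
// Assumed shape of the record returned by GET /api/v1/compliance/history/{id};
// only `id` and `seq` are used, mirroring the wrapper above.
interface StoredFinding {
  id: string;  // DB UUID used by the chat endpoints
  seq: number; // position of the finding in the streamed results
}

interface StoredAnalysis {
  findings?: StoredFinding[];
}

// Returns the DB UUID for the finding at `findingIndex`, or null when the record
// has no matching `seq` (for example, if the save has not completed yet).
function resolveFindingId(record: StoredAnalysis, findingIndex: number): string | null {
  const match = (record.findings ?? []).find(f => f.seq === findingIndex);
  return match ? match.id : null;
}
```

With this helper, the wrapper's second `.then` reduces to a call to `resolveFindingId(data, findingIndex)` followed by the `cancelled` check.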
- [ ] **Step 4: TypeScript check**
```
cd frontend
npx tsc --noEmit
```
Expected: 0 errors.
- [ ] **Step 5: End-to-end smoke test**
1. Start backend + frontend
2. Run a compliance analysis with a text that produces at least 2 findings
3. Verify the `saved` SSE event appears and History Rail shows the new record
4. Click the "💬 Chat" button on any finding
5. Verify the Drawer opens with 3 suggestion chips
6. Click one suggestion — verify it streams a response
7. Close and reopen the Drawer — verify the conversation history is restored
8. Click `[↓]` in History Rail — verify a DOCX file downloads
9. Click `[×]` in History Rail → confirm dialog → verify record disappears
---
## Verification Checklist
- [ ] Direction A: SSE stream emits one `stage:analyzing` event (not per-clause), all findings still appear, `highlight_terms` in `done` event is non-empty
- [ ] Direction A: `PassThroughReranker` importable from `app.infrastructure.vectorstore.pass_through_reranker`
- [ ] Direction B: Running analysis emits `{"type":"saved","analysis_id":"<uuid>"}` SSE event
- [ ] Direction B: `GET /api/v1/compliance/history` returns list with the saved record
- [ ] Direction B: `GET /api/v1/compliance/history/<id>/download` returns a valid DOCX file
- [ ] Direction B: `DELETE /api/v1/compliance/history/<id>` removes the record
- [ ] Direction B: History Rail renders in the UI and updates after analysis completes
- [ ] Direction C: Suggestions endpoint returns 3 questions for a valid finding
- [ ] Direction C: Chat endpoint streams a grounded response referencing finding context
- [ ] Direction C: Reopening the Drawer shows previous conversation
- [ ] All backend tests pass: `python -m pytest tests/compliance/ -v`
- [ ] TypeScript check passes: `npx tsc --noEmit` (0 errors)
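The `saved` event item above can also be checked programmatically, for example in a script that collects the parsed SSE events from one analysis run. A minimal TypeScript sketch, assuming the events have already been JSON-parsed from the stream; `isSavedEvent` and `findSavedAnalysisId` are hypothetical helper names, and the UUID pattern checks only the canonical 8-4-4-4-12 hex form.

```typescript
// Shape of the `saved` event described in the checklist.
interface SavedEvent {
  type: 'saved';
  analysis_id: string;
}

// Canonical UUID text form: 8-4-4-4-12 hexadecimal digits.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Narrow an arbitrary parsed payload to a SavedEvent carrying a well-formed UUID.
function isSavedEvent(e: unknown): e is SavedEvent {
  if (typeof e !== 'object' || e === null) return false;
  const obj = e as Record<string, unknown>;
  return obj.type === 'saved' && typeof obj.analysis_id === 'string' && UUID_RE.test(obj.analysis_id);
}

// Return the analysis_id from the first valid `saved` event, or null if none arrived.
function findSavedAnalysisId(events: unknown[]): string | null {
  const saved = events.find(isSavedEvent);
  return saved ? saved.analysis_id : null;
}
```

A null result means the stream finished without persisting the analysis, which is the failure this checklist item is meant to catch.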