AIRegulation-DocAnalysis/docs/superpowers/plans/2026-06-08-compliance-enhancement.md


2026-06-10 11:10:36 +08:00
# Compliance Analysis Enhancement Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Enhance the Compliance Analysis module with parallel clause processing + bug fixes (A), persistent analysis history + DOCX export (B), and per-finding persistent chat threads with LLM-generated suggestions (C).

**Architecture:** Direction A refactors the SSE pipeline in `app/api/routes/compliance.py` to run clauses in parallel via `asyncio.gather` and fixes `highlight_terms` and LLM retry. Direction B adds a PostgreSQL-backed `ComplianceRepository` (new domain port + infrastructure) and three REST endpoints plus a frontend History Rail. Direction C adds per-finding chat endpoints grounded in real analysis data and a React Drawer component. Implementation order: A → B → C (each direction is a prerequisite for the next).

**Tech Stack:** FastAPI (SSE), psycopg2 (ThreadedConnectionPool), python-docx, tenacity (retry), React 18, TypeScript, existing `PageStateContext` pattern.

---
## File Map
### Direction A — Analysis Quality
| File | Action |
|------|--------|
| `backend/app/application/compliance/pipeline.py` | Add `process_single_clause()` and `run_clauses_parallel()`; add `tenacity` retry (nested `_llm_check`-style helpers) to `check_clause_compliance` and `synthesize_conclusion`; fix the `highlight_terms` prompt |
| `backend/app/api/routes/compliance.py` | Replace sequential for-loop with `run_clauses_parallel` (`asyncio.gather` over `process_single_clause`) |
| `backend/app/infrastructure/vectorstore/pass_through_reranker.py` | New — `PassThroughReranker` stub |
| `backend/tests/compliance/test_pipeline.py` | New — unit tests for parallel processing + highlight_terms fix |
### Direction B — History & Reports
| File | Action |
|------|--------|
| `backend/app/domain/compliance/__init__.py` | New — empty |
| `backend/app/domain/compliance/ports.py` | New — `FindingRecord`, `AnalysisRecord`, `ComplianceRepository` ABC |
| `backend/app/infrastructure/compliance/__init__.py` | New — empty |
| `backend/app/infrastructure/compliance/repository.py` | New — `PostgresComplianceRepository` |
| `backend/app/infrastructure/compliance/docx_export.py` | New — `generate_docx()` |
| `backend/app/api/routes/compliance.py` | Add history endpoints + auto-save hook in `analyze_stream` |
| `backend/app/shared/bootstrap.py` | Add `get_compliance_repository()` factory |
| `frontend/src/pages/Compliance/HistoryRail.tsx` | New — History Rail component |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Import + render `HistoryRail`, handle `saved` SSE event, store `analysisId` |
| `frontend/src/contexts/PageStateContext.tsx` | Add `analysisId`, `isReadOnly` to `ComplianceState` |
| `backend/tests/compliance/test_repository.py` | New — unit tests for repository (with mock psycopg2) |
### Direction C — Deep Chat
| File | Action |
|------|--------|
| `backend/app/application/compliance/pipeline.py` | Add `build_finding_context()`, `generate_suggestions()` |
| `backend/app/api/routes/compliance.py` | Add 3 finding-chat endpoints, deprecate old chat endpoint |
| `frontend/src/pages/Compliance/FindingChatDrawer.tsx` | New — per-finding chat drawer |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Add Chat button to finding cards, render `FindingChatDrawer` |
| `frontend/src/contexts/PageStateContext.tsx` | Add `activeFindingId` to `ComplianceState` |
| `backend/tests/compliance/test_pipeline.py` | Extend — test `build_finding_context` + `generate_suggestions` |
---
## Direction A Tasks
### Task 1: PassThroughReranker stub
**Files:**
- Create: `backend/app/infrastructure/vectorstore/pass_through_reranker.py`
- Test: `backend/tests/compliance/test_pipeline.py`
- [ ] **Step 1: Create test file and write failing test**
```python
# backend/tests/compliance/test_pipeline.py
import pytest
from app.infrastructure.vectorstore.pass_through_reranker import PassThroughReranker
from app.domain.retrieval.models import RetrievedChunk


def _make_chunk(score: float) -> RetrievedChunk:
    return RetrievedChunk(
        doc_id="d1",
        doc_title="Test Doc",
        section_title="S1",
        text="some text",
        score=score,
        page_start=1,
    )


def test_pass_through_returns_top_k():
    reranker = PassThroughReranker()
    chunks = [_make_chunk(0.9), _make_chunk(0.8), _make_chunk(0.7)]
    result = reranker.rerank(query="test", chunks=chunks, top_k=2)
    assert len(result) == 2
    assert result[0].score == 0.9


def test_pass_through_returns_all_when_top_k_exceeds():
    reranker = PassThroughReranker()
    chunks = [_make_chunk(0.5)]
    result = reranker.rerank(query="test", chunks=chunks, top_k=10)
    assert len(result) == 1
```
- [ ] **Step 2: Run test to verify it fails**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_pass_through_returns_top_k -v
```
Expected: `ModuleNotFoundError` or `ImportError`, because `pass_through_reranker` does not exist yet.
- [ ] **Step 3: Create `tests/compliance/__init__.py`**
```python
# backend/tests/compliance/__init__.py
```
- [ ] **Step 4: Create the reranker**
```python
# backend/app/infrastructure/vectorstore/pass_through_reranker.py
"""No-op reranker stub.

Returns the original candidate list sliced to top_k.
Replace with CrossEncoderReranker when a local cross-encoder model is available.
"""
from __future__ import annotations

from app.domain.retrieval.models import RetrievedChunk
from app.domain.retrieval.ports import Reranker


class PassThroughReranker(Reranker):
    """Pass-through reranker that preserves original retrieval order.

    Acts as a placeholder for future cross-encoder reranking (e.g. ms-marco-MiniLM).
    Wire via bootstrap.get_compliance_reranker() when ready to swap.
    """

    def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
        """Return the first top_k chunks without reordering."""
        return chunks[:top_k]
```
- [ ] **Step 5: Run tests to verify they pass**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_pass_through_returns_top_k tests/compliance/test_pipeline.py::test_pass_through_returns_all_when_top_k_exceeds -v
```
Expected: 2 passed.

---
### Task 2: Parallel clause processing + LLM retry
**Files:**
- Modify: `backend/app/application/compliance/pipeline.py`
- Modify: `backend/app/api/routes/compliance.py`
- Test: `backend/tests/compliance/test_pipeline.py`
- [ ] **Step 1: Write failing tests**
Add these tests to `backend/tests/compliance/test_pipeline.py`:
```python
import asyncio
from unittest.mock import MagicMock, patch


def _make_mock_client(content: str = '{"status":"ok","title":"T","desc":"D","clause_ref":"A1"}'):
    client = MagicMock()
    response = MagicMock()
    response.is_success = True
    response.content = content
    client.chat.return_value = response
    return client


def _make_mock_retrieval():
    svc = MagicMock()
    svc.retrieve.return_value = []
    return svc


def test_process_single_clause_returns_finding():
    from app.application.compliance.pipeline import process_single_clause
    client = _make_mock_client()
    svc = _make_mock_retrieval()
    result = process_single_clause("test clause", 0, svc, client, "EU AI Act", "para")
    assert result["finding"] is not None
    assert result["index"] == 0
    assert result["chunks"] == []


def test_run_clauses_parallel_runs_all():
    from app.application.compliance.pipeline import run_clauses_parallel
    client = _make_mock_client()
    svc = _make_mock_retrieval()
    clauses = ["clause one", "clause two", "clause three"]
    results = asyncio.run(run_clauses_parallel(clauses, svc, client, "EU AI Act", "para"))
    assert len(results) == 3
    assert all(r["index"] == i for i, r in enumerate(results))
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_process_single_clause_returns_finding -v
```
Expected: `ImportError: cannot import name 'process_single_clause'`
- [ ] **Step 3: Add `process_single_clause` and `run_clauses_parallel` to `pipeline.py`**
Add after line 109 (after `retrieve_for_clause`), before `check_clause_compliance`:
```python
# (asyncio / httpx / tenacity imports go at the top of pipeline.py, see below)
def process_single_clause(
    clause: str,
    index: int,
    retrieval_service: "KnowledgeRetrievalService",
    client: "BaseLLMClient",
    standard_name: str,
    para_text: str,
    top_k: int = 5,
    domains: str | None = None,
) -> dict:
    """Process one clause: retrieve relevant regulations, then check compliance.

    Returns a dict with keys: index, chunks, finding (None on LLM failure).
    Designed to run inside asyncio.to_thread() for parallel execution.
    standard_name and para_text are accepted for signature parity with the
    route but are not used yet.
    """
    chunks = retrieve_for_clause(clause, retrieval_service, top_k, domains)
    finding = check_clause_compliance(clause, chunks, client)
    return {"index": index, "chunks": chunks, "finding": finding}


async def run_clauses_parallel(
    clauses: list[str],
    retrieval_service: "KnowledgeRetrievalService",
    client: "BaseLLMClient",
    standard_name: str,
    para_text: str,
    top_k: int = 5,
    domains: str | None = None,
) -> list[dict]:
    """Run all clauses through retrieve + gap-check in parallel.

    Results are returned in the original clause order even though processing
    is concurrent. An exception in one clause is logged and returned as a
    dict with finding=None, so the remaining clauses are unaffected.
    """
    tasks = [
        asyncio.to_thread(
            process_single_clause,
            clause, i, retrieval_service, client, standard_name, para_text, top_k, domains,
        )
        for i, clause in enumerate(clauses)
    ]
    # gather preserves input order; return_exceptions=True keeps one failing
    # clause from raising out of the whole batch.
    raw = await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    for i, r in enumerate(raw):
        if isinstance(r, Exception):
            # Assumes pipeline.py's existing loguru `logger` ({} placeholders).
            logger.warning("Clause {} processing failed: {}", i, r)
            results.append({"index": i, "chunks": [], "finding": None})
        else:
            results.append(r)
    return results
```
Also add the imports at the top of `pipeline.py` (after existing imports):
```python
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
```
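A minimal, self-contained sketch of the two `asyncio.gather` properties that `run_clauses_parallel` relies on: results come back in input order even when later tasks finish first, and with `return_exceptions=True` a failing task leaves its exception object in its own slot instead of raising out of the `await`. `slow_square` is a hypothetical stand-in for `process_single_clause`; the delays are arbitrary.

```python
import asyncio
import time


def slow_square(n: int, delay: float) -> int:
    """Blocking stand-in for one clause: sleep, then return n * n (n == 2 fails)."""
    time.sleep(delay)
    if n == 2:
        raise RuntimeError("simulated LLM failure")
    return n * n


async def run_all(values: list[int]) -> list[object]:
    # Later items sleep less, so they finish first; gather still returns input order.
    tasks = [
        asyncio.to_thread(slow_square, n, 0.05 * (len(values) - i))
        for i, n in enumerate(values)
    ]
    # return_exceptions=True places the exception object in the failed slot
    # instead of raising it out of the await.
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(run_all([1, 2, 3]))
print([type(r).__name__ if isinstance(r, Exception) else r for r in results])
# → [1, 'RuntimeError', 9]
```

`asyncio.to_thread` needs Python 3.9+, and the blocking calls share the event loop's default thread pool, so very long clause lists are effectively throttled by that pool's size.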
- [ ] **Step 4: Wrap LLM calls in `check_clause_compliance` with retry**
Inside `check_clause_compliance`, define a retrying helper that closes over `client`; it replaces the direct `client.chat(...)` call (line 137):
```python
# Defined inside check_clause_compliance so it can close over `client`.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type((httpx.HTTPError, ValueError)),
    reraise=True,  # re-raise the last exception instead of tenacity.RetryError
)
def _llm_check(prompt_messages: list[dict]):
    resp = client.chat(prompt_messages, max_tokens=500)
    if not resp.is_success:
        # Raising ValueError makes a non-success response retryable too.
        raise ValueError("LLM returned non-success status for gap check")
    return resp
```
Then in `check_clause_compliance`, replace:
```python
response = client.chat([{"role": "user", "content": prompt}], max_tokens=500)
if not response.is_success:
return None
```
with:
```python
try:
    response = _llm_check([{"role": "user", "content": prompt}])
except Exception as exc:
    logger.warning("check_clause_compliance LLM call failed after retries: {}", exc)
    return None
```
Apply the same pattern to `synthesize_conclusion` (line 189): wrap `client.chat(...)` in a nested retry function and catch on final failure to return `fallback`.
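For `synthesize_conclusion` the intended behaviour is "retry a few times, then return `fallback`". The sketch below is a dependency-free approximation of that flow, assuming waits of `base * 2**(n-1)` seconds capped at 4 s between attempts, similar in spirit to the `wait_exponential(multiplier=1, min=1, max=4)` setting used with tenacity. `call_with_retry` is a hypothetical helper, not a tenacity API, and unlike the tenacity configuration it retries on any `Exception`; the injectable `sleep` exists only so the behaviour can be checked without real delays.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    fallback: T,
    attempts: int = 3,
    base: float = 1.0,
    cap: float = 4.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn up to `attempts` times; return `fallback` if every attempt raises.

    Between attempts it waits base * 2**(n - 1) seconds (n = failed attempt
    number), capped at `cap`. No wait follows the final failure.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                break
            sleep(min(base * 2 ** (attempt - 1), cap))
    return fallback


# Usage: a call that fails twice, then succeeds on the third attempt.
waits: list[float] = []
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("non-success LLM response")
    return "conclusion"


print(call_with_retry(flaky, fallback="fallback", sleep=waits.append), waits)
# → conclusion [1.0, 2.0]
```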
**Also fix the `highlight_terms` prompt bug** — the current prompt uses `["Key terms to highlight, max 10 terms"]` as an example string, so the LLM returns the literal example instead of real terms. In `synthesize_conclusion`, replace that prompt line with:
```python
' "highlight_terms": ["term1", "term2"], // up to 10 key technical/legal terms actually present in the text\n'
```
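A better prompt makes placeholder output less likely but cannot rule it out. A defensive post-processing step could drop placeholders, duplicates, and terms that do not occur in the analysed text before the `done` event is emitted. `clean_highlight_terms` and its placeholder set are hypothetical; the sketch assumes `highlight_terms` arrives as an already-parsed JSON list.

```python
# Hypothetical placeholder strings the LLM might echo back from the prompt.
PLACEHOLDERS = {"term1", "term2", "key terms to highlight, max 10 terms"}


def clean_highlight_terms(terms: object, text: str, limit: int = 10) -> list[str]:
    """Keep real, de-duplicated terms that occur in `text` (case-insensitive)."""
    if not isinstance(terms, list):
        return []
    lowered = text.lower()
    seen: set[str] = set()
    cleaned: list[str] = []
    for term in terms:
        if not isinstance(term, str):
            continue
        t = term.strip()
        key = t.lower()
        # Skip empties, prompt placeholders, duplicates, and terms absent from the text.
        if not t or key in PLACEHOLDERS or key in seen or key not in lowered:
            continue
        seen.add(key)
        cleaned.append(t)
        if len(cleaned) == limit:
            break
    return cleaned


print(clean_highlight_terms(
    ["CSMS", "term1", "csms", "ISO 21434", "GDPR"],
    "The system shall implement CSMS as per ISO 21434 requirements.",
))
# → ['CSMS', 'ISO 21434']
```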
- [ ] **Step 5: Replace sequential loop in `compliance.py` with parallel version**
In `backend/app/api/routes/compliance.py`, replace the Stage 3 block (lines 138-168):
```python
# ── Stage 3: retrieve + gap check per clause ──────────────────
findings: list[dict] = []
for i, clause in enumerate(clauses):
    yield _sse({
        "type": "stage",
        "stage": "analyzing",
        "label": f"Analyzing clause {i + 1}/{len(clauses)}…",
    })
    await asyncio.sleep(0)
    chunks = await asyncio.to_thread(
        retrieve_for_clause, clause, retrieval_service, 5, domains or None
    )
    # Emit source events
    for chunk in chunks[:3]:
        yield _sse({...})
        await asyncio.sleep(0)
    finding = await asyncio.to_thread(check_clause_compliance, clause, chunks, client)
    if finding:
        findings.append(finding)
        yield _sse({"type": "finding", **finding})
        await asyncio.sleep(0)
```
With:
```python
# ── Stage 3: retrieve + gap check (parallel across all clauses) ────────────
# run_clauses_parallel is imported in generate()'s pipeline import block (see below).
findings: list[dict] = []
yield _sse({
    "type": "stage",
    "stage": "analyzing",
    "label": f"Analyzing {len(clauses)} clauses in parallel…",
})
await asyncio.sleep(0)
clause_results = await run_clauses_parallel(
    clauses, retrieval_service, client,
    standard_name=title or "",
    para_text=para_text,
    top_k=5,
    domains=domains or None,
)
# Results are in clause order: emit each clause's sources, then its finding.
for res in clause_results:
    chunks = res["chunks"]
    finding = res["finding"]
    for chunk in chunks[:3]:
        yield _sse({
            "type": "source",
            "standard": getattr(chunk, "doc_title", "") or getattr(chunk, "doc_name", ""),
            "clause": getattr(chunk, "section_title", "") or "",
            "score": round(float(getattr(chunk, "score", 0)), 3),
            "status": "retrieved",
            "full_content": (getattr(chunk, "text", "") or "")[:300],
        })
    if finding:
        findings.append(finding)
        yield _sse({"type": "finding", **finding})
        await asyncio.sleep(0)
```
Also update the imports at the top of the `generate()` inner function — add `run_clauses_parallel` to the `from app.application.compliance.pipeline import (...)` block and remove individual imports of `retrieve_for_clause`, `check_clause_compliance`.
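One trade-off of `asyncio.gather`: no finding is streamed until the slowest clause has finished. If that latency matters, `asyncio.as_completed` can emit each clause's result as soon as it is ready, at the cost of arrival order, so each event should carry its `index` for the frontend to re-sort. A sketch under stated assumptions: `process_clause` and `stream_findings` are hypothetical names, and the `data: <json>` framing followed by a blank line is an assumption about what `_sse()` produces.

```python
import asyncio
import json
import time
from typing import AsyncIterator


def process_clause(clause: str, index: int) -> dict:
    """Hypothetical blocking stand-in for process_single_clause."""
    time.sleep(0.01 * (3 - index))  # later clauses finish sooner in this demo
    return {"index": index, "finding": {"title": clause.upper()}}


async def stream_findings(clauses: list[str]) -> AsyncIterator[str]:
    """Yield one SSE frame per clause, in completion order rather than input order."""
    tasks = [
        asyncio.create_task(asyncio.to_thread(process_clause, clause, i))
        for i, clause in enumerate(clauses)
    ]
    for next_done in asyncio.as_completed(tasks):
        try:
            res = await next_done
        except Exception:
            continue  # skip a failed clause; the remaining ones still stream
        payload = {"type": "finding", **res}
        yield f"data: {json.dumps(payload)}\n\n"


async def collect(clauses: list[str]) -> list[int]:
    """Drain the stream and return the clause indexes in arrival order."""
    order = []
    async for frame in stream_findings(clauses):
        order.append(json.loads(frame[len("data: "):])["index"])
    return order


arrival = asyncio.run(collect(["a", "b", "c"]))
print(arrival)  # typically [2, 1, 0], since later clauses sleep less
```

Work already handed to threads keeps running if the client disconnects, because threads cannot be cancelled; only the awaiting side stops.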
- [ ] **Step 6: Run all Direction A tests**
```
cd backend
python -m pytest tests/compliance/ -v
```
Expected: all pass.
- [ ] **Step 7: Smoke test the SSE endpoint manually**
```bash
curl -X POST http://localhost:8000/api/v1/compliance/analyze-stream \
-H "Authorization: Bearer <token>" \
-F "text=The system shall implement CSMS as per ISO 21434 requirements. All training data must be documented and version controlled." \
--no-buffer
```
Expected: SSE stream with one `stage:analyzing` event (not per-clause), then multiple `finding` events, then `done` with non-empty `highlight_terms` array.
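To turn the manual smoke test into an assertion, the stream body can be parsed with the standard library. This sketch assumes `_sse()` writes one `data: <json>` line per event followed by a blank line (its implementation is not shown in this plan); `parse_sse` is a hypothetical helper, and the sample `body` stands in for real `curl` output. The checks to run on the result are the ones listed above: exactly one `analyzing` stage event, and a `done` event with a non-empty `highlight_terms`.

```python
import json


def parse_sse(raw: str) -> list[dict]:
    """Parse a text/event-stream body into the JSON payload of each event.

    Events are separated by blank lines; several `data:` lines in one event
    are joined with newlines. Other fields (event:, id:, comments) are ignored.
    """
    events: list[dict] = []
    for block in raw.replace("\r\n", "\n").split("\n\n"):
        data_lines = []
        for line in block.split("\n"):
            if line.startswith("data:"):
                value = line[len("data:"):]
                if value.startswith(" "):
                    value = value[1:]  # only one leading space is stripped
                data_lines.append(value)
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events


body = (
    'data: {"type": "stage", "stage": "analyzing"}\n\n'
    'data: {"type": "finding", "title": "Missing CSMS"}\n\n'
    'data: {"type": "done", "highlight_terms": ["CSMS"]}\n\n'
)
events = parse_sse(body)
print([e["type"] for e in events])  # → ['stage', 'finding', 'done']
```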

---
## Direction B Tasks
### Task 3: Domain port + data classes
**Files:**
- Create: `backend/app/domain/compliance/__init__.py`
- Create: `backend/app/domain/compliance/ports.py`
- Test: `backend/tests/compliance/test_repository.py`
- [ ] **Step 1: Write failing test**
```python
# backend/tests/compliance/test_repository.py
from datetime import datetime

from app.domain.compliance.ports import (
    AnalysisRecord,
    FindingRecord,
    ComplianceRepository,
)


def test_analysis_record_construction():
    record = AnalysisRecord(
        id="",
        created_at=datetime.utcnow(),
        created_by="user1",
        doc_name="test.pdf",
        standard_name="EU AI Act",
        risk_score=72,
        conclusion="Some gaps found.",
        actions=[{"label": "Fix", "value": "Update docs"}],
        para_text="The system shall...",
        highlight_terms=["CSMS", "ISO 21434"],
        findings=[
            FindingRecord(
                id="",
                analysis_id="",
                seq=0,
                title="Missing CSMS",
                description="No CSMS certification found.",
                status="risk",
                clause_ref="Art.9.1",
            )
        ],
    )
    assert record.doc_name == "test.pdf"
    assert len(record.findings) == 1
    assert record.findings[0].status == "risk"


def test_compliance_repository_is_abstract():
    import inspect
    assert inspect.isabstract(ComplianceRepository)
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_repository.py -v
```
Expected: `ModuleNotFoundError: No module named 'app.domain.compliance'`
- [ ] **Step 3: Create domain module**
```python
# backend/app/domain/compliance/__init__.py
```
- [ ] **Step 4: Create `ports.py`**
```python
# backend/app/domain/compliance/ports.py
"""Domain ports for compliance history persistence."""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class FindingRecord:
    """Single finding row linked to an analysis."""

    id: str
    analysis_id: str
    seq: int
    title: str
    description: str
    status: str  # "ok" | "warn" | "risk"
    clause_ref: Optional[str] = None


@dataclass
class AnalysisRecord:
    """Full compliance analysis record with nested findings."""

    id: str  # UUID string; empty string means not yet persisted
    created_at: datetime
    created_by: Optional[str]
    doc_name: str
    standard_name: str
    risk_score: int
    conclusion: str
    actions: list  # list[dict]: serialised action items
    para_text: str
    highlight_terms: list  # list[str]
    findings: list[FindingRecord] = field(default_factory=list)


class ComplianceRepository(ABC):
    """Port for persisting and retrieving compliance analysis records."""

    @abstractmethod
    def save_analysis(self, record: AnalysisRecord) -> str:
        """Persist a new analysis record and return the assigned UUID string."""

    @abstractmethod
    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]:
        """Return analyses ordered by created_at DESC, without nested findings."""

    @abstractmethod
    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]:
        """Return a single analysis with all nested findings, or None."""

    @abstractmethod
    def delete_analysis(self, analysis_id: str) -> None:
        """Delete an analysis and all related findings and chat messages (cascade)."""

    @abstractmethod
    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str:
        """Persist a chat message and return its UUID string."""

    @abstractmethod
    def get_messages(self, finding_id: str) -> list[dict]:
        """Return chat messages for a finding ordered by created_at ASC.

        Each dict has keys: id, role, content, created_at (ISO string).
        """
```
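The history endpoints will need to return `AnalysisRecord` objects as JSON, and `created_at` is a `datetime`. A small stdlib sketch of that conversion follows; `to_jsonable` is a hypothetical helper, and the two local dataclasses only mimic the shape of `AnalysisRecord` and `FindingRecord`. FastAPI's `jsonable_encoder` also accepts dataclass instances, so using it directly is an equally reasonable choice.

```python
from dataclasses import asdict, dataclass, field, is_dataclass
from datetime import datetime, timezone


def to_jsonable(obj: object) -> dict:
    """Convert a dataclass instance (nested dataclasses included) to a JSON-safe dict."""
    if not is_dataclass(obj) or isinstance(obj, type):
        raise TypeError("expected a dataclass instance")

    def convert(value: object) -> object:
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, dict):
            return {k: convert(v) for k, v in value.items()}
        if isinstance(value, list):
            return [convert(v) for v in value]
        return value

    # asdict() already recurses into nested dataclasses, lists and dicts;
    # convert() only has to replace datetimes with ISO-8601 strings.
    return convert(asdict(obj))  # type: ignore[return-value]


@dataclass
class Finding:  # mimics FindingRecord
    id: str
    status: str


@dataclass
class Analysis:  # mimics AnalysisRecord
    id: str
    created_at: datetime
    findings: list[Finding] = field(default_factory=list)


payload = to_jsonable(
    Analysis("a1", datetime(2026, 6, 8, tzinfo=timezone.utc), [Finding("f1", "risk")])
)
print(payload)
# → {'id': 'a1', 'created_at': '2026-06-08T00:00:00+00:00', 'findings': [{'id': 'f1', 'status': 'risk'}]}
```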
- [ ] **Step 5: Run tests to verify they pass**
```
cd backend
python -m pytest tests/compliance/test_repository.py -v
```
Expected: 2 passed.

---
### Task 4: PostgresComplianceRepository
**Files:**
- Create: `backend/app/infrastructure/compliance/__init__.py`
- Create: `backend/app/infrastructure/compliance/repository.py`
- Test: `backend/tests/compliance/test_repository.py`
- [ ] **Step 1: Write failing test**
Add to `backend/tests/compliance/test_repository.py`:
```python
from unittest.mock import MagicMock, patch, call
from datetime import datetime

from app.domain.compliance.ports import AnalysisRecord, FindingRecord


def _mock_pool():
    """Return (pool, conn, cursor) mocks shaped like a psycopg2 ThreadedConnectionPool."""
    conn = MagicMock()
    cursor = MagicMock()
    # `with conn.cursor() as cur:` must yield this same cursor mock.
    cursor.__enter__ = MagicMock(return_value=cursor)
    cursor.__exit__ = MagicMock(return_value=False)
    conn.cursor.return_value = cursor
    pool = MagicMock()
    pool.getconn.return_value = conn
    return pool, conn, cursor


@patch("app.infrastructure.compliance.repository.psycopg2.pool.ThreadedConnectionPool")
@patch("app.infrastructure.compliance.repository.psycopg2.extras")
def test_save_analysis_returns_uuid(mock_extras, mock_pool_cls):
    from app.infrastructure.compliance.repository import PostgresComplianceRepository
    pool, conn, cursor = _mock_pool()
    mock_pool_cls.return_value = pool
    cursor.fetchone.return_value = {"id": "abc-123"}
    repo = PostgresComplianceRepository(
        host="localhost", port=5432, user="u", password="p", dbname="db"
    )
    record = AnalysisRecord(
        id="", created_at=datetime.utcnow(), created_by="user1",
        doc_name="doc.pdf", standard_name="EU AI Act",
        risk_score=50, conclusion="OK", actions=[], para_text="p",
        highlight_terms=[], findings=[],
    )
    result = repo.save_analysis(record)
    assert result == "abc-123"
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_repository.py::test_save_analysis_returns_uuid -v
```
Expected: `ModuleNotFoundError: No module named 'app.infrastructure.compliance'`
- [ ] **Step 3: Create infrastructure module**
```python
# backend/app/infrastructure/compliance/__init__.py
```
- [ ] **Step 4: Implement `repository.py`**
```python
# backend/app/infrastructure/compliance/repository.py
"""PostgreSQL-backed compliance analysis repository.

Follows the same psycopg2 pattern as PostgresDocumentRepository:
ThreadedConnectionPool + RealDictCursor for reads, _ensure_schema on init.
"""
from __future__ import annotations

import json
from contextlib import contextmanager
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras
import psycopg2.pool
from loguru import logger

from app.domain.compliance.ports import (
    AnalysisRecord,
    ComplianceRepository,
    FindingRecord,
)


class PostgresComplianceRepository(ComplianceRepository):
    """Stores compliance analyses, findings, and finding chat messages in PostgreSQL."""

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        dbname: str,
        minconn: int = 1,
        maxconn: int = 5,
    ) -> None:
        self._pool = psycopg2.pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=host,
            port=port,
            user=user,
            password=password,
            dbname=dbname,
        )
        self._ensure_schema()

    @contextmanager
    def _conn(self):
        """Borrow a connection from the pool and always hand it back."""
        conn = self._pool.getconn()
        try:
            yield conn
        finally:
            self._pool.putconn(conn)

    def _ensure_schema(self) -> None:
        """Create tables if they do not exist.

        gen_random_uuid() is built in from PostgreSQL 13; older servers
        need the pgcrypto extension for it.
        """
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS compliance_analyses (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
                        created_by VARCHAR(255),
                        doc_name VARCHAR(500),
                        standard_name VARCHAR(500),
                        risk_score INTEGER,
                        conclusion TEXT,
                        actions JSONB,
                        para_text TEXT,
                        highlight_terms JSONB
                    );
                """)
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS compliance_findings (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
                        seq INTEGER NOT NULL,
                        title VARCHAR(500),
                        description TEXT,
                        status VARCHAR(50),
                        clause_ref VARCHAR(200)
                    );
                """)
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS finding_chat_messages (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
                        finding_id UUID NOT NULL REFERENCES compliance_findings(id) ON DELETE CASCADE,
                        role VARCHAR(20) NOT NULL,
                        content TEXT NOT NULL,
                        created_at TIMESTAMPTZ NOT NULL DEFAULT now()
                    );
                """)
            conn.commit()

    def save_analysis(self, record: AnalysisRecord) -> str:
        """Insert analysis + findings in one transaction; return the new analysis UUID."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    INSERT INTO compliance_analyses
                        (created_by, doc_name, standard_name, risk_score,
                         conclusion, actions, para_text, highlight_terms)
                    VALUES
                        (%(created_by)s, %(doc_name)s, %(standard_name)s, %(risk_score)s,
                         %(conclusion)s, %(actions)s, %(para_text)s, %(highlight_terms)s)
                    RETURNING id
                    """,
                    {
                        "created_by": record.created_by,
                        "doc_name": record.doc_name,
                        "standard_name": record.standard_name,
                        "risk_score": record.risk_score,
                        "conclusion": record.conclusion,
                        "actions": json.dumps(record.actions, ensure_ascii=False),
                        "para_text": record.para_text,
                        "highlight_terms": json.dumps(record.highlight_terms, ensure_ascii=False),
                    },
                )
                row = cur.fetchone()
                analysis_id = str(row["id"])
            # Insert findings
            if record.findings:
                with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                    for f in record.findings:
                        cur.execute(
                            """
                            INSERT INTO compliance_findings
                                (analysis_id, seq, title, description, status, clause_ref)
                            VALUES
                                (%(analysis_id)s, %(seq)s, %(title)s, %(desc)s, %(status)s, %(clause_ref)s)
                            """,
                            {
                                "analysis_id": analysis_id,
                                "seq": f.seq,
                                "title": f.title,
                                "desc": f.description,
                                "status": f.status,
                                "clause_ref": f.clause_ref,
                            },
                        )
            conn.commit()
        return analysis_id

    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]:
        """Return analyses without nested findings, ordered newest first."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT id, created_at, created_by, doc_name, standard_name,
                           risk_score, conclusion, actions, para_text, highlight_terms
                    FROM compliance_analyses
                    ORDER BY created_at DESC
                    LIMIT %(limit)s OFFSET %(offset)s
                    """,
                    {"limit": limit, "offset": offset},
                )
                rows = cur.fetchall()
        return [self._row_to_record(dict(r)) for r in rows]

    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]:
        """Return analysis with nested findings list."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    "SELECT * FROM compliance_analyses WHERE id = %(id)s",
                    {"id": analysis_id},
                )
                row = cur.fetchone()
                if not row:
                    return None
                record = self._row_to_record(dict(row))
                cur.execute(
                    """
                    SELECT id, analysis_id, seq, title, description, status, clause_ref
                    FROM compliance_findings
                    WHERE analysis_id = %(id)s
                    ORDER BY seq
                    """,
                    {"id": analysis_id},
                )
                findings = [
                    FindingRecord(
                        id=str(r["id"]),
                        analysis_id=str(r["analysis_id"]),
                        seq=r["seq"],
                        title=r["title"] or "",
                        description=r["description"] or "",
                        status=r["status"] or "ok",
                        clause_ref=r["clause_ref"],
                    )
                    for r in cur.fetchall()
                ]
        record.findings = findings
        return record

    def delete_analysis(self, analysis_id: str) -> None:
        """Delete analysis; findings and chat messages cascade automatically."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "DELETE FROM compliance_analyses WHERE id = %(id)s",
                    {"id": analysis_id},
                )
            conn.commit()

    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str:
        """Persist a chat message; return its UUID."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    INSERT INTO finding_chat_messages
                        (analysis_id, finding_id, role, content)
                    VALUES
                        (%(analysis_id)s, %(finding_id)s, %(role)s, %(content)s)
                    RETURNING id
                    """,
                    {
                        "analysis_id": analysis_id,
                        "finding_id": finding_id,
                        "role": role,
                        "content": content,
                    },
                )
                row = cur.fetchone()
            conn.commit()
        return str(row["id"])

    def get_messages(self, finding_id: str) -> list[dict]:
        """Return messages for a finding, oldest first."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT id, role, content, created_at
                    FROM finding_chat_messages
                    WHERE finding_id = %(finding_id)s
                    ORDER BY created_at ASC
                    """,
                    {"finding_id": finding_id},
                )
                rows = cur.fetchall()
        return [
            {
                "id": str(r["id"]),
                "role": r["role"],
                "content": r["content"],
                "created_at": r["created_at"].isoformat() if r["created_at"] else "",
            }
            for r in rows
        ]

    def _row_to_record(self, row: dict) -> AnalysisRecord:
        """Convert a RealDictCursor row to an AnalysisRecord (no findings)."""
        # psycopg2 normally decodes JSONB itself; the str checks are defensive.
        actions = row.get("actions") or []
        if isinstance(actions, str):
            actions = json.loads(actions)
        highlight_terms = row.get("highlight_terms") or []
        if isinstance(highlight_terms, str):
            highlight_terms = json.loads(highlight_terms)
        return AnalysisRecord(
            id=str(row["id"]),
            created_at=row["created_at"] if isinstance(row["created_at"], datetime) else datetime.utcnow(),
            created_by=row.get("created_by"),
            doc_name=row.get("doc_name") or "",
            standard_name=row.get("standard_name") or "",
            risk_score=int(row.get("risk_score") or 0),
            conclusion=row.get("conclusion") or "",
            actions=actions,
            para_text=row.get("para_text") or "",
            highlight_terms=highlight_terms,
            findings=[],
        )
```
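Because the `id` columns are typed `UUID`, calling `get_analysis("abc")` does not simply return `None`: PostgreSQL rejects the comparison with an invalid-input error, which would surface as a 500 from the history endpoints. Routes could validate the path parameter first and answer 404 themselves. `parse_analysis_id` is a hypothetical helper, a sketch that relies only on the standard `uuid` module.

```python
import uuid
from typing import Optional


def parse_analysis_id(raw: object) -> Optional[str]:
    """Return the canonical (lower-case, hyphenated) UUID string, or None if invalid."""
    if not isinstance(raw, str):
        return None
    try:
        return str(uuid.UUID(raw))
    except ValueError:
        return None


print(parse_analysis_id("abc"))                                   # → None
print(parse_analysis_id("12345678-1234-5678-1234-56781234567A"))  # → 12345678-1234-5678-1234-56781234567a
```

In a route this might read `if parse_analysis_id(analysis_id) is None: raise HTTPException(status_code=404)`. Alternatively, annotating the FastAPI path parameter as `uuid.UUID` lets the framework validate it, which yields a 422 rather than a 404.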
- [ ] **Step 5: Run tests**
```
cd backend
python -m pytest tests/compliance/test_repository.py -v
```
Expected: all pass.

---
### Task 5: DOCX export
**Files:**
- Create: `backend/app/infrastructure/compliance/docx_export.py`
- Test: `backend/tests/compliance/test_repository.py`
- [ ] **Step 1: Write failing test**
Add to `backend/tests/compliance/test_repository.py`:
```python
from datetime import datetime
from app.domain.compliance.ports import AnalysisRecord, FindingRecord


def test_generate_docx_returns_bytes():
    import io
    import zipfile

    from app.infrastructure.compliance.docx_export import generate_docx
    record = AnalysisRecord(
        id="test-id", created_at=datetime(2026, 6, 8), created_by="user1",
        doc_name="test.pdf", standard_name="EU AI Act",
        risk_score=72, conclusion="Several gaps found.",
        actions=[{"label": "Fix", "value": "Update CSMS docs"}],
        para_text="The system shall implement CSMS.",
        highlight_terms=["CSMS"],
        findings=[
            FindingRecord(
                id="f1", analysis_id="test-id", seq=0,
                title="Missing CSMS", description="No CSMS cert.",
                status="risk", clause_ref="Art.9.1",
            )
        ],
    )
    data = generate_docx(record)
    assert isinstance(data, bytes)
    assert len(data) > 1000  # even a minimal python-docx document is several KB
    # A DOCX file is a ZIP container.
    assert zipfile.is_zipfile(io.BytesIO(data))
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_repository.py::test_generate_docx_returns_bytes -v
```
Expected: `ModuleNotFoundError: No module named 'app.infrastructure.compliance.docx_export'`
- [ ] **Step 3: Implement `docx_export.py`**
```python
# backend/app/infrastructure/compliance/docx_export.py
"""DOCX report generator for compliance analysis results.

Uses python-docx (already in requirements.txt). Returns raw bytes so the
caller can stream the response without writing to disk.
"""
from __future__ import annotations

from datetime import datetime, timezone
from io import BytesIO

from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Pt, RGBColor

from app.domain.compliance.ports import AnalysisRecord

_STATUS_LABEL = {"ok": "Compliant", "warn": "Warning", "risk": "Non-Compliant"}
_STATUS_COLOR = {
    "ok": RGBColor(0x22, 0x8B, 0x22),  # forest green
    "warn": RGBColor(0xFF, 0x8C, 0x00),  # dark orange
    "risk": RGBColor(0xDC, 0x14, 0x3C),  # crimson
}


def generate_docx(record: AnalysisRecord) -> bytes:
    """Generate a compliance report DOCX and return its raw bytes.

    Structure:
    - Cover: document name, standard, date, risk score
    - Executive summary (conclusion)
    - Findings table
    - Recommended actions
    - Footer note
    """
    doc = Document()

    # ── Cover ──────────────────────────────────────────────────────────────────
    title_para = doc.add_heading("Compliance Analysis Report", level=0)
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    doc.add_paragraph("")
    created = record.created_at
    if created is not None and created.tzinfo is not None:
        created = created.astimezone(timezone.utc)  # the label below says UTC
    meta_table = doc.add_table(rows=4, cols=2)
    meta_table.style = "Table Grid"
    labels = ["Document", "Standard", "Date", "Risk Score"]
    values = [
        record.doc_name,
        record.standard_name,
        created.strftime("%Y-%m-%d %H:%M UTC") if created else "",
        f"{record.risk_score} / 100",
    ]
    for i, (label, value) in enumerate(zip(labels, values)):
        meta_table.cell(i, 0).text = label
        meta_table.cell(i, 1).text = value

    # ── Executive Summary ──────────────────────────────────────────────────────
    doc.add_heading("Executive Summary", level=1)
    doc.add_paragraph(record.conclusion)

    # ── Findings ───────────────────────────────────────────────────────────────
    doc.add_heading("Findings", level=1)
    if record.findings:
        table = doc.add_table(rows=1, cols=4)
        table.style = "Table Grid"
        hdr = table.rows[0].cells
        for i, h in enumerate(["#", "Status", "Title", "Description / Clause"]):
            hdr[i].text = h
            for run in hdr[i].paragraphs[0].runs:
                run.bold = True
        for f in record.findings:
            row = table.add_row().cells
            row[0].text = str(f.seq + 1)
            row[1].text = _STATUS_LABEL.get(f.status, f.status)
            color = _STATUS_COLOR.get(f.status)
            if color is not None:
                for run in row[1].paragraphs[0].runs:
                    run.font.color.rgb = color
            row[2].text = f.title
            desc = f.description
            if f.clause_ref:
                desc += f"\n[{f.clause_ref}]"
            row[3].text = desc
    else:
        doc.add_paragraph("No findings recorded.")

    # ── Recommended Actions ────────────────────────────────────────────────────
    doc.add_heading("Recommended Actions", level=1)
    if record.actions:
        for action in record.actions:
            label = action.get("label", "Action")
            value = action.get("value", "")
            # The "List Number" style numbers items itself; a manual "1." prefix
            # would print the number twice.
            doc.add_paragraph(f"{label}: {value}", style="List Number")
    else:
        doc.add_paragraph("No actions recorded.")

    # ── Footer note ────────────────────────────────────────────────────────────
    doc.add_paragraph("")
    footer = doc.add_paragraph(
        f"Generated by AI Regulation Analysis System — {datetime.now(timezone.utc).strftime('%Y-%m-%d')}"
    )
    footer.alignment = WD_ALIGN_PARAGRAPH.CENTER
    for run in footer.runs:
        run.font.size = Pt(8)
        run.font.color.rgb = RGBColor(0x88, 0x88, 0x88)

    buf = BytesIO()
    doc.save(buf)
    return buf.getvalue()
```
- [ ] **Step 4: Run test**
```
cd backend
python -m pytest tests/compliance/test_repository.py::test_generate_docx_returns_bytes -v
```
Expected: PASS.
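
That test only proves that some bytes come back. A DOCX file is an OOXML package, which is a ZIP archive, so a slightly stronger check needs only the standard library: open the bytes as a ZIP and look for the core parts. This sketch assumes the report keeps python-docx's default package layout, with the main part at `word/document.xml`. `looks_like_docx` is a hypothetical helper and is not part of the plan's code. In the repository test it would be used as `assert looks_like_docx(generate_docx(record))`.

```python
import io
import zipfile


def looks_like_docx(data: bytes) -> bool:
    """Cheap structural check that `data` is an OOXML word-processing package.

    Assumes the default python-docx layout, where the main document part
    lives at word/document.xml.
    """
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            names = set(zf.namelist())
    except zipfile.BadZipFile:
        return False  # not a ZIP container at all
    # Every OPC package has [Content_Types].xml; Word documents add the main part.
    return {"[Content_Types].xml", "word/document.xml"} <= names
```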
---
### Task 6: Bootstrap factory + history API endpoints
**Files:**
- Modify: `backend/app/shared/bootstrap.py`
- Modify: `backend/app/api/routes/compliance.py`
- [ ] **Step 1: Add `get_compliance_repository()` to `bootstrap.py`**
At the top of `bootstrap.py`, add imports (after existing infrastructure imports):
```python
from app.domain.compliance.ports import ComplianceRepository
from app.infrastructure.compliance.repository import PostgresComplianceRepository
```
After the `get_event_store()` factory (around line 310), add:
```python
@lru_cache
def get_compliance_repository() -> ComplianceRepository:
"""Return the compliance analysis repository.
Requires document_repository_backend=postgres and valid postgres_* settings.
Raises NotImplementedError for any other backend value.
"""
if settings.document_repository_backend != "postgres":
raise NotImplementedError(
f"ComplianceRepository requires document_repository_backend=postgres, "
f"got '{settings.document_repository_backend}'. "
"Set DOCUMENT_REPOSITORY_BACKEND=postgres in your .env file."
)
return PostgresComplianceRepository(
host=settings.postgres_host,
port=settings.postgres_port,
user=settings.postgres_user,
password=settings.postgres_password,
dbname=settings.postgres_db,
)
```
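
`@lru_cache` makes the factory return one repository, and therefore one connection pool, per process. `functools.lru_cache` caches return values only and never caches raised exceptions. As a result, the `NotImplementedError` branch is re-evaluated on every call instead of being remembered. The self-contained illustration below uses a hypothetical stand-in factory and settings dict:

```python
from functools import lru_cache

calls = {"count": 0}
backend = {"value": "sqlite"}  # hypothetical stand-in for settings.document_repository_backend


@lru_cache
def get_repo() -> object:
    calls["count"] += 1
    if backend["value"] != "postgres":
        raise NotImplementedError("postgres backend required")
    return object()  # stands in for PostgresComplianceRepository(...)


for _ in range(2):
    try:
        get_repo()
    except NotImplementedError:
        pass
# Both failing calls executed the body: exceptions are not cached.
assert calls["count"] == 2

backend["value"] = "postgres"
first = get_repo()
second = get_repo()
# The successful result is cached, so the body ran only once more.
assert first is second and calls["count"] == 3
```

A practical consequence: tests that switch backends between cases should call `get_compliance_repository.cache_clear()` so that a repository cached by an earlier test is not reused.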
- [ ] **Step 2: Add auto-save hook in `analyze_stream`**
In `backend/app/api/routes/compliance.py`, after the `yield _sse({"type": "done", **conclusion_data})` line (line 177), add:
```python
# Auto-save analysis to database
try:
    from datetime import datetime, timezone
    from app.domain.compliance.ports import AnalysisRecord, FindingRecord
    from app.shared.bootstrap import get_compliance_repository
    repo = get_compliance_repository()
    finding_records = [
        FindingRecord(
            id="",  # placeholder; the repository assigns the real id
            analysis_id="",
            seq=i,
            title=f.get("title", ""),
            description=f.get("desc", ""),
            status=f.get("status", "ok"),
            clause_ref=f.get("clause_ref"),
        )
        for i, f in enumerate(findings)
    ]
    record = AnalysisRecord(
        id="",
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        created_at=datetime.now(timezone.utc),
        created_by=getattr(current_user, "username", None),
        doc_name=file_name or title or "Pasted text",
        standard_name=title or "",
        risk_score=conclusion_data.get("risk_score", 0),
        conclusion=conclusion_data.get("conclusion", ""),
        actions=conclusion_data.get("actions", []),
        para_text=conclusion_data.get("para_text", ""),
        highlight_terms=conclusion_data.get("highlight_terms", []),
        findings=finding_records,
    )
    # psycopg2 is blocking, so run the insert on a worker thread
    analysis_id = await asyncio.to_thread(repo.save_analysis, record)
    yield _sse({"type": "saved", "analysis_id": analysis_id})
except NotImplementedError:
    pass  # No postgres backend configured; skip saving
except Exception as exc:
    logger.warning("Failed to auto-save compliance analysis: {}", exc)
```
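
The save runs after the `done` event has already been yielded. The client therefore receives the conclusion first and gets the `saved` event only once the write finishes. While the write is in progress, `asyncio.to_thread` keeps the blocking database call off the event loop. Below is a minimal, self-contained sketch of that ordering. The hypothetical `fake_save` stands in for `repo.save_analysis`, and plain dicts stand in for `_sse` frames:

```python
import asyncio
import time


def fake_save(record: dict) -> str:
    """Blocking stand-in for repo.save_analysis (a psycopg2 round-trip)."""
    time.sleep(0.05)
    return "uuid-123"


async def fake_stream(record: dict):
    yield {"type": "done", "risk_score": record["risk_score"]}
    # The conclusion is already on the wire; persist without blocking the loop.
    try:
        analysis_id = await asyncio.to_thread(fake_save, record)
        yield {"type": "saved", "analysis_id": analysis_id}
    except Exception as exc:  # a failed save must not turn a finished analysis into an error
        print(f"auto-save failed: {exc}")


async def main() -> list[str]:
    return [event["type"] async for event in fake_stream({"risk_score": 72})]


print(asyncio.run(main()))  # ['done', 'saved']
```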
- [ ] **Step 3: Add history endpoints to `compliance.py`**
Add these routes after the existing `compliance_chat` route:
```python
@router.get("/history")
async def list_history(
limit: int = 20,
offset: int = 0,
current_user: UserClaims = Depends(get_current_user),
):
"""Return paginated list of saved compliance analyses (newest first)."""
from app.shared.bootstrap import get_compliance_repository
try:
repo = get_compliance_repository()
records = await asyncio.to_thread(repo.list_analyses, limit, offset)
return [
{
"id": r.id,
"created_at": r.created_at.isoformat(),
"created_by": r.created_by,
"doc_name": r.doc_name,
"standard_name": r.standard_name,
"risk_score": r.risk_score,
"finding_count": len(r.findings),
}
for r in records
]
except NotImplementedError:
return []
@router.get("/history/{analysis_id}")
async def get_history_item(
analysis_id: str,
current_user: UserClaims = Depends(get_current_user),
):
"""Return full analysis record including findings."""
from app.shared.bootstrap import get_compliance_repository
from fastapi import HTTPException
repo = get_compliance_repository()
record = await asyncio.to_thread(repo.get_analysis, analysis_id)
if not record:
raise HTTPException(status_code=404, detail="Analysis not found")
return {
"id": record.id,
"created_at": record.created_at.isoformat(),
"created_by": record.created_by,
"doc_name": record.doc_name,
"standard_name": record.standard_name,
"risk_score": record.risk_score,
"conclusion": record.conclusion,
"actions": record.actions,
"para_text": record.para_text,
"highlight_terms": record.highlight_terms,
"findings": [
{
"id": f.id,
"seq": f.seq,
"title": f.title,
"description": f.description,
"status": f.status,
"clause_ref": f.clause_ref,
}
for f in record.findings
],
}
@router.delete("/history/{analysis_id}", status_code=204)
async def delete_history_item(
analysis_id: str,
current_user: UserClaims = Depends(get_current_user),
):
"""Delete a saved analysis (cascade removes findings and chat messages)."""
from app.shared.bootstrap import get_compliance_repository
repo = get_compliance_repository()
await asyncio.to_thread(repo.delete_analysis, analysis_id)
@router.get("/history/{analysis_id}/download")
async def download_history_docx(
analysis_id: str,
current_user: UserClaims = Depends(get_current_user),
):
"""Return a DOCX compliance report for the given analysis."""
    import re
    from urllib.parse import quote
    from app.shared.bootstrap import get_compliance_repository
    from app.infrastructure.compliance.docx_export import generate_docx
    from fastapi import HTTPException
    from fastapi.responses import Response
    repo = get_compliance_repository()
    record = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not record:
        raise HTTPException(status_code=404, detail="Analysis not found")
    docx_bytes = await asyncio.to_thread(generate_docx, record)
    stamp = record.created_at.strftime("%Y%m%d")
    base_name = (record.doc_name or "report")[:50]
    # HTTP headers are latin-1 only: send an ASCII fallback filename plus an
    # RFC 5987 UTF-8 filename* so non-ASCII document names survive the download.
    ascii_name = re.sub(r"[^A-Za-z0-9._-]+", "_", base_name).strip("_") or "report"
    filename = f"compliance_{ascii_name}_{stamp}.docx"
    utf8_filename = quote(f"compliance_{base_name}_{stamp}.docx", safe="")
    return Response(
        content=docx_bytes,
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        headers={
            "Content-Disposition": (
                f"attachment; filename=\"{filename}\"; filename*=UTF-8''{utf8_filename}"
            )
        },
    )
```
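
`list_history` passes `limit` and `offset` straight through to `repo.list_analyses`. The repository SQL is not shown in this section. Assuming a `compliance_analyses` table with a `created_at` column (both names hypothetical here), newest-first paging comes down to `ORDER BY created_at DESC LIMIT ... OFFSET ...`. The sketch uses the stdlib `sqlite3` module as a stand-in for PostgreSQL. It also orders by `id` as a tiebreaker, so that the ordering stays deterministic when two analyses share a timestamp:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE compliance_analyses (id TEXT PRIMARY KEY, created_at TEXT)")
conn.executemany(
    "INSERT INTO compliance_analyses VALUES (?, ?)",
    [(f"a{i}", f"2026-06-0{i} 10:00:00") for i in range(1, 6)],
)


def list_page(limit: int, offset: int) -> list[str]:
    # Newest first; id breaks ties so the order is deterministic across requests.
    rows = conn.execute(
        "SELECT id FROM compliance_analyses "
        "ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?",
        (limit, offset),
    ).fetchall()
    return [row[0] for row in rows]


print(list_page(2, 0))  # ['a5', 'a4']
print(list_page(2, 2))  # ['a3', 'a2']
```

With psycopg2, the placeholders would be `%s` instead of `?`. An index on `created_at` keeps this query cheap as the history grows.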
- [ ] **Step 4: Smoke test endpoints**
Start the backend and run:
```bash
# List history (empty initially)
curl http://localhost:8000/api/v1/compliance/history \
-H "Authorization: Bearer <token>"
# Expected: []
# Trigger an analysis and verify the response includes a "saved" SSE event
curl -X POST http://localhost:8000/api/v1/compliance/analyze-stream \
-H "Authorization: Bearer <token>" \
-F "text=System shall implement CSMS per ISO 21434." \
  --no-buffer | grep '"type": *"saved"'
# Expected: data: {"type": "saved", "analysis_id": "<uuid>"}
```
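
`grep` only confirms that the event appears somewhere in the output. A scripted check that `saved` arrives after `done` can split the body on blank lines, in the same spirit as the frontend parser. It then decodes each `data: ` line as JSON, assuming each event carries a single `data:` line, as the `_sse` frames in this plan appear to. `parse_sse` is a hypothetical test helper, and the sample body is illustrative rather than captured output:

```python
import json


def parse_sse(body: str) -> list[dict]:
    """Return the JSON payload of every `data:` line, in stream order."""
    events = []
    for block in body.split("\n\n"):  # events are separated by a blank line
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events


sample = (
    'data: {"type": "done", "risk_score": 72}\n\n'
    'data: {"type": "saved", "analysis_id": "3f2b"}\n\n'
)
types = [e["type"] for e in parse_sse(sample)]
assert types.index("done") < types.index("saved")
```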
---
### Task 7: Frontend — PageStateContext update + HistoryRail component
**Files:**
- Modify: `frontend/src/contexts/PageStateContext.tsx`
- Create: `frontend/src/pages/Compliance/HistoryRail.tsx`
- Modify: `frontend/src/pages/Compliance/CompliancePage.tsx`
- [ ] **Step 1: Update `ComplianceState` in `PageStateContext.tsx`**
Find the `ComplianceState` interface (around line 78) and add two fields:
```typescript
export interface ComplianceState {
status: ComplianceStatus;
stageLabel: string;
stageKey: string;
meta: ComplianceMeta | null;
sources: ComplianceSourceEvent[];
findings: ComplianceFindingEvent[];
done: ComplianceDonePayload | null;
errorText: string;
// Direction B additions:
analysisId: string | null;
isReadOnly: boolean;
}
```
Update `COMPLIANCE_INIT` to include the new fields:
```typescript
const COMPLIANCE_INIT: ComplianceState = {
status: 'idle',
stageLabel: '',
stageKey: '',
meta: null,
sources: [],
findings: [],
done: null,
errorText: '',
analysisId: null,
isReadOnly: false,
};
```
- [ ] **Step 2: Create `HistoryRail.tsx`**
```tsx
// frontend/src/pages/Compliance/HistoryRail.tsx
import { useEffect, useState, useCallback } from 'react';
import { Download, Trash2 } from 'lucide-react';
const TOKEN_KEY = 'auth_token';
function authHeader(): Record<string, string> {
const t = localStorage.getItem(TOKEN_KEY);
return t ? { Authorization: `Bearer ${t}` } : {};
}
interface HistoryItem {
id: string;
created_at: string;
doc_name: string;
standard_name: string;
risk_score: number;
finding_count: number;
}
interface Props {
refreshTrigger: number; // increment to force re-fetch (after new analysis saved)
onSelect: (id: string) => void;
selectedId: string | null;
}
function riskClass(score: number): string {
if (score >= 70) return 'risk-high';
if (score >= 40) return 'risk-medium';
return 'risk-low';
}
export function HistoryRail({ refreshTrigger, onSelect, selectedId }: Props) {
const [items, setItems] = useState<HistoryItem[]>([]);
const [deletingId, setDeletingId] = useState<string | null>(null);
const fetchHistory = useCallback(() => {
fetch('/api/v1/compliance/history?limit=30', { headers: authHeader() })
.then(r => r.json())
.then(data => {
if (Array.isArray(data)) setItems(data);
})
.catch(() => {/* backend may not have postgres configured */});
}, []);
useEffect(() => { fetchHistory(); }, [fetchHistory, refreshTrigger]);
  function handleDownload(e: React.MouseEvent, item: HistoryItem) {
    e.stopPropagation();
    // A plain <a href> cannot send the Bearer token, so fetch the file and
    // hand the browser a temporary blob URL instead.
    fetch(`/api/v1/compliance/history/${item.id}/download`, { headers: authHeader() })
      .then(r => {
        if (!r.ok) throw new Error(`Download failed: HTTP ${r.status}`);
        return r.blob();
      })
      .then(blob => {
        const blobUrl = URL.createObjectURL(blob);
        const link = document.createElement('a');
        link.href = blobUrl;
        link.download = `compliance-${item.doc_name.slice(0, 30)}.docx`;
        link.click();
        // Revoke on the next tick; revoking synchronously can abort the download in some browsers
        setTimeout(() => URL.revokeObjectURL(blobUrl), 0);
      })
      .catch(err => console.warn(err));
  }
  function handleDelete(e: React.MouseEvent, item: HistoryItem) {
    e.stopPropagation();
    if (!window.confirm(`Delete analysis for "${item.doc_name}"?`)) return;
    setDeletingId(item.id);
    fetch(`/api/v1/compliance/history/${item.id}`, {
      method: 'DELETE',
      headers: authHeader(),
    })
      .then(r => {
        // Only drop the row once the server confirms the delete
        if (r.ok) setItems(prev => prev.filter(i => i.id !== item.id));
      })
      .catch(() => {})
      .finally(() => setDeletingId(null));
  }
  function formatDate(iso: string): string {
    const d = new Date(iso);
    // new Date() does not throw on bad input; it returns an Invalid Date
    if (Number.isNaN(d.getTime())) return iso.slice(0, 10);
    return d.toLocaleDateString(undefined, { month: 'short', day: 'numeric' });
  }
if (items.length === 0) {
return (
<div className="history-pane" style={{ minWidth: 200, maxWidth: 220 }}>
<div className="history-header">History</div>
<p style={{ padding: '12px 16px', fontSize: 12, color: 'var(--muted)', lineHeight: 1.5 }}>
Completed analyses appear here.
</p>
</div>
);
}
return (
<div className="history-pane" style={{ minWidth: 200, maxWidth: 220, overflowY: 'auto' }}>
<div className="history-header">History</div>
{items.map(item => (
<div
key={item.id}
className={`quick-item${selectedId === item.id ? ' active' : ''}`}
onClick={() => onSelect(item.id)}
style={{ cursor: 'pointer' }}
>
<div style={{ fontSize: 11, color: 'var(--muted)', marginBottom: 2 }}>
{formatDate(item.created_at)}
</div>
<div style={{ fontSize: 12, fontWeight: 500, marginBottom: 4, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
{item.doc_name || 'Untitled'}
</div>
<div style={{ display: 'flex', alignItems: 'center', gap: 6 }}>
<span className={`risk-badge ${riskClass(item.risk_score)}`} style={{ fontSize: 10 }}>
{item.risk_score}
</span>
<span style={{ fontSize: 10, color: 'var(--muted)', flex: 1 }}>
{item.finding_count} finding{item.finding_count !== 1 ? 's' : ''}
</span>
<button
className="btn icon-btn"
title="Download DOCX"
onClick={e => handleDownload(e, item)}
style={{ padding: '2px 4px' }}
>
<Download size={11} />
</button>
<button
className="btn icon-btn danger"
title="Delete"
disabled={deletingId === item.id}
onClick={e => handleDelete(e, item)}
style={{ padding: '2px 4px' }}
>
<Trash2 size={11} />
</button>
</div>
</div>
))}
</div>
);
}
```
- [ ] **Step 3: Integrate `HistoryRail` into `CompliancePage.tsx`**
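At the top of `CompliancePage.tsx`, add the import below. Also make sure `useState`, `useEffect`, and `useRef` are imported from `react`, since the snippets in this step use all three: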
```tsx
import { HistoryRail } from './HistoryRail';
```
Inside the `CompliancePage` component, add state:
```tsx
const [historyRefresh, setHistoryRefresh] = useState(0);
```
The analysis stream now emits a `saved` event after `done` (Task 6, Step 2). Handle it in the SSE parsing block of `useComplianceAnalysis.ts`, right after the `done` handler:
```typescript
} else if (j.type === 'saved') {
setState(s => ({ ...s, analysisId: j.analysis_id ?? null }));
}
```
Back in `CompliancePage.tsx`, watch for `state.analysisId` changes and trigger history re-fetch:
```tsx
const prevAnalysisIdRef = useRef<string | null>(null);
useEffect(() => {
if (state.analysisId && state.analysisId !== prevAnalysisIdRef.current) {
prevAnalysisIdRef.current = state.analysisId;
setHistoryRefresh(n => n + 1);
}
}, [state.analysisId]);
```
Add a `handleSelectHistory` function that loads a historical record:
```tsx
async function handleSelectHistory(id: string) {
const res = await fetch(`/api/v1/compliance/history/${id}`, { headers: authHeader() });
if (!res.ok) return;
const data = await res.json();
// Reconstruct state from historical record (read-only view)
setComplianceState({
status: 'done',
stageLabel: 'Complete',
stageKey: 'concluding',
meta: { title: data.doc_name, sourceType: 'doc', startedAt: data.created_at },
sources: [],
findings: (data.findings || []).map((f: any) => ({
title: f.title,
desc: f.description,
status: f.status,
clause_ref: f.clause_ref,
})),
done: {
conclusion: data.conclusion,
actions: data.actions,
risk_score: data.risk_score,
highlight_terms: data.highlight_terms,
para_text: data.para_text,
},
errorText: '',
analysisId: data.id,
isReadOnly: true,
});
}
```
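Note: `setComplianceState` is available from `usePageState()`; add it to the destructuring alongside `complianceState`. The snippet also calls `authHeader()`. If `CompliancePage.tsx` does not already define this helper, export it from `HistoryRail.tsx` (or move it to a shared module).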
In the JSX return, wrap the existing 3-column workspace with a flex row that includes `HistoryRail` on the left:
```tsx
<div style={{ display: 'flex', flex: 1, overflow: 'hidden' }}>
<HistoryRail
refreshTrigger={historyRefresh}
onSelect={handleSelectHistory}
selectedId={state.analysisId}
/>
<div style={{ flex: 1, overflow: 'hidden' }}>
{/* existing 3-column workspace JSX here */}
</div>
</div>
```
- [ ] **Step 4: TypeScript check**
```
cd frontend
npx tsc --noEmit
```
Expected: 0 errors.
---
## Direction C Tasks
### Task 8: `build_finding_context` + `generate_suggestions` in pipeline
**Files:**
- Modify: `backend/app/application/compliance/pipeline.py`
- Test: `backend/tests/compliance/test_pipeline.py`
- [ ] **Step 1: Write failing tests**
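Add to `backend/tests/compliance/test_pipeline.py`. These tests assume that the `_make_mock_client` helper and the `MagicMock` import already exist in this file from Direction A: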
```python
from app.domain.compliance.ports import AnalysisRecord, FindingRecord
from datetime import datetime
def _sample_analysis() -> AnalysisRecord:
return AnalysisRecord(
id="a1", created_at=datetime(2026, 6, 8), created_by="u",
doc_name="doc.pdf", standard_name="EU AI Act",
risk_score=72, conclusion="Gaps found.", actions=[], para_text="para",
highlight_terms=[], findings=[],
)
def _sample_finding(status: str = "risk") -> FindingRecord:
return FindingRecord(
id="f1", analysis_id="a1", seq=0,
title="Missing CSMS", description="No CSMS certification.",
status=status, clause_ref="Art.9.1",
)
def test_build_finding_context_contains_required_fields():
from app.application.compliance.pipeline import build_finding_context
ctx = build_finding_context(_sample_finding(), _sample_analysis())
assert "doc.pdf" in ctx
assert "EU AI Act" in ctx
assert "Missing CSMS" in ctx
assert "Art.9.1" in ctx
def test_generate_suggestions_returns_three_questions():
from app.application.compliance.pipeline import generate_suggestions
client = _make_mock_client(
'{"questions": ["Q1?", "Q2?", "Q3?"]}'
)
questions = generate_suggestions(_sample_finding("risk"), _sample_analysis(), client)
assert len(questions) == 3
assert all(isinstance(q, str) for q in questions)
def test_generate_suggestions_falls_back_on_error():
from app.application.compliance.pipeline import generate_suggestions
bad_client = MagicMock()
bad_resp = MagicMock()
bad_resp.is_success = False
bad_client.chat.return_value = bad_resp
questions = generate_suggestions(_sample_finding(), _sample_analysis(), bad_client)
assert len(questions) == 3 # fallback always returns 3
```
- [ ] **Step 2: Run to verify failure**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_build_finding_context_contains_required_fields -v
```
Expected: `ImportError: cannot import name 'build_finding_context'`
- [ ] **Step 3: Implement both functions in `pipeline.py`**
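Place the import with the other imports at the top of `pipeline.py` (a module-level import at the end of the file triggers lint error E402), then append the constants and functions at the end of the file: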
```python
from app.domain.compliance.ports import AnalysisRecord, FindingRecord
_SUGGESTION_FOCUS = {
"risk": "Focus on remediation steps, required certifications, and timeline to resolve.",
"warn": "Focus on identifying the specific compliance gap and how to close it.",
"ok": "Focus on maintaining compliance evidence and monitoring future changes.",
}
_SUGGESTION_FALLBACK = {
"risk": [
"What specific certifications or documents are required to remediate this finding?",
"What is the typical remediation timeline for this type of non-compliance?",
"Which regulation clause defines the exact requirement?",
],
"warn": [
"What is the exact gap between the current state and the requirement?",
"What evidence would demonstrate partial compliance?",
"Which regulation clause applies to this warning?",
],
"ok": [
"What documentation should be maintained to evidence this compliance?",
"How should this area be monitored as regulations evolve?",
"Are there related clauses that may affect this compliant area?",
],
}
def build_finding_context(finding: "FindingRecord", analysis: "AnalysisRecord") -> str:
"""Build a grounded system context string for a finding chat thread.
Combines finding details with analysis metadata so the LLM has full
context without relying on the frontend to pass segment_context.
"""
return (
f"Document: {analysis.doc_name}\n"
f"Standard: {analysis.standard_name}\n"
f"Finding [{finding.seq + 1}]: {finding.title}\n"
f"Status: {finding.status}\n"
f"Clause reference: {finding.clause_ref or 'N/A'}\n"
f"Description: {finding.description}\n"
f"Overall conclusion: {analysis.conclusion}\n"
)
def generate_suggestions(
finding: "FindingRecord",
analysis: "AnalysisRecord",
client: "BaseLLMClient",
) -> list[str]:
"""Generate 3 context-aware follow-up questions for a finding chat thread.
Returns exactly 3 question strings. Falls back to static templates on error.
"""
fallback = _SUGGESTION_FALLBACK.get(finding.status, _SUGGESTION_FALLBACK["warn"])
context = build_finding_context(finding, analysis)
focus = _SUGGESTION_FOCUS.get(finding.status, _SUGGESTION_FOCUS["warn"])
prompt = (
f"{context}\n\n"
f"Task: {focus}\n"
"Generate exactly 3 concise follow-up questions a compliance analyst would ask.\n"
'Return JSON: {"questions": ["question 1", "question 2", "question 3"]}\n'
"Return ONLY the JSON object."
)
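    try:
        response = client.chat([{"role": "user", "content": prompt}], max_tokens=300)
    except Exception as exc:  # network/provider failure: degrade to static questions
        logger.warning("generate_suggestions LLM call failed: {}", exc)
        return fallback
    if not response.is_success:
        return fallback
    try:
        result = _extract_json(response.content)
        questions = result.get("questions", [])
        if isinstance(questions, list) and len(questions) >= 3:
            return [str(q) for q in questions[:3]]
    except (ValueError, TypeError, AttributeError) as exc:
        # AttributeError covers a reply that parses to a list rather than an object
        logger.warning("generate_suggestions JSON parse failed: {}", exc)
    return fallback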
```
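
`generate_suggestions` depends on the existing `_extract_json` helper, whose body is not part of this section. LLMs often wrap JSON in Markdown fences or put a sentence in front of it. If that helper turns out to be strict, a lenient variant such as the hypothetical `parse_questions` below (an assumption, not the plan's code) limits the static fallback to output that is genuinely unusable:

```python
import json
import re
from typing import Optional


def parse_questions(raw: str, expected: int = 3) -> Optional[list[str]]:
    """Pull {"questions": [...]} out of an LLM reply; None if unusable."""
    # Drop ```json ... ``` fences if the model added them.
    text = re.sub(r"```(?:json)?", "", raw).strip()
    # Keep only the outermost {...} span, ignoring chatty prefixes/suffixes.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        data = json.loads(text[start : end + 1])
    except json.JSONDecodeError:
        return None
    questions = data.get("questions") if isinstance(data, dict) else None
    if not isinstance(questions, list) or len(questions) < expected:
        return None
    return [str(q) for q in questions[:expected]]


reply = 'Sure!\n```json\n{"questions": ["Q1?", "Q2?", "Q3?", "Q4?"]}\n```'
print(parse_questions(reply))  # ['Q1?', 'Q2?', 'Q3?']
```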
- [ ] **Step 4: Run all pipeline tests**
```
cd backend
python -m pytest tests/compliance/test_pipeline.py -v
```
Expected: all pass.
---
### Task 9: Finding chat API endpoints
**Files:**
- Modify: `backend/app/api/routes/compliance.py`
- [ ] **Step 1: Add three finding-chat endpoints**
Add these routes after the history download endpoint:
```python
@router.get("/analyses/{analysis_id}/findings/{finding_id}/chat")
async def get_finding_chat_history(
analysis_id: str,
finding_id: str,
current_user: UserClaims = Depends(get_current_user),
):
"""Return persisted chat messages for a finding thread, oldest first."""
from app.shared.bootstrap import get_compliance_repository
try:
repo = get_compliance_repository()
messages = await asyncio.to_thread(repo.get_messages, finding_id)
return messages
except NotImplementedError:
return []
@router.post("/analyses/{analysis_id}/findings/{finding_id}/suggestions")
async def get_finding_suggestions(
analysis_id: str,
finding_id: str,
current_user: UserClaims = Depends(get_current_user),
):
"""Generate 3 LLM-powered follow-up question suggestions for a finding."""
    from app.application.compliance.pipeline import generate_suggestions
from app.shared.bootstrap import get_compliance_repository
from app.services.llm.llm_factory import get_llm_client
from fastapi import HTTPException
repo = get_compliance_repository()
analysis = await asyncio.to_thread(repo.get_analysis, analysis_id)
if not analysis:
raise HTTPException(status_code=404, detail="Analysis not found")
finding = next((f for f in analysis.findings if f.id == finding_id), None)
if not finding:
raise HTTPException(status_code=404, detail="Finding not found")
client = get_llm_client(provider=settings.llm_provider, model=settings.llm_model)
questions = await asyncio.to_thread(generate_suggestions, finding, analysis, client)
return {"questions": questions}
@router.post("/analyses/{analysis_id}/findings/{finding_id}/chat")
async def finding_chat(
analysis_id: str,
finding_id: str,
request: ComplianceChatRequest,
current_user: UserClaims = Depends(get_current_user),
):
"""Stream a grounded chat response for a specific finding.
Loads the finding and analysis from DB to build grounded context.
Persists both user message and assistant response to finding_chat_messages.
"""
    from app.application.compliance.pipeline import build_finding_context
    from app.shared.bootstrap import get_compliance_repository
    from fastapi import HTTPException
    repo = get_compliance_repository()
    analysis = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not analysis:
        raise HTTPException(status_code=404, detail="Analysis not found")
    finding = next((f for f in analysis.findings if f.id == finding_id), None)
    if not finding:
        raise HTTPException(status_code=404, detail="Finding not found")
    # Persist user message
    await asyncio.to_thread(
        repo.save_message, analysis_id, finding_id, "user", request.query
    )
    # Up to 9 earlier messages; the last entry is the user message saved above
    history = await asyncio.to_thread(repo.get_messages, finding_id)
    prior_turns = "\n".join(f"{m['role']}: {m['content']}" for m in history[-10:-1])
    # stream_chat takes a single query string, so earlier turns are folded into it
    system_context = build_finding_context(finding, analysis)
    full_query = f"[Compliance Finding Context]\n{system_context}\n\n"
    if prior_turns:
        full_query += f"[Previous conversation]\n{prior_turns}\n\n"
    full_query += f"User question: {request.query}"
    async def generate() -> AsyncGenerator[str, None]:
        assistant_buffer: list[str] = []
        try:
            _, event_stream = get_agent_conversation_service().stream_chat(
                query=full_query,
                top_k=5,
                prompt_template="compliance_qa",
            )
            # stream_chat yields synchronously; fetch each event on a worker
            # thread so slow LLM chunks do not block the event loop
            events = iter(event_stream)
            end = object()
            while (event := await asyncio.to_thread(next, events, end)) is not end:
                event_type = event.get("event", "")
                if event_type == "content":
                    text = event.get("data", "")
                    if text:
                        assistant_buffer.append(text)
                        yield _sse({"type": "chunk", "text": text})
                elif event_type == "done":
                    yield _sse({"type": "done"})
        except Exception as exc:
            logger.exception("finding_chat stream error")
            yield _sse({"type": "error", "text": str(exc)})
        finally:
            # Persist whatever the assistant produced once the stream ends
            full_response = "".join(assistant_buffer)
            if full_response:
                try:
                    await asyncio.to_thread(
                        repo.save_message, analysis_id, finding_id, "assistant", full_response
                    )
                except Exception as exc:
                    logger.warning("Failed to persist assistant message: {}", exc)
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
    )
```
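
In this endpoint, `stream_chat` returns a synchronous iterator (it is consumed with a plain `for`), and each `next()` may block while the LLM produces the next chunk. Inside an `async def` generator, that blocking stalls every other request on the event loop. The general fix pulls one item per worker-thread call. The sketch below shows it as a hypothetical reusable helper, `iterate_in_thread`, with a ticker task to demonstrate that the loop stays responsive:

```python
import asyncio
import contextlib
import time
from collections.abc import AsyncIterator, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel returned by next() once the iterator is exhausted


async def iterate_in_thread(items: Iterable[T]) -> AsyncIterator[T]:
    """Yield items from a blocking iterator without blocking the event loop."""
    it: Iterator[T] = iter(items)
    while True:
        # Each next() call runs on a worker thread; the loop stays free meanwhile.
        item = await asyncio.to_thread(next, it, _DONE)
        if item is _DONE:
            return
        yield item


def slow_events():
    """Synchronous generator standing in for stream_chat's event iterator."""
    for text in ("Art. 9 ", "requires ", "a CSMS."):
        time.sleep(0.02)  # simulate waiting on the LLM
        yield {"event": "content", "data": text}


async def main() -> tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:  # keeps running only if the event loop is not blocked
            ticks += 1
            await asyncio.sleep(0.005)

    tick_task = asyncio.create_task(ticker())
    chunks = [e["data"] async for e in iterate_in_thread(slow_events())]
    tick_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await tick_task
    return "".join(chunks), ticks


text, ticks = asyncio.run(main())
print(text)  # Art. 9 requires a CSMS.
```

The calls to `next()` may land on different worker threads. That is safe here because they never overlap: each call is awaited before the next one starts.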
- [ ] **Step 2: Smoke test**
```bash
# Get suggestions for a finding (replace IDs with real values from a previous analysis)
curl -X POST \
"http://localhost:8000/api/v1/compliance/analyses/<analysis_id>/findings/<finding_id>/suggestions" \
-H "Authorization: Bearer <token>"
# Expected: {"questions": ["...", "...", "..."]}
# Get chat history (empty initially)
curl "http://localhost:8000/api/v1/compliance/analyses/<analysis_id>/findings/<finding_id>/chat" \
-H "Authorization: Bearer <token>"
# Expected: []
```
---
### Task 10: FindingChatDrawer component
**Files:**
- Create: `frontend/src/pages/Compliance/FindingChatDrawer.tsx`
- Modify: `frontend/src/pages/Compliance/CompliancePage.tsx`
- Modify: `frontend/src/contexts/PageStateContext.tsx`
- [ ] **Step 1: Add `activeFindingId` to `PageStateContext.tsx`**
In the `ComplianceState` interface (already updated in Task 7), add:
```typescript
activeFindingId: string | null; // finding UUID from DB (not index)
```
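Add the same field to `COMPLIANCE_INIT`. Also add it to the state object that `handleSelectHistory` (Task 7, Step 3) passes to `setComplianceState`; otherwise `npx tsc --noEmit` can report a missing property: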
```typescript
activeFindingId: null,
```
- [ ] **Step 2: Create `FindingChatDrawer.tsx`**
```tsx
// frontend/src/pages/Compliance/FindingChatDrawer.tsx
import { useEffect, useRef, useState } from 'react';
import { X, Send } from 'lucide-react';
const TOKEN_KEY = 'auth_token';
function authHeader(): Record<string, string> {
const t = localStorage.getItem(TOKEN_KEY);
return t ? { Authorization: `Bearer ${t}` } : {};
}
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
}
interface Props {
analysisId: string;
findingId: string;
finding: { title: string; desc: string; status: string; clause_ref?: string };
onClose: () => void;
}
export function FindingChatDrawer({ analysisId, findingId, finding, onClose }: Props) {
const [messages, setMessages] = useState<Message[]>([]);
const [suggestions, setSuggestions] = useState<string[]>([]);
const [input, setInput] = useState('');
const [loading, setLoading] = useState(false);
const [loadingHistory, setLoadingHistory] = useState(true);
const abortRef = useRef<AbortController | null>(null);
const bottomRef = useRef<HTMLDivElement>(null);
// Load history + suggestions on open
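  useEffect(() => {
    // Reset per-finding state so a reused drawer never shows another finding's thread
    setLoadingHistory(true);
    setMessages([]);
    setSuggestions([]);
    // Fetch history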
fetch(`/api/v1/compliance/analyses/${analysisId}/findings/${findingId}/chat`, {
headers: authHeader(),
})
.then(r => r.json())
.then((data: Message[]) => {
setMessages(data.map(m => ({ id: m.id, role: m.role, content: m.content })));
setLoadingHistory(false);
// Only fetch suggestions if no history
if (!data.length) {
fetch(
`/api/v1/compliance/analyses/${analysisId}/findings/${findingId}/suggestions`,
{ method: 'POST', headers: authHeader() }
)
.then(r => r.json())
.then(d => { if (Array.isArray(d?.questions)) setSuggestions(d.questions); })
.catch(() => {});
}
})
.catch(() => setLoadingHistory(false));
return () => { abortRef.current?.abort(); };
}, [analysisId, findingId]);
// Auto-scroll to bottom
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages]);
async function send(text?: string) {
const q = (text ?? input).trim();
if (!q || loading) return;
setInput('');
setSuggestions([]); // hide chips after first message
const assistantId = `ast-${Date.now()}`;
setMessages(prev => [
...prev,
{ id: `usr-${Date.now()}`, role: 'user', content: q },
{ id: assistantId, role: 'assistant', content: '' },
]);
setLoading(true);
const ctrl = new AbortController();
abortRef.current = ctrl;
try {
const res = await fetch(
`/api/v1/compliance/analyses/${analysisId}/findings/${findingId}/chat`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json', ...authHeader() },
body: JSON.stringify({ query: q }),
signal: ctrl.signal,
}
);
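      if (!res.ok || !res.body) {
        // e.g. 404 for an unknown finding; `finally` below clears the loading flag
        setMessages(prev =>
          prev.map(m => m.id === assistantId ? { ...m, content: `Request failed (HTTP ${res.status}).` } : m)
        );
        return;
      }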
const reader = res.body.getReader();
const dec = new TextDecoder();
let buf = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += dec.decode(value, { stream: true });
const blocks = buf.split('\n\n');
buf = blocks.pop() ?? '';
for (const block of blocks) {
const dl = block.split('\n').find(l => l.startsWith('data: '));
if (!dl) continue;
try {
const j = JSON.parse(dl.slice(6));
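            if (j.type === 'chunk' && j.text) {
              setMessages(prev =>
                prev.map(m => m.id === assistantId ? { ...m, content: m.content + j.text } : m)
              );
            } else if (j.type === 'error') {
              // The backend reports stream failures as {"type": "error", "text": ...}
              setMessages(prev =>
                prev.map(m => m.id === assistantId ? { ...m, content: `Error: ${j.text ?? 'unknown error'}` } : m)
              );
            }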
} catch { /* skip */ }
}
}
} catch (e: unknown) {
if (e instanceof Error && e.name !== 'AbortError') {
setMessages(prev =>
prev.map(m => m.id === assistantId ? { ...m, content: 'Error reaching server.' } : m)
);
}
} finally {
setLoading(false);
}
}
const STATUS_COLOR: Record<string, string> = {
risk: 'var(--danger, #dc143c)',
warn: 'var(--warning, #ff8c00)',
ok: 'var(--success, #228b22)',
};
return (
<div
style={{
position: 'fixed', right: 0, top: 0, bottom: 0, width: 420,
background: 'var(--surface)', borderLeft: '1px solid var(--border)',
display: 'flex', flexDirection: 'column', zIndex: 200,
boxShadow: '-4px 0 16px rgba(0,0,0,0.12)',
}}
>
{/* Header */}
<div style={{
padding: '14px 16px', borderBottom: '1px solid var(--border)',
display: 'flex', alignItems: 'flex-start', gap: 10,
}}>
<div style={{ flex: 1, minWidth: 0 }}>
<div style={{ fontSize: 11, color: STATUS_COLOR[finding.status] || 'var(--muted)', fontWeight: 600, marginBottom: 2 }}>
{finding.status.toUpperCase()}
{finding.clause_ref && <span style={{ fontWeight: 400, marginLeft: 6, color: 'var(--muted)' }}>{finding.clause_ref}</span>}
</div>
<div style={{ fontSize: 13, fontWeight: 600, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
{finding.title}
</div>
<div style={{ fontSize: 11, color: 'var(--muted)', marginTop: 2, lineHeight: 1.4 }}>
{finding.desc.length > 100 ? finding.desc.slice(0, 100) + '…' : finding.desc}
</div>
</div>
<button className="btn icon-btn" onClick={onClose} style={{ flexShrink: 0 }}>
<X size={14} />
</button>
</div>
{/* Suggestion chips — shown only before first user message */}
{suggestions.length > 0 && (
<div style={{ padding: '10px 16px', borderBottom: '1px solid var(--border)' }}>
<div style={{ fontSize: 11, color: 'var(--muted)', marginBottom: 6 }}>Suggested questions</div>
<div style={{ display: 'flex', flexDirection: 'column', gap: 6 }}>
{suggestions.map((q, i) => (
<button
key={i}
className="chip"
style={{ textAlign: 'left', whiteSpace: 'normal', height: 'auto', padding: '6px 10px' }}
onClick={() => send(q)}
>
{q}
</button>
))}
</div>
</div>
)}
{/* Messages */}
<div style={{ flex: 1, overflowY: 'auto', padding: '12px 16px', display: 'flex', flexDirection: 'column', gap: 10 }}>
{loadingHistory && (
<p style={{ fontSize: 12, color: 'var(--muted)', textAlign: 'center' }}>Loading history…</p>
)}
{messages.map(msg => (
<div key={msg.id} className={`message msg-${msg.role}`} style={{ maxWidth: '100%' }}>
{msg.role === 'assistant' && <div className="msg-avatar">AI</div>}
<div className="msg-bubble" style={{ fontSize: 13, whiteSpace: 'pre-wrap' }}>
{msg.content || (loading ? '…' : '')}
</div>
{msg.role === 'user' && <div className="msg-avatar user-av">You</div>}
</div>
))}
<div ref={bottomRef} />
</div>
{/* Composer */}
<div style={{ padding: '10px 16px', borderTop: '1px solid var(--border)', display: 'flex', gap: 8 }}>
<textarea
className="composer-input"
placeholder="Ask about this finding…"
value={input}
rows={2}
style={{ flex: 1, fontSize: 13 }}
onChange={e => setInput(e.target.value)}
onKeyDown={e => { if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); send(); } }}
/>
<button
className="btn primary"
disabled={!input.trim() || loading}
onClick={() => send()}
style={{ alignSelf: 'flex-end' }}
>
<Send size={14} />
</button>
</div>
</div>
);
}
```
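
The drawer's read loop keeps the text after the last blank line in `buf`, because a network chunk can end in the middle of an event. The same buffering logic is ported to Python below purely for illustration, for example to replay the chat endpoint's output in a backend test (`SSEBuffer` is a hypothetical name). It shows why the carry-over matters:

```python
import json


class SSEBuffer:
    """Incrementally split a text/event-stream into JSON payloads."""

    def __init__(self) -> None:
        self._buf = ""

    def feed(self, chunk: str) -> list[dict]:
        self._buf += chunk
        blocks = self._buf.split("\n\n")
        # The last piece may be an incomplete event; keep it for the next chunk.
        self._buf = blocks.pop()
        events = []
        for block in blocks:
            line = next((ln for ln in block.split("\n") if ln.startswith("data: ")), None)
            if line is None:
                continue
            try:
                events.append(json.loads(line[len("data: "):]))
            except json.JSONDecodeError:
                continue  # mirror the drawer: skip malformed frames
        return events


parser = SSEBuffer()
first = parser.feed('data: {"type": "chunk", "text": "Art. 9 "}\n\ndata: {"type": "ch')
second = parser.feed('unk", "text": "applies."}\n\n')
print([e["text"] for e in first + second])  # ['Art. 9 ', 'applies.']
```

Without the `pop()`, the second event would be parsed from `data: {"type": "ch` alone, fail to decode, and be silently dropped.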
- [ ] **Step 3: Wire `FindingChatDrawer` into `CompliancePage.tsx`**
Add import:
```tsx
import { FindingChatDrawer } from './FindingChatDrawer';
```
In `CompliancePage.tsx`, find the findings list map (around lines 380-420) and locate the Chat button that currently calls `chat.openFor(i, f)`. Replace that button, together with the inline chat panel driven by `useFindingChat()`, with a button that records only which finding index to open. The drawer added below resolves that index to the finding's database UUID using `state.analysisId`:
```tsx
// Add at the top of CompliancePage component body:
const [drawerFindingIdx, setDrawerFindingIdx] = useState<number | null>(null);
// Inside the findings list map, find the existing chat button and replace with:
<button
className="btn sm"
onClick={() => setDrawerFindingIdx(i)}
style={{ marginTop: 6 }}
>
💬 Chat
</button>
```
At the bottom of the `CompliancePage` JSX return (just before the outermost closing `</div>`), add the drawer:
```tsx
{/* The wrapper resolves the finding's DB UUID from the history endpoint before mounting the drawer */}
{drawerFindingIdx !== null && state.analysisId && (
  <_FindingChatDrawerWrapper
    analysisId={state.analysisId}
    findingIndex={drawerFindingIdx}
    finding={state.findings[drawerFindingIdx]}
    onClose={() => setDrawerFindingIdx(null)}
  />
)}
```
Finding UUIDs exist only in the saved database record, so add a helper wrapper that fetches the record and resolves the UUID before it renders the drawer:
```tsx
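// Requires `useState`/`useEffect` from 'react', plus the page's `authHeader` helper
// and `ComplianceFindingEvent` type.
function _FindingChatDrawerWrapper({
  analysisId, findingIndex, finding, onClose,
}: {
  analysisId: string;
  findingIndex: number;
  finding: ComplianceFindingEvent;
  onClose: () => void;
}) {
  const [findingId, setFindingId] = useState<string | null>(null);
  useEffect(() => {
    let cancelled = false; // ignore responses that arrive after unmount or a new selection
    setFindingId(null);    // don't keep showing the previous finding's thread while re-fetching
    fetch(`/api/v1/compliance/history/${analysisId}`, { headers: authHeader() })
      .then(r => {
        if (!r.ok) throw new Error(`history fetch failed: ${r.status}`);
        return r.json();
      })
      .then(data => {
        // `seq` is assumed to be the finding's zero-based position in state.findings
        const match = (data.findings || []).find((rec: any) => rec.seq === findingIndex);
        if (!cancelled && match?.id) setFindingId(match.id);
      })
      .catch(err => console.error(err));
    return () => { cancelled = true; };
  }, [analysisId, findingIndex]);
  // Render nothing until the UUID is known
  if (!findingId) return null;
  return (
    <FindingChatDrawer
      analysisId={analysisId}
      findingId={findingId}
      finding={finding}
      onClose={onClose}
    />
  );
}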
```
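
The UUID lookup inside the wrapper can also be isolated for unit tests. This is a hypothetical helper (`resolveFindingId`, `HistoryRecord` and `HistoryFinding` are illustrative names). It assumes the history endpoint returns `{ findings: [{ id, seq, ... }] }` and, like the wrapper, that `seq` equals the zero-based index into `state.findings`.

```typescript
// Assumed shape of one finding in GET /api/v1/compliance/history/{id}
interface HistoryFinding {
  id?: string;
  seq?: number;
}

// Assumed shape of the analysis record; only `findings` is used here
interface HistoryRecord {
  findings?: HistoryFinding[];
}

/** Returns the DB UUID of the finding at `findingIndex`, or null if the record lacks it. */
function resolveFindingId(record: HistoryRecord, findingIndex: number): string | null {
  const match = (record.findings ?? []).find(f => f.seq === findingIndex);
  // Treat a missing or empty id as "not found", matching the wrapper's truthiness check
  return match?.id || null;
}
```

Keeping the lookup pure means a change in how the backend numbers `seq` shows up as a failing unit test rather than as a drawer that silently never opens.
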
- [ ] **Step 4: TypeScript check**
```
cd frontend
npx tsc --noEmit
```
Expected: 0 errors.
- [ ] **Step 5: End-to-end smoke test**
1. Start the backend and the frontend
2. Run a compliance analysis on text that produces at least 2 findings
3. Verify that the `saved` SSE event arrives and the History Rail shows the new record
4. Click the "💬 Chat" button on any finding
5. Verify the Drawer opens with 3 suggestion chips
6. Click one suggestion and verify that the response streams in
7. Close and reopen the Drawer, then verify the conversation history is restored
8. Click `[↓]` in the History Rail and verify a DOCX file downloads
9. Click `[×]` in the History Rail, accept the confirmation dialog, and verify the record disappears
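
Step 3 can also be checked without the UI by scanning the raw SSE text for the `saved` event. The helper below is a hypothetical sketch (`findSavedAnalysisId` is an illustrative name). It assumes each event carries its JSON payload on `data:` lines, as in the `{"type":"saved","analysis_id":"<uuid>"}` event from the checklist, and that events are separated by a blank line, per the SSE format.

```typescript
// Scans raw SSE text and returns the analysis_id of the first `saved` event, or null.
function findSavedAnalysisId(sseText: string): string | null {
  // Normalise CRLF so the blank-line separator check works for either line ending
  const blocks = sseText.replace(/\r\n/g, '\n').split('\n\n');
  for (const block of blocks) {
    // Per the SSE format, several data: lines in one event are joined with "\n",
    // and one leading space after the colon is dropped
    const data = block
      .split('\n')
      .filter(line => line.startsWith('data:'))
      .map(line => line.slice(5).replace(/^ /, ''))
      .join('\n');
    if (!data) continue; // comment-only or empty block
    try {
      const evt = JSON.parse(data) as { type?: string; analysis_id?: string };
      if (evt.type === 'saved' && typeof evt.analysis_id === 'string') return evt.analysis_id;
    } catch {
      // Skip data payloads that are not JSON (or are JSON null)
    }
  }
  return null;
}
```

It can be fed the body captured from the analysis stream, for example with `curl -N`, to confirm an `analysis_id` was emitted before looking for it in the History Rail.
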
---
## Verification Checklist
- [ ] Direction A: SSE stream emits one `stage:analyzing` event (not per-clause), all findings still appear, `highlight_terms` in `done` event is non-empty
- [ ] Direction A: `PassThroughReranker` importable from `app.infrastructure.vectorstore.pass_through_reranker`
- [ ] Direction B: Running analysis emits `{"type":"saved","analysis_id":"<uuid>"}` SSE event
- [ ] Direction B: `GET /api/v1/compliance/history` returns list with the saved record
- [ ] Direction B: `GET /api/v1/compliance/history/<id>/download` returns a valid DOCX file
- [ ] Direction B: `DELETE /api/v1/compliance/history/<id>` removes the record
- [ ] Direction B: History Rail renders in the UI and updates after analysis completes
- [ ] Direction C: Suggestions endpoint returns 3 questions for a valid finding
- [ ] Direction C: Chat endpoint streams a grounded response referencing finding context
- [ ] Direction C: Reopening the Drawer shows previous conversation
- [ ] All backend tests pass (run from `backend/`): `python -m pytest tests/compliance/ -v`
- [ ] TypeScript check passes: `npx tsc --noEmit` (0 errors)
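
For the DOCX download item, a quick sanity check is possible because a `.docx` file is an OOXML package, that is, a ZIP archive beginning with the local-file-header signature `PK\x03\x04`. The helper below is a hypothetical sketch (`looksLikeDocx` is an illustrative name), and it additionally assumes the endpoint responds with the standard DOCX media type. It cannot prove the file opens in Word; it only rules out responses such as a JSON error body.

```typescript
// Standard media type for .docx files
const DOCX_MIME = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document';

function looksLikeDocx(contentType: string | null, bytes: Uint8Array): boolean {
  // ZIP local-file-header signature: "P" "K" 0x03 0x04
  const zipSig = [0x50, 0x4b, 0x03, 0x04];
  const hasSig = bytes.length >= 4 && zipSig.every((b, i) => bytes[i] === b);
  // Drop parameters such as "; charset=..." before comparing the media type
  const mime = (contentType ?? '').replace(/;.*$/, '').trim().toLowerCase();
  return hasSig && mime === DOCX_MIME;
}
```

With `fetch`, the inputs would be `res.headers.get('content-type')` and `new Uint8Array(await res.arrayBuffer())`.
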