AIRegulation-DocAnalysis/docs/superpowers/plans/2026-06-08-compliance-enhancement.md


Compliance Analysis Enhancement Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Enhance the Compliance Analysis module with parallel clause processing + bug fixes (A), persistent analysis history + DOCX export (B), and per-finding persistent chat threads with LLM-generated suggestions (C).

Architecture: Direction A refactors the SSE pipeline in app/api/routes/compliance.py to run clauses in parallel via asyncio.gather, fixes the highlight_terms prompt bug, and adds retry to LLM calls. Direction B adds a PostgreSQL-backed ComplianceRepository (new domain port + infrastructure), three REST endpoints, and a frontend History Rail. Direction C adds per-finding chat endpoints grounded in real analysis data and a React Drawer component. Implementation order: A → B → C (each direction is a prerequisite for the next).

Tech Stack: FastAPI (SSE), psycopg2 (ThreadedConnectionPool), python-docx, tenacity (retry), React 18, TypeScript, existing PageStateContext pattern.


File Map

Direction A — Analysis Quality

| File | Action |
|---|---|
| `backend/app/application/compliance/pipeline.py` | Add `process_single_clause()`, `run_clauses_parallel()` and a `_call_llm_with_retry()` wrapper; add tenacity retry to `synthesize_conclusion` and `check_clause_compliance`; fix the `highlight_terms` prompt |
| `backend/app/api/routes/compliance.py` | Replace the sequential for-loop with `run_clauses_parallel()` (`asyncio.gather` over `process_single_clause`) |
| `backend/app/infrastructure/vectorstore/pass_through_reranker.py` | New: `PassThroughReranker` stub |
| `backend/tests/compliance/test_pipeline.py` | New: unit tests for parallel processing + `highlight_terms` fix |

Direction B — History & Reports

| File | Action |
|---|---|
| `backend/app/domain/compliance/__init__.py` | New: empty |
| `backend/app/domain/compliance/ports.py` | New: `FindingRecord`, `AnalysisRecord`, `ComplianceRepository` ABC |
| `backend/app/infrastructure/compliance/__init__.py` | New: empty |
| `backend/app/infrastructure/compliance/repository.py` | New: `PostgresComplianceRepository` |
| `backend/app/infrastructure/compliance/docx_export.py` | New: `generate_docx()` |
| `backend/app/api/routes/compliance.py` | Add history endpoints + auto-save hook in `analyze_stream` |
| `backend/app/shared/bootstrap.py` | Add `get_compliance_repository()` factory |
| `frontend/src/pages/Compliance/HistoryRail.tsx` | New: History Rail component |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Import + render `HistoryRail`, handle `saved` SSE event, store `analysisId` |
| `frontend/src/contexts/PageStateContext.tsx` | Add `analysisId`, `isReadOnly` to `ComplianceState` |
| `backend/tests/compliance/test_repository.py` | New: unit tests for repository (with mock psycopg2) |

Direction C — Deep Chat

| File | Action |
|---|---|
| `backend/app/application/compliance/pipeline.py` | Add `build_finding_context()`, `generate_suggestions()` |
| `backend/app/api/routes/compliance.py` | Add 3 finding-chat endpoints, deprecate old chat endpoint |
| `frontend/src/pages/Compliance/FindingChatDrawer.tsx` | New: per-finding chat drawer |
| `frontend/src/pages/Compliance/CompliancePage.tsx` | Add Chat button to finding cards, render `FindingChatDrawer` |
| `frontend/src/contexts/PageStateContext.tsx` | Add `activeFindingId` to `ComplianceState` |
| `backend/tests/compliance/test_pipeline.py` | Extend: test `build_finding_context` + `generate_suggestions` |

Direction A Tasks

Task 1: PassThroughReranker stub

Files:

  • Create: backend/app/infrastructure/vectorstore/pass_through_reranker.py

  • Test: backend/tests/compliance/test_pipeline.py

  • Step 1: Create test file and write failing test

```python
# backend/tests/compliance/test_pipeline.py
import pytest
from app.infrastructure.vectorstore.pass_through_reranker import PassThroughReranker
from app.domain.retrieval.models import RetrievedChunk


def _make_chunk(score: float) -> RetrievedChunk:
    return RetrievedChunk(
        doc_id="d1",
        doc_title="Test Doc",
        section_title="S1",
        text="some text",
        score=score,
        page_start=1,
    )


def test_pass_through_returns_top_k():
    reranker = PassThroughReranker()
    chunks = [_make_chunk(0.9), _make_chunk(0.8), _make_chunk(0.7)]
    result = reranker.rerank(query="test", chunks=chunks, top_k=2)
    assert len(result) == 2
    assert result[0].score == 0.9


def test_pass_through_returns_all_when_top_k_exceeds():
    reranker = PassThroughReranker()
    chunks = [_make_chunk(0.5)]
    result = reranker.rerank(query="test", chunks=chunks, top_k=10)
    assert len(result) == 1
```
  • Step 2: Run test to verify it fails
```bash
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_pass_through_returns_top_k -v
```

Expected: ModuleNotFoundError or ImportError (pass_through_reranker does not exist yet).

  • Step 3: Create tests/compliance/__init__.py
```python
# backend/tests/compliance/__init__.py  (empty file)
```
  • Step 4: Create the reranker
```python
# backend/app/infrastructure/vectorstore/pass_through_reranker.py
"""No-op reranker stub.

Returns the original candidate list sliced to top_k.
Replace with CrossEncoderReranker when a local cross-encoder model is available.
"""
from __future__ import annotations

from app.domain.retrieval.models import RetrievedChunk
from app.domain.retrieval.ports import Reranker


class PassThroughReranker(Reranker):
    """Pass-through reranker that preserves original retrieval order.

    Acts as a placeholder for future cross-encoder reranking (e.g. ms-marco-MiniLM).
    Wire via bootstrap.get_compliance_reranker() when ready to swap.
    """

    def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
        """Return the first top_k chunks without reordering."""
        return chunks[:top_k]
```
  • Step 5: Run tests to verify they pass
```bash
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_pass_through_returns_top_k tests/compliance/test_pipeline.py::test_pass_through_returns_all_when_top_k_exceeds -v
```

Expected: 2 passed.


Task 2: Parallel clause processing + LLM retry

Files:

  • Modify: backend/app/application/compliance/pipeline.py

  • Modify: backend/app/api/routes/compliance.py

  • Test: backend/tests/compliance/test_pipeline.py

  • Step 1: Write failing tests

Add these tests to backend/tests/compliance/test_pipeline.py:

```python
import asyncio
from unittest.mock import MagicMock, patch


def _make_mock_client(content: str = '{"status":"ok","title":"T","desc":"D","clause_ref":"A1"}'):
    client = MagicMock()
    response = MagicMock()
    response.is_success = True
    response.content = content
    client.chat.return_value = response
    return client


def _make_mock_retrieval():
    svc = MagicMock()
    svc.retrieve.return_value = []
    return svc


def test_process_single_clause_returns_finding():
    from app.application.compliance.pipeline import process_single_clause
    client = _make_mock_client()
    svc = _make_mock_retrieval()
    result = process_single_clause("test clause", 0, svc, client, "EU AI Act", "para")
    assert result["finding"] is not None
    assert result["index"] == 0
    assert result["chunks"] == []


def test_run_clauses_parallel_runs_all():
    from app.application.compliance.pipeline import run_clauses_parallel
    client = _make_mock_client()
    svc = _make_mock_retrieval()
    clauses = ["clause one", "clause two", "clause three"]
    results = asyncio.run(run_clauses_parallel(clauses, svc, client, "EU AI Act", "para"))
    assert len(results) == 3
    assert all(r["index"] == i for i, r in enumerate(results))
```
  • Step 2: Run to verify failure
```bash
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_process_single_clause_returns_finding -v
```

Expected: ImportError: cannot import name 'process_single_clause'

  • Step 3: Add process_single_clause and run_clauses_parallel to pipeline.py

Add after line 109 (after retrieve_for_clause), before check_clause_compliance:

```python
def process_single_clause(
    clause: str,
    index: int,
    retrieval_service: "KnowledgeRetrievalService",
    client: "BaseLLMClient",
    standard_name: str,
    para_text: str,
    top_k: int = 5,
    domains: str | None = None,
) -> dict:
    """Process one clause: retrieve relevant regulations then check compliance.

    Returns a dict with keys: index, chunks, finding (may be None on LLM failure).
    Designed to run inside asyncio.to_thread() for parallel execution.
    standard_name and para_text are accepted for signature parity but not used yet.
    """
    chunks = retrieve_for_clause(clause, retrieval_service, top_k, domains)
    finding = check_clause_compliance(clause, chunks, client)
    return {"index": index, "chunks": chunks, "finding": finding}


async def run_clauses_parallel(
    clauses: list[str],
    retrieval_service: "KnowledgeRetrievalService",
    client: "BaseLLMClient",
    standard_name: str,
    para_text: str,
    top_k: int = 5,
    domains: str | None = None,
) -> list[dict]:
    """Run all clauses through retrieve+gap-check in parallel.

    Results are returned in the original clause order even though processing
    is concurrent. Exceptions in individual clauses are caught and returned as
    dicts with finding=None so the stream continues for remaining clauses.
    """
    tasks = [
        asyncio.to_thread(
            process_single_clause,
            clause, i, retrieval_service, client, standard_name, para_text, top_k, domains,
        )
        for i, clause in enumerate(clauses)
    ]
    raw = await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    for i, r in enumerate(raw):
        if isinstance(r, Exception):
            logger.warning("Clause {} processing failed: {}", i, r)
            results.append({"index": i, "chunks": [], "finding": None})
        else:
            results.append(r)
    return results
```

Also add the imports at the top of pipeline.py (after existing imports):

```python
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
```
  • Step 4: Wrap LLM calls in check_clause_compliance with retry

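Add a module-level retry helper to pipeline.py (the _call_llm_with_retry() wrapper listed in the File Map) so that check_clause_compliance and synthesize_conclusion share one retry policy. It takes client as a parameter because a module-level function cannot see a caller's local client:

```python
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type((httpx.HTTPError, ValueError)),
    reraise=True,
)
def _call_llm_with_retry(client: "BaseLLMClient", messages: list[dict], max_tokens: int = 500):
    """Call client.chat with up to 3 attempts; return the successful response object."""
    resp = client.chat(messages, max_tokens=max_tokens)
    if not resp.is_success:
        # Raising ValueError makes tenacity treat a non-success response as retryable.
        raise ValueError("LLM returned non-success status")
    return resp
```

Then in check_clause_compliance, replace the direct client.chat(...) call (line 137):

```python
response = client.chat([{"role": "user", "content": prompt}], max_tokens=500)
if not response.is_success:
    return None
```

with:

```python
try:
    response = _call_llm_with_retry(client, [{"role": "user", "content": prompt}], max_tokens=500)
except Exception as exc:
    logger.warning("check_clause_compliance LLM call failed after retries: {}", exc)
    return None
```

Apply the same pattern to synthesize_conclusion (line 189): call _call_llm_with_retry(client, ...) instead of client.chat(...), keep its existing max_tokens value, and catch the exception raised after the final attempt to return the existing fallback.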
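The retry policy in this step (three attempts, exponential backoff clamped to 1-4 s, re-raise after the last failure) can be made concrete without tenacity. The stdlib-only sketch below shows the same idea with an injectable sleep so the schedule is testable; call_with_backoff is a hypothetical helper, not tenacity's API, and tenacity's exact wait formula may differ between versions, so treat the 1 s / 2 s schedule as illustrative.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = 3,
    min_wait: float = 1.0,
    max_wait: float = 4.0,
    retry_on: tuple[type[Exception], ...] = (ValueError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn up to `attempts` times, sleeping 1, 2, 4, ... s (clamped) between failures.

    The last exception is re-raised unchanged, similar to tenacity's reraise=True.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original error to the caller
            sleep(min(max(2 ** (attempt - 1), min_wait), max_wait))
    raise AssertionError("unreachable")


# Usage: a call that fails twice, then succeeds; sleeps are recorded instead of taken.
waits: list[float] = []
calls = {"n": 0}


def flaky_chat() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("LLM returned non-success status")
    return "ok"


result = call_with_backoff(flaky_chat, sleep=waits.append)

# A call that never succeeds: the ValueError escapes after the third attempt.
gave_up_waits: list[float] = []


def always_down() -> str:
    raise ValueError("LLM down")


try:
    call_with_backoff(always_down, sleep=gave_up_waits.append)
    gave_up = False
except ValueError:
    gave_up = True
```

The worst case adds about 3 s of waiting per clause, which is one more reason to cap parallelism when many clauses fail at once.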

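Also fix the highlight_terms prompt bug: the current prompt uses ["Key terms to highlight, max 10 terms"] as the example value, so the LLM echoes that literal string instead of returning real terms. In synthesize_conclusion, replace that prompt line with a neutral placeholder, and state the constraint as a plain instruction line outside the JSON skeleton rather than as a // comment inside it (JSON has no comment syntax, and a model may copy the comment into its output, which would break parsing):

```python
'  "highlight_terms": ["term1", "term2"],\n'
# plus one instruction line outside the JSON skeleton, e.g.:
'highlight_terms must list up to 10 key technical/legal terms that actually appear in the text.\n'
```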
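A prompt fix alone does not guarantee well-formed output, so it may be worth filtering the parsed highlight_terms before emitting the done event. A minimal sketch, assuming the terms arrive as a parsed JSON list and that para_text is the analysed text; sanitize_highlight_terms and PLACEHOLDER_TERMS are hypothetical names.

```python
# Placeholder strings the model might echo back from the prompt (hypothetical list).
PLACEHOLDER_TERMS = {"term1", "term2", "key terms to highlight, max 10 terms"}


def sanitize_highlight_terms(terms: object, text: str, limit: int = 10) -> list[str]:
    """Keep distinct string terms that occur in text (case-insensitive), up to limit."""
    if not isinstance(terms, list):
        return []
    lowered_text = text.lower()
    seen: set[str] = set()
    kept: list[str] = []
    for term in terms:
        if not isinstance(term, str):
            continue
        cleaned = term.strip()
        key = cleaned.lower()
        # Drop empties, echoed placeholders, duplicates, and terms absent from the text.
        if not cleaned or key in PLACEHOLDER_TERMS or key in seen or key not in lowered_text:
            continue
        seen.add(key)
        kept.append(cleaned)
        if len(kept) == limit:
            break
    return kept


para = "The system shall implement CSMS as per ISO 21434 requirements."
cleaned = sanitize_highlight_terms(["CSMS", "term1", "iso 21434", "GDPR", "csms"], para)
print(cleaned)  # ['CSMS', 'iso 21434']
```

Plain substring matching can accept short terms that only occur inside longer words (for example "AI" inside "maintain"); a word-boundary regular expression would be stricter.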
  • Step 5: Replace sequential loop in compliance.py with parallel version

In backend/app/api/routes/compliance.py, replace the Stage 3 block (lines 138-168):

```python
# ── Stage 3: retrieve + gap check per clause ──────────────────
findings: list[dict] = []

for i, clause in enumerate(clauses):
    yield _sse({
        "type": "stage",
        "stage": "analyzing",
        "label": f"Analyzing clause {i + 1}/{len(clauses)}…",
    })
    await asyncio.sleep(0)

    chunks = await asyncio.to_thread(
        retrieve_for_clause, clause, retrieval_service, 5, domains or None
    )

    # Emit source events
    for chunk in chunks[:3]:
        yield _sse({...})
    await asyncio.sleep(0)

    finding = await asyncio.to_thread(check_clause_compliance, clause, chunks, client)
    if finding:
        findings.append(finding)
        yield _sse({"type": "finding", **finding})
    await asyncio.sleep(0)
```

With:

```python
# ── Stage 3: retrieve + gap check (parallel across all clauses) ────────────
# run_clauses_parallel is imported in generate()'s pipeline import block.
findings: list[dict] = []

yield _sse({
    "type": "stage",
    "stage": "analyzing",
    "label": f"Analyzing {len(clauses)} clauses in parallel…",
})
await asyncio.sleep(0)

clause_results = await run_clauses_parallel(
    clauses, retrieval_service, client,
    standard_name=title or "",
    para_text=para_text,
    top_k=5,
    domains=domains or None,
)

for res in clause_results:
    i = res["index"]
    chunks = res["chunks"]
    finding = res["finding"]

    # Emit source events for this clause
    for chunk in chunks[:3]:
        yield _sse({
            "type": "source",
            "standard": getattr(chunk, "doc_title", "") or getattr(chunk, "doc_name", ""),
            "clause": getattr(chunk, "section_title", "") or "",
            "score": round(float(getattr(chunk, "score", 0)), 3),
            "status": "retrieved",
            "full_content": (getattr(chunk, "text", "") or "")[:300],
        })

    if finding:
        findings.append(finding)
        yield _sse({"type": "finding", **finding})

    await asyncio.sleep(0)
```
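One trade-off of the gather-based version is that the client sees no progress until the slowest clause finishes. If that matters, a variant built on asyncio.as_completed can emit a progress event as each clause completes while still emitting findings in clause order. A sketch under assumptions: the sse() framing is assumed to match _sse, stream_clauses and fake_worker are hypothetical, and in the route the worker would be process_single_clause with its other arguments bound (for example via a lambda). It would also change Step 7's "one stage:analyzing event" expectation.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Callable


def sse(payload: dict[str, Any]) -> str:
    # Assumed framing: one "data:" line per event, terminated by a blank line.
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


async def stream_clauses(
    clauses: list[str],
    worker: Callable[[str, int], dict[str, Any]],
) -> AsyncIterator[str]:
    """Emit a progress event as each clause finishes, then findings in clause order."""
    tasks = [asyncio.create_task(asyncio.to_thread(worker, c, i)) for i, c in enumerate(clauses)]
    results: dict[int, dict[str, Any]] = {}
    for done_count, fut in enumerate(asyncio.as_completed(tasks), start=1):
        try:
            res = await fut
            results[res["index"]] = res
        except Exception:
            pass  # a failed clause contributes no finding but still counts as progress
        yield sse({
            "type": "stage",
            "stage": "analyzing",
            "label": f"Analyzed {done_count}/{len(clauses)} clauses…",
        })
    for i in sorted(results):
        if results[i]["finding"]:
            yield sse({"type": "finding", **results[i]["finding"]})


def fake_worker(clause: str, index: int) -> dict[str, Any]:
    if clause == "bad":
        raise RuntimeError("simulated LLM failure")
    return {"index": index, "chunks": [], "finding": {"title": clause, "status": "warn"}}


async def collect() -> list[str]:
    return [event async for event in stream_clauses(["c1", "bad", "c3"], fake_worker)]


events = [json.loads(e[len("data: "):]) for e in asyncio.run(collect())]
```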

Also update the imports at the top of the generate() inner function — add run_clauses_parallel to the from app.application.compliance.pipeline import (...) block and remove individual imports of retrieve_for_clause, check_clause_compliance.

  • Step 6: Run all Direction A tests
```bash
cd backend
python -m pytest tests/compliance/ -v
```

Expected: all pass.

  • Step 7: Smoke test the SSE endpoint manually
```bash
curl -X POST http://localhost:8000/api/v1/compliance/analyze-stream \
  -H "Authorization: Bearer <token>" \
  -F "text=The system shall implement CSMS as per ISO 21434 requirements. All training data must be documented and version controlled." \
  --no-buffer
```

Expected: an SSE stream with a single stage:analyzing event (not one per clause), followed by source and finding events, then a done event with a non-empty highlight_terms array.
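To check those expectations without eyeballing the raw stream, the curl output can be saved (for example with `> stream.txt`) and run through a small parser. A minimal sketch, assuming _sse() writes each payload as one `data: <json>` line followed by a blank line and that the done event carries highlight_terms at the top level; both are assumptions about code not shown here, and parse_sse and check_direction_a_stream are hypothetical helpers that ignore other SSE fields (event:, id:, comments).

```python
import json


def parse_sse(raw: str) -> list[dict]:
    """Parse a captured SSE body into JSON payloads (simplified: data lines only)."""
    events = []
    for block in raw.replace("\r\n", "\n").split("\n\n"):
        data = [line[5:].lstrip() for line in block.split("\n") if line.startswith("data:")]
        if data:
            events.append(json.loads("\n".join(data)))
    return events


def check_direction_a_stream(events: list[dict]) -> list[str]:
    """Return the ways a stream deviates from the Step 7 expectations (empty list = OK)."""
    problems = []
    analyzing = [e for e in events if e.get("type") == "stage" and e.get("stage") == "analyzing"]
    if len(analyzing) != 1:
        problems.append(f"expected 1 stage:analyzing event, got {len(analyzing)}")
    if not any(e.get("type") == "finding" for e in events):
        problems.append("no finding events")
    done = [e for e in events if e.get("type") == "done"]
    if not done or not done[-1].get("highlight_terms"):
        problems.append("missing done event or empty highlight_terms")
    return problems


sample = (
    'data: {"type": "stage", "stage": "analyzing", "label": "Analyzing 2 clauses in parallel…"}\n\n'
    'data: {"type": "finding", "title": "Missing CSMS", "status": "risk"}\n\n'
    'data: {"type": "done", "highlight_terms": ["CSMS", "ISO 21434"]}\n\n'
)
print(check_direction_a_stream(parse_sse(sample)))  # []
```

Against a real capture: `check_direction_a_stream(parse_sse(open("stream.txt", encoding="utf-8").read()))`.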


Direction B Tasks

Task 3: Domain port + data classes

Files:

  • Create: backend/app/domain/compliance/__init__.py

  • Create: backend/app/domain/compliance/ports.py

  • Test: backend/tests/compliance/test_repository.py

  • Step 1: Write failing test

```python
# backend/tests/compliance/test_repository.py
from datetime import datetime
from app.domain.compliance.ports import (
    AnalysisRecord,
    FindingRecord,
    ComplianceRepository,
)


def test_analysis_record_construction():
    record = AnalysisRecord(
        id="",
        created_at=datetime.utcnow(),
        created_by="user1",
        doc_name="test.pdf",
        standard_name="EU AI Act",
        risk_score=72,
        conclusion="Some gaps found.",
        actions=[{"label": "Fix", "value": "Update docs"}],
        para_text="The system shall...",
        highlight_terms=["CSMS", "ISO 21434"],
        findings=[
            FindingRecord(
                id="",
                analysis_id="",
                seq=0,
                title="Missing CSMS",
                description="No CSMS certification found.",
                status="risk",
                clause_ref="Art.9.1",
            )
        ],
    )
    assert record.doc_name == "test.pdf"
    assert len(record.findings) == 1
    assert record.findings[0].status == "risk"


def test_compliance_repository_is_abstract():
    import inspect
    assert inspect.isabstract(ComplianceRepository)
```
  • Step 2: Run to verify failure
```bash
cd backend
python -m pytest tests/compliance/test_repository.py -v
```

Expected: ModuleNotFoundError: No module named 'app.domain.compliance'

  • Step 3: Create domain module
```python
# backend/app/domain/compliance/__init__.py  (empty file)
```
  • Step 4: Create ports.py
```python
# backend/app/domain/compliance/ports.py
"""Domain ports for compliance history persistence."""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class FindingRecord:
    """Single finding row linked to an analysis."""
    id: str
    analysis_id: str
    seq: int
    title: str
    description: str
    status: str          # "ok" | "warn" | "risk"
    clause_ref: Optional[str] = None


@dataclass
class AnalysisRecord:
    """Full compliance analysis record with nested findings."""
    id: str              # UUID string; empty string means not yet persisted
    created_at: datetime
    created_by: Optional[str]
    doc_name: str
    standard_name: str
    risk_score: int
    conclusion: str
    actions: list        # list[dict] — serialised action items
    para_text: str
    highlight_terms: list  # list[str]
    findings: list[FindingRecord] = field(default_factory=list)


class ComplianceRepository(ABC):
    """Port for persisting and retrieving compliance analysis records."""

    @abstractmethod
    def save_analysis(self, record: AnalysisRecord) -> str:
        """Persist a new analysis record and return the assigned UUID string."""

    @abstractmethod
    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]:
        """Return analyses ordered by created_at DESC, without nested findings."""

    @abstractmethod
    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]:
        """Return a single analysis with all nested findings, or None."""

    @abstractmethod
    def delete_analysis(self, analysis_id: str) -> None:
        """Delete an analysis and all related findings and chat messages (cascade)."""

    @abstractmethod
    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str:
        """Persist a chat message and return its UUID string."""

    @abstractmethod
    def get_messages(self, finding_id: str) -> list[dict]:
        """Return chat messages for a finding ordered by created_at ASC.

        Each dict has keys: id, role, content, created_at (ISO string).
        """
```
  • Step 5: Run tests to verify they pass
```bash
cd backend
python -m pytest tests/compliance/test_repository.py -v
```

Expected: 2 passed.


Task 4: PostgresComplianceRepository

Files:

  • Create: backend/app/infrastructure/compliance/__init__.py

  • Create: backend/app/infrastructure/compliance/repository.py

  • Test: backend/tests/compliance/test_repository.py

  • Step 1: Write failing test

Add to backend/tests/compliance/test_repository.py:

```python
from unittest.mock import MagicMock, patch, call
from datetime import datetime
from app.domain.compliance.ports import AnalysisRecord, FindingRecord


def _mock_pool():
    """Return a mock psycopg2 ThreadedConnectionPool."""
    conn = MagicMock()
    cursor = MagicMock()
    cursor.__enter__ = MagicMock(return_value=cursor)
    cursor.__exit__ = MagicMock(return_value=False)
    conn.cursor.return_value = cursor
    pool = MagicMock()
    pool.getconn.return_value = conn
    return pool, conn, cursor


@patch("app.infrastructure.compliance.repository.psycopg2.pool.ThreadedConnectionPool")
@patch("app.infrastructure.compliance.repository.psycopg2.extras")
def test_save_analysis_returns_uuid(mock_extras, mock_pool_cls):
    from app.infrastructure.compliance.repository import PostgresComplianceRepository
    pool, conn, cursor = _mock_pool()
    mock_pool_cls.return_value = pool
    cursor.fetchone.return_value = {"id": "abc-123"}

    repo = PostgresComplianceRepository(
        host="localhost", port=5432, user="u", password="p", dbname="db"
    )
    record = AnalysisRecord(
        id="", created_at=datetime.utcnow(), created_by="user1",
        doc_name="doc.pdf", standard_name="EU AI Act",
        risk_score=50, conclusion="OK", actions=[], para_text="p",
        highlight_terms=[], findings=[],
    )
    result = repo.save_analysis(record)
    assert result == "abc-123"
```
  • Step 2: Run to verify failure
```bash
cd backend
python -m pytest tests/compliance/test_repository.py::test_save_analysis_returns_uuid -v
```

Expected: ModuleNotFoundError: No module named 'app.infrastructure.compliance'

  • Step 3: Create infrastructure module
```python
# backend/app/infrastructure/compliance/__init__.py  (empty file)
```
  • Step 4: Implement repository.py
# backend/app/infrastructure/compliance/repository.py
"""PostgreSQL-backed compliance analysis repository.

Follows the same psycopg2 pattern as PostgresDocumentRepository:
ThreadedConnectionPool + RealDictCursor for reads, _ensure_schema on init.
"""
from __future__ import annotations

import json
from contextlib import contextmanager
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras
import psycopg2.pool
from loguru import logger

from app.domain.compliance.ports import (
    AnalysisRecord,
    ComplianceRepository,
    FindingRecord,
)


class PostgresComplianceRepository(ComplianceRepository):
    """Stores compliance analyses, findings, and finding chat messages in PostgreSQL."""

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        dbname: str,
        minconn: int = 1,
        maxconn: int = 5,
    ) -> None:
        self._pool = psycopg2.pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=host,
            port=port,
            user=user,
            password=password,
            dbname=dbname,
        )
        self._ensure_schema()

    @contextmanager
    def _conn(self):
        conn = self._pool.getconn()
        try:
            yield conn
        finally:
            self._pool.putconn(conn)

    def _ensure_schema(self) -> None:
        """Create tables if they do not exist."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS compliance_analyses (
                        id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
                        created_by      VARCHAR(255),
                        doc_name        VARCHAR(500),
                        standard_name   VARCHAR(500),
                        risk_score      INTEGER,
                        conclusion      TEXT,
                        actions         JSONB,
                        para_text       TEXT,
                        highlight_terms JSONB
                    );
                """)
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS compliance_findings (
                        id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
                        seq         INTEGER NOT NULL,
                        title       VARCHAR(500),
                        description TEXT,
                        status      VARCHAR(50),
                        clause_ref  VARCHAR(200)
                    );
                """)
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS finding_chat_messages (
                        id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        analysis_id UUID NOT NULL REFERENCES compliance_analyses(id) ON DELETE CASCADE,
                        finding_id  UUID NOT NULL REFERENCES compliance_findings(id) ON DELETE CASCADE,
                        role        VARCHAR(20) NOT NULL,
                        content     TEXT NOT NULL,
                        created_at  TIMESTAMPTZ NOT NULL DEFAULT now()
                    );
                """)
            conn.commit()

    def save_analysis(self, record: AnalysisRecord) -> str:
        """Insert analysis + findings; return the new analysis UUID."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    INSERT INTO compliance_analyses
                        (created_by, doc_name, standard_name, risk_score,
                         conclusion, actions, para_text, highlight_terms)
                    VALUES
                        (%(created_by)s, %(doc_name)s, %(standard_name)s, %(risk_score)s,
                         %(conclusion)s, %(actions)s, %(para_text)s, %(highlight_terms)s)
                    RETURNING id
                    """,
                    {
                        "created_by": record.created_by,
                        "doc_name": record.doc_name,
                        "standard_name": record.standard_name,
                        "risk_score": record.risk_score,
                        "conclusion": record.conclusion,
                        "actions": json.dumps(record.actions, ensure_ascii=False),
                        "para_text": record.para_text,
                        "highlight_terms": json.dumps(record.highlight_terms, ensure_ascii=False),
                    },
                )
                row = cur.fetchone()
                analysis_id = str(row["id"])

            # Insert findings
            if record.findings:
                with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                    for f in record.findings:
                        cur.execute(
                            """
                            INSERT INTO compliance_findings
                                (analysis_id, seq, title, description, status, clause_ref)
                            VALUES
                                (%(analysis_id)s, %(seq)s, %(title)s, %(desc)s, %(status)s, %(clause_ref)s)
                            """,
                            {
                                "analysis_id": analysis_id,
                                "seq": f.seq,
                                "title": f.title,
                                "desc": f.description,
                                "status": f.status,
                                "clause_ref": f.clause_ref,
                            },
                        )
            conn.commit()
        return analysis_id

    def list_analyses(self, limit: int = 50, offset: int = 0) -> list[AnalysisRecord]:
        """Return analyses without nested findings, ordered newest first."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT id, created_at, created_by, doc_name, standard_name,
                           risk_score, conclusion, actions, para_text, highlight_terms
                    FROM compliance_analyses
                    ORDER BY created_at DESC
                    LIMIT %(limit)s OFFSET %(offset)s
                    """,
                    {"limit": limit, "offset": offset},
                )
                rows = cur.fetchall()
        return [self._row_to_record(dict(r)) for r in rows]

    def get_analysis(self, analysis_id: str) -> Optional[AnalysisRecord]:
        """Return analysis with nested findings list."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    "SELECT * FROM compliance_analyses WHERE id = %(id)s",
                    {"id": analysis_id},
                )
                row = cur.fetchone()
                if not row:
                    return None
                record = self._row_to_record(dict(row))

                cur.execute(
                    """
                    SELECT id, analysis_id, seq, title, description, status, clause_ref
                    FROM compliance_findings
                    WHERE analysis_id = %(id)s
                    ORDER BY seq
                    """,
                    {"id": analysis_id},
                )
                findings = [
                    FindingRecord(
                        id=str(r["id"]),
                        analysis_id=str(r["analysis_id"]),
                        seq=r["seq"],
                        title=r["title"] or "",
                        description=r["description"] or "",
                        status=r["status"] or "ok",
                        clause_ref=r["clause_ref"],
                    )
                    for r in cur.fetchall()
                ]
                record.findings = findings
        return record

    def delete_analysis(self, analysis_id: str) -> None:
        """Delete analysis; findings and chat messages cascade automatically."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "DELETE FROM compliance_analyses WHERE id = %(id)s",
                    {"id": analysis_id},
                )
            conn.commit()

    def save_message(self, analysis_id: str, finding_id: str, role: str, content: str) -> str:
        """Persist a chat message; return its UUID."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    INSERT INTO finding_chat_messages
                        (analysis_id, finding_id, role, content)
                    VALUES
                        (%(analysis_id)s, %(finding_id)s, %(role)s, %(content)s)
                    RETURNING id
                    """,
                    {
                        "analysis_id": analysis_id,
                        "finding_id": finding_id,
                        "role": role,
                        "content": content,
                    },
                )
                row = cur.fetchone()
            conn.commit()
        return str(row["id"])

    def get_messages(self, finding_id: str) -> list[dict]:
        """Return messages for a finding, oldest first."""
        with self._conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT id, role, content, created_at
                    FROM finding_chat_messages
                    WHERE finding_id = %(finding_id)s
                    ORDER BY created_at ASC
                    """,
                    {"finding_id": finding_id},
                )
                rows = cur.fetchall()
        return [
            {
                "id": str(r["id"]),
                "role": r["role"],
                "content": r["content"],
                "created_at": r["created_at"].isoformat() if r["created_at"] else "",
            }
            for r in rows
        ]

    def _row_to_record(self, row: dict) -> AnalysisRecord:
        """Convert a RealDictCursor row to an AnalysisRecord (no findings)."""
        actions = row.get("actions") or []
        if isinstance(actions, str):
            actions = json.loads(actions)
        highlight_terms = row.get("highlight_terms") or []
        if isinstance(highlight_terms, str):
            highlight_terms = json.loads(highlight_terms)
        return AnalysisRecord(
            id=str(row["id"]),
            created_at=row["created_at"] if isinstance(row["created_at"], datetime) else datetime.utcnow(),
            created_by=row.get("created_by"),
            doc_name=row.get("doc_name") or "",
            standard_name=row.get("standard_name") or "",
            risk_score=int(row.get("risk_score") or 0),
            conclusion=row.get("conclusion") or "",
            actions=actions,
            para_text=row.get("para_text") or "",
            highlight_terms=highlight_terms,
            findings=[],
        )
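_row_to_record accepts actions and highlight_terms either as already-decoded lists or as JSON strings. psycopg2 decodes json and jsonb columns into Python objects by default, so the string branch only fires if the columns are declared as TEXT or a custom typecaster is registered. Below is a minimal standalone sketch of that normalization. coerce_json_list is a hypothetical helper, not part of the repository code, and unlike the inline version it also rejects payloads that decode to something other than a list:

```python
import json
from typing import Any


def coerce_json_list(value: Any) -> list:
    """Normalize a JSON column value into a Python list.

    Handles the shapes _row_to_record() guards against: None (NULL column),
    an already-decoded list (json/jsonb via psycopg2), or a JSON-encoded string
    (e.g. a TEXT column). Anything that does not decode to a list becomes [].
    """
    if value is None:
        return []
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            return []  # empty or malformed text
    return value if isinstance(value, list) else []


print(coerce_json_list('["CSMS", "TARA"]'))   # ['CSMS', 'TARA']
print(coerce_json_list(["CSMS"]))             # ['CSMS']
print(coerce_json_list(None))                 # []
print(coerce_json_list('{"not": "a list"}'))  # []
```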
  • Step 5: Run tests
cd backend
python -m pytest tests/compliance/test_repository.py -v

Expected: all pass.


Task 5: DOCX export

Files:

  • Create: backend/app/infrastructure/compliance/docx_export.py

  • Test: backend/tests/compliance/test_repository.py

  • Step 1: Write failing test

Add to backend/tests/compliance/test_repository.py:

from datetime import datetime
from app.domain.compliance.ports import AnalysisRecord, FindingRecord


def test_generate_docx_returns_bytes():
    from app.infrastructure.compliance.docx_export import generate_docx
    record = AnalysisRecord(
        id="test-id", created_at=datetime(2026, 6, 8), created_by="user1",
        doc_name="test.pdf", standard_name="EU AI Act",
        risk_score=72, conclusion="Several gaps found.",
        actions=[{"label": "Fix", "value": "Update CSMS docs"}],
        para_text="The system shall implement CSMS.",
        highlight_terms=["CSMS"],
        findings=[
            FindingRecord(
                id="f1", analysis_id="test-id", seq=0,
                title="Missing CSMS", description="No CSMS cert.",
                status="risk", clause_ref="Art.9.1",
            )
        ],
    )
    data = generate_docx(record)
    assert isinstance(data, bytes)
    assert len(data) > 1000  # sanity check; a real DOCX package is far larger than 1 KB
    # Verify it's a valid ZIP (a DOCX file is a ZIP container)
    import io
    import zipfile
    assert zipfile.is_zipfile(io.BytesIO(data))
  • Step 2: Run to verify failure
cd backend
python -m pytest tests/compliance/test_repository.py::test_generate_docx_returns_bytes -v

Expected: ModuleNotFoundError: No module named 'app.infrastructure.compliance.docx_export'

  • Step 3: Implement docx_export.py
# backend/app/infrastructure/compliance/docx_export.py
"""DOCX report generator for compliance analysis results.

Uses python-docx (already in requirements.txt). Returns raw bytes so the
caller can stream the response without writing to disk.
"""
from __future__ import annotations

from datetime import datetime
from io import BytesIO

from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH

from app.domain.compliance.ports import AnalysisRecord

_STATUS_LABEL = {"ok": "Compliant", "warn": "Warning", "risk": "Non-Compliant"}
_STATUS_COLOR = {
    "ok": RGBColor(0x22, 0x8B, 0x22),     # forest green
    "warn": RGBColor(0xFF, 0x8C, 0x00),    # dark orange
    "risk": RGBColor(0xDC, 0x14, 0x3C),    # crimson
}


def generate_docx(record: AnalysisRecord) -> bytes:
    """Generate a compliance report DOCX and return its raw bytes.

    Structure:
    - Cover: document name, standard, date, risk score
    - Executive summary (conclusion)
    - Findings table
    - Recommended actions
    - Footer note
    """
    doc = Document()

    # ── Cover ──────────────────────────────────────────────────────────────────
    title_para = doc.add_heading("Compliance Analysis Report", level=0)
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER

    doc.add_paragraph("")
    meta_table = doc.add_table(rows=4, cols=2)
    meta_table.style = "Table Grid"
    labels = ["Document", "Standard", "Date", "Risk Score"]
    values = [
        record.doc_name,
        record.standard_name,
        record.created_at.strftime("%Y-%m-%d %H:%M UTC") if record.created_at else "",
        f"{record.risk_score} / 100",
    ]
    for i, (label, value) in enumerate(zip(labels, values)):
        meta_table.cell(i, 0).text = label
        meta_table.cell(i, 1).text = value

    # ── Executive Summary ──────────────────────────────────────────────────────
    doc.add_heading("Executive Summary", level=1)
    doc.add_paragraph(record.conclusion)

    # ── Findings ───────────────────────────────────────────────────────────────
    doc.add_heading("Findings", level=1)
    if record.findings:
        table = doc.add_table(rows=1, cols=4)
        table.style = "Table Grid"
        hdr = table.rows[0].cells
        for i, h in enumerate(["#", "Status", "Title", "Description / Clause"]):
            hdr[i].text = h
            for run in hdr[i].paragraphs[0].runs:
                run.bold = True

        for f in record.findings:
            row = table.add_row().cells
            row[0].text = str(f.seq + 1)
            row[1].text = _STATUS_LABEL.get(f.status, f.status)
            row[2].text = f.title
            desc = f.description
            if f.clause_ref:
                desc += f"\n[{f.clause_ref}]"
            row[3].text = desc
    else:
        doc.add_paragraph("No findings recorded.")

    # ── Recommended Actions ────────────────────────────────────────────────────
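    doc.add_heading("Recommended Actions", level=1)
    if record.actions:
        for action in record.actions:
            label = action.get("label", "Action")
            value = action.get("value", "")
            # "List Number" numbers paragraphs automatically; a manual "1." prefix
            # would render as "1. 1. ..."
            doc.add_paragraph(f"{label}: {value}", style="List Number")
    else:
        doc.add_paragraph("No actions recorded.")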

    # ── Footer note ────────────────────────────────────────────────────────────
    doc.add_paragraph("")
    footer = doc.add_paragraph(
        f"Generated by AI Regulation Analysis System — {datetime.utcnow().strftime('%Y-%m-%d')}"
    )
    footer.alignment = WD_ALIGN_PARAGRAPH.CENTER
    for run in footer.runs:
        run.font.size = Pt(8)
        run.font.color.rgb = RGBColor(0x88, 0x88, 0x88)

    buf = BytesIO()
    doc.save(buf)
    return buf.getvalue()
  • Step 4: Run test
cd backend
python -m pytest tests/compliance/test_repository.py::test_generate_docx_returns_bytes -v

Expected: PASS.
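zipfile.is_zipfile only proves the bytes form some ZIP archive. A slightly stronger check also confirms the two part names python-docx writes into every package: [Content_Types].xml and the main document part word/document.xml. A minimal sketch using only the standard library; looks_like_docx is a hypothetical helper name:

```python
import io
import zipfile


def looks_like_docx(data: bytes) -> bool:
    """Return True if `data` is a ZIP archive containing the core DOCX parts."""
    buf = io.BytesIO(data)
    if not zipfile.is_zipfile(buf):
        return False
    with zipfile.ZipFile(buf) as zf:
        names = set(zf.namelist())
    # Both parts must be present; extra parts (styles, rels, ...) are fine.
    return {"[Content_Types].xml", "word/document.xml"} <= names
```

In test_generate_docx_returns_bytes it could be used as assert looks_like_docx(data) next to the existing assertions.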


Task 6: Bootstrap factory + history API endpoints

Files:

  • Modify: backend/app/shared/bootstrap.py

  • Modify: backend/app/api/routes/compliance.py

  • Step 1: Add get_compliance_repository() to bootstrap.py

At the top of bootstrap.py, add imports (after existing infrastructure imports):

from app.domain.compliance.ports import ComplianceRepository
from app.infrastructure.compliance.repository import PostgresComplianceRepository

After the get_event_store() factory (around line 310), add:

@lru_cache
def get_compliance_repository() -> ComplianceRepository:
    """Return the compliance analysis repository.

    Requires document_repository_backend=postgres and valid postgres_* settings.
    Raises NotImplementedError for any other backend value.
    """
    if settings.document_repository_backend != "postgres":
        raise NotImplementedError(
            f"ComplianceRepository requires document_repository_backend=postgres, "
            f"got '{settings.document_repository_backend}'. "
            "Set DOCUMENT_REPOSITORY_BACKEND=postgres in your .env file."
        )
    return PostgresComplianceRepository(
        host=settings.postgres_host,
        port=settings.postgres_port,
        user=settings.postgres_user,
        password=settings.postgres_password,
        dbname=settings.postgres_db,
    )
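Because get_compliance_repository is wrapped in functools.lru_cache, a successfully built repository (and its connection pool) is created once per process and reused. Exceptions are not cached: when the backend is not postgres, every call raises NotImplementedError again, which is what lets the route handlers fall back with except NotImplementedError. The sketch below demonstrates both behaviours with a stand-in factory; make_repo, BACKEND, and FakeRepo are illustrative names, not project code:

```python
from functools import lru_cache

BACKEND = "postgres"  # stand-in for settings.document_repository_backend
constructed = 0       # how many repository objects have been built


class FakeRepo:
    """Stand-in for PostgresComplianceRepository (which would open a pool)."""

    def __init__(self) -> None:
        global constructed
        constructed += 1


@lru_cache
def make_repo() -> FakeRepo:
    if BACKEND != "postgres":
        raise NotImplementedError(f"unsupported backend {BACKEND!r}")
    return FakeRepo()


first, second = make_repo(), make_repo()
print(first is second, constructed)  # True 1 -> one shared instance

# Switch backends; cache_clear() drops the cached instance.
make_repo.cache_clear()
BACKEND = "memory"
for _ in range(2):
    try:
        make_repo()
    except NotImplementedError:
        print("raised")  # printed on both calls: exceptions are never cached
```

In tests that change settings, call get_compliance_repository.cache_clear() so the next call rebuilds the repository.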
  • Step 2: Add auto-save hook in analyze_stream

In backend/app/api/routes/compliance.py, immediately after the yield _sse({"type": "done", **conclusion_data}) line (line 177), add the following at the same indentation level, inside the SSE generator:

# Auto-save analysis to database
try:
    from app.shared.bootstrap import get_compliance_repository
    from app.domain.compliance.ports import AnalysisRecord, FindingRecord
    from datetime import datetime

    repo = get_compliance_repository()
    finding_records = [
        FindingRecord(
            id="",
            analysis_id="",
            seq=i,
            title=f.get("title", ""),
            description=f.get("desc", ""),
            status=f.get("status", "ok"),
            clause_ref=f.get("clause_ref"),
        )
        for i, f in enumerate(findings)
    ]
    record = AnalysisRecord(
        id="",
        created_at=datetime.utcnow(),
        created_by=getattr(current_user, "username", None),
        doc_name=file_name or (title or "Pasted text"),
        standard_name=title or "",
        risk_score=conclusion_data.get("risk_score", 0),
        conclusion=conclusion_data.get("conclusion", ""),
        actions=conclusion_data.get("actions", []),
        para_text=conclusion_data.get("para_text", ""),
        highlight_terms=conclusion_data.get("highlight_terms", []),
        findings=finding_records,
    )
    analysis_id = await asyncio.to_thread(repo.save_analysis, record)
    yield _sse({"type": "saved", "analysis_id": analysis_id})
except NotImplementedError:
    pass  # No postgres backend configured — skip saving
except Exception as exc:
    logger.warning("Failed to auto-save compliance analysis: {}", exc)
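The hook treats persistence as best effort: it runs after the done event has been sent, moves the blocking psycopg2 call onto a worker thread with asyncio.to_thread, and swallows failures so a database outage cannot break an analysis the user has already seen. A self-contained sketch of that pattern follows. analysis_events and the save callables are hypothetical stand-ins for analyze_stream and repo.save_analysis, and stdlib logging replaces the project's logger:

```python
import asyncio
import json
import logging
from typing import AsyncIterator, Callable

logger = logging.getLogger(__name__)


def sse(payload: dict) -> str:
    """Format one Server-Sent Events frame."""
    return f"data: {json.dumps(payload)}\n\n"


async def analysis_events(save: Callable[[dict], str]) -> AsyncIterator[str]:
    result = {"risk_score": 72, "conclusion": "Gaps found."}
    yield sse({"type": "done", **result})  # the user sees results first
    try:
        # Blocking DB write runs off the event loop.
        analysis_id = await asyncio.to_thread(save, result)
        yield sse({"type": "saved", "analysis_id": analysis_id})
    except NotImplementedError:
        pass  # persistence not configured
    except Exception as exc:  # never let a save failure break the stream
        logger.warning("auto-save failed: %s", exc)


async def collect(save: Callable[[dict], str]) -> list[str]:
    return [frame async for frame in analysis_events(save)]


def failing_save(_: dict) -> str:
    raise RuntimeError("db down")


print(len(asyncio.run(collect(lambda _: "abc-123"))))  # 2 (done + saved)
print(len(asyncio.run(collect(failing_save))))         # 1 (done only)
```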
  • Step 3: Add history endpoints to compliance.py

Add these routes after the existing compliance_chat route:

@router.get("/history")
async def list_history(
    limit: int = 20,
    offset: int = 0,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return paginated list of saved compliance analyses (newest first)."""
    from app.shared.bootstrap import get_compliance_repository
    try:
        repo = get_compliance_repository()
        records = await asyncio.to_thread(repo.list_analyses, limit, offset)
        return [
            {
                "id": r.id,
                "created_at": r.created_at.isoformat(),
                "created_by": r.created_by,
                "doc_name": r.doc_name,
                "standard_name": r.standard_name,
                "risk_score": r.risk_score,
                # Only non-zero if list_analyses() loads findings; _row_to_record()
                # returns records with findings=[].
                "finding_count": len(r.findings),
            }
            for r in records
        ]
    except NotImplementedError:
        return []


@router.get("/history/{analysis_id}")
async def get_history_item(
    analysis_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return full analysis record including findings."""
    from app.shared.bootstrap import get_compliance_repository
    from fastapi import HTTPException
    repo = get_compliance_repository()
    record = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not record:
        raise HTTPException(status_code=404, detail="Analysis not found")
    return {
        "id": record.id,
        "created_at": record.created_at.isoformat(),
        "created_by": record.created_by,
        "doc_name": record.doc_name,
        "standard_name": record.standard_name,
        "risk_score": record.risk_score,
        "conclusion": record.conclusion,
        "actions": record.actions,
        "para_text": record.para_text,
        "highlight_terms": record.highlight_terms,
        "findings": [
            {
                "id": f.id,
                "seq": f.seq,
                "title": f.title,
                "description": f.description,
                "status": f.status,
                "clause_ref": f.clause_ref,
            }
            for f in record.findings
        ],
    }


@router.delete("/history/{analysis_id}", status_code=204)
async def delete_history_item(
    analysis_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Delete a saved analysis (cascade removes findings and chat messages)."""
    from app.shared.bootstrap import get_compliance_repository
    repo = get_compliance_repository()
    await asyncio.to_thread(repo.delete_analysis, analysis_id)


@router.get("/history/{analysis_id}/download")
async def download_history_docx(
    analysis_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return a DOCX compliance report for the given analysis."""
    from app.shared.bootstrap import get_compliance_repository
    from app.infrastructure.compliance.docx_export import generate_docx
    from fastapi import HTTPException
    from fastapi.responses import Response
    import re
    from urllib.parse import quote

    repo = get_compliance_repository()
    record = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not record:
        raise HTTPException(status_code=404, detail="Analysis not found")

    docx_bytes = await asyncio.to_thread(generate_docx, record)
    # Header values must be latin-1, so send an ASCII-only fallback name plus the
    # original (possibly non-ASCII) name via the RFC 5987 filename* parameter.
    base = (record.doc_name or "report")[:50]
    date_part = record.created_at.strftime("%Y%m%d")
    ascii_name = re.sub(r"[^A-Za-z0-9._-]+", "_", base)
    utf8_name = quote(f"compliance_{base}_{date_part}.docx", safe="")
    disposition = (
        f'attachment; filename="compliance_{ascii_name}_{date_part}.docx"; '
        f"filename*=UTF-8''{utf8_name}"
    )
    return Response(
        content=docx_bytes,
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        headers={"Content-Disposition": disposition},
    )
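The list and detail endpoints build their response dicts by hand. If AnalysisRecord and FindingRecord are dataclasses (an assumption; ports.py is not part of this section), dataclasses.asdict can produce the detail payload directly, since it recurses into the findings list. The sketch below uses local stand-in classes carrying the fields this plan uses; record_to_json is a hypothetical name:

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class FindingRecord:  # stand-in mirroring the fields used in this plan
    id: str
    analysis_id: str
    seq: int
    title: str
    description: str
    status: str
    clause_ref: Optional[str] = None


@dataclass
class AnalysisRecord:  # stand-in; the real class lives in ports.py
    id: str
    created_at: datetime
    created_by: Optional[str]
    doc_name: str
    standard_name: str
    risk_score: int
    conclusion: str
    actions: list
    para_text: str
    highlight_terms: list
    findings: list = field(default_factory=list)


def record_to_json(record: AnalysisRecord) -> dict:
    """Serialize a record with the same keys get_history_item returns."""
    data = asdict(record)  # also converts each FindingRecord to a dict
    data["created_at"] = record.created_at.isoformat()
    for finding in data["findings"]:
        finding.pop("analysis_id", None)  # the endpoint omits this redundant key
    return data
```

With such a helper, get_history_item could simply return record_to_json(record).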
  • Step 4: Smoke test endpoints

Start the backend and run:

# List history (empty initially)
curl http://localhost:8000/api/v1/compliance/history \
  -H "Authorization: Bearer <token>"
# Expected: []

# Trigger an analysis and verify the response includes a "saved" SSE event
curl -X POST http://localhost:8000/api/v1/compliance/analyze-stream \
  -H "Authorization: Bearer <token>" \
  -F "text=System shall implement CSMS per ISO 21434." \
  --no-buffer | grep -E '"type": ?"saved"'
# Expected: data: {"type": "saved", "analysis_id": "<uuid>"}
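Grepping the raw stream depends on how _sse serializes JSON (with json.dumps defaults there is a space after each colon). A spacing-independent alternative is to parse each data: line as JSON. A minimal standard-library sketch; find_saved_id is a hypothetical helper that takes the response body as an iterable of text lines:

```python
import json
from typing import Iterable, Optional


def find_saved_id(lines: Iterable[str]) -> Optional[str]:
    """Return the analysis_id from the first `saved` SSE event, if any."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators, comments, and other SSE fields
        try:
            event = json.loads(line[len("data:"):].strip())
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict) and event.get("type") == "saved":
            return event.get("analysis_id")
    return None


sample = [
    'data: {"type": "done", "risk_score": 72}',
    "",
    'data: {"type": "saved", "analysis_id": "3f2b"}',
]
print(find_saved_id(sample))  # 3f2b
```

Against the live endpoint, pass in the decoded lines of the HTTP response, e.g. (raw.decode("utf-8") for raw in response) when using urllib.request.urlopen.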

Task 7: Frontend — PageStateContext update + HistoryRail component

Files:

  • Modify: frontend/src/contexts/PageStateContext.tsx

  • Create: frontend/src/pages/Compliance/HistoryRail.tsx

  • Modify: frontend/src/pages/Compliance/CompliancePage.tsx

  • Step 1: Update ComplianceState in PageStateContext.tsx

Find the ComplianceState interface (around line 78) and add two fields:

export interface ComplianceState {
  status: ComplianceStatus;
  stageLabel: string;
  stageKey: string;
  meta: ComplianceMeta | null;
  sources: ComplianceSourceEvent[];
  findings: ComplianceFindingEvent[];
  done: ComplianceDonePayload | null;
  errorText: string;
  // Direction B additions:
  analysisId: string | null;
  isReadOnly: boolean;
}

Update COMPLIANCE_INIT to include the new fields:

const COMPLIANCE_INIT: ComplianceState = {
  status: 'idle',
  stageLabel: '',
  stageKey: '',
  meta: null,
  sources: [],
  findings: [],
  done: null,
  errorText: '',
  analysisId: null,
  isReadOnly: false,
};
  • Step 2: Create HistoryRail.tsx
// frontend/src/pages/Compliance/HistoryRail.tsx
import { useEffect, useState, useCallback } from 'react';
import { Download, Trash2 } from 'lucide-react';

const TOKEN_KEY = 'auth_token';
function authHeader(): Record<string, string> {
  const t = localStorage.getItem(TOKEN_KEY);
  return t ? { Authorization: `Bearer ${t}` } : {};
}

interface HistoryItem {
  id: string;
  created_at: string;
  doc_name: string;
  standard_name: string;
  risk_score: number;
  finding_count: number;
}

interface Props {
  refreshTrigger: number; // increment to force re-fetch (after new analysis saved)
  onSelect: (id: string) => void;
  selectedId: string | null;
}

function riskClass(score: number): string {
  if (score >= 70) return 'risk-high';
  if (score >= 40) return 'risk-medium';
  return 'risk-low';
}

export function HistoryRail({ refreshTrigger, onSelect, selectedId }: Props) {
  const [items, setItems] = useState<HistoryItem[]>([]);
  const [deletingId, setDeletingId] = useState<string | null>(null);

  const fetchHistory = useCallback(() => {
    fetch('/api/v1/compliance/history?limit=30', { headers: authHeader() })
      .then(r => r.json())
      .then(data => {
        if (Array.isArray(data)) setItems(data);
      })
      .catch(() => {/* backend may not have postgres configured */});
  }, []);

  useEffect(() => { fetchHistory(); }, [fetchHistory, refreshTrigger]);

  function handleDownload(e: React.MouseEvent, item: HistoryItem) {
    e.stopPropagation();
    // A plain <a href> cannot send the Authorization header, so fetch the file
    // with the token and hand the browser a temporary blob URL instead.
    fetch(`/api/v1/compliance/history/${item.id}/download`, { headers: authHeader() })
      .then(r => {
        if (!r.ok) throw new Error(`Download failed: ${r.status}`);
        return r.blob();
      })
      .then(blob => {
        const blobUrl = URL.createObjectURL(blob);
        const link = document.createElement('a');
        link.href = blobUrl;
        link.download = `compliance-${item.doc_name.slice(0, 30)}.docx`;
        document.body.appendChild(link);
        link.click();
        link.remove();
        // Revoke after the click has been processed so the download can start
        setTimeout(() => URL.revokeObjectURL(blobUrl), 0);
      })
      .catch(err => console.error(err));
  }

  function handleDelete(e: React.MouseEvent, item: HistoryItem) {
    e.stopPropagation();
    if (!window.confirm(`Delete analysis for "${item.doc_name}"?`)) return;
    setDeletingId(item.id);
    fetch(`/api/v1/compliance/history/${item.id}`, {
      method: 'DELETE',
      headers: authHeader(),
    })
      .then(r => {
        // Only drop the row if the server actually deleted it
        if (!r.ok) throw new Error(`Delete failed: ${r.status}`);
        setItems(prev => prev.filter(i => i.id !== item.id));
      })
      .catch(err => console.error(err))
      .finally(() => setDeletingId(null));
  }

  function formatDate(iso: string): string {
    try {
      return new Date(iso).toLocaleDateString(undefined, { month: 'short', day: 'numeric' });
    } catch {
      return iso.slice(0, 10);
    }
  }

  if (items.length === 0) {
    return (
      <div className="history-pane" style={{ minWidth: 200, maxWidth: 220 }}>
        <div className="history-header">History</div>
        <p style={{ padding: '12px 16px', fontSize: 12, color: 'var(--muted)', lineHeight: 1.5 }}>
          Completed analyses appear here.
        </p>
      </div>
    );
  }

  return (
    <div className="history-pane" style={{ minWidth: 200, maxWidth: 220, overflowY: 'auto' }}>
      <div className="history-header">History</div>
      {items.map(item => (
        <div
          key={item.id}
          className={`quick-item${selectedId === item.id ? ' active' : ''}`}
          onClick={() => onSelect(item.id)}
          style={{ cursor: 'pointer' }}
        >
          <div style={{ fontSize: 11, color: 'var(--muted)', marginBottom: 2 }}>
            {formatDate(item.created_at)}
          </div>
          <div style={{ fontSize: 12, fontWeight: 500, marginBottom: 4, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
            {item.doc_name || 'Untitled'}
          </div>
          <div style={{ display: 'flex', alignItems: 'center', gap: 6 }}>
            <span className={`risk-badge ${riskClass(item.risk_score)}`} style={{ fontSize: 10 }}>
              {item.risk_score}
            </span>
            <span style={{ fontSize: 10, color: 'var(--muted)', flex: 1 }}>
              {item.finding_count} finding{item.finding_count !== 1 ? 's' : ''}
            </span>
            <button
              className="btn icon-btn"
              title="Download DOCX"
              onClick={e => handleDownload(e, item)}
              style={{ padding: '2px 4px' }}
            >
              <Download size={11} />
            </button>
            <button
              className="btn icon-btn danger"
              title="Delete"
              disabled={deletingId === item.id}
              onClick={e => handleDelete(e, item)}
              style={{ padding: '2px 4px' }}
            >
              <Trash2 size={11} />
            </button>
          </div>
        </div>
      ))}
    </div>
  );
}
  • Step 3: Integrate HistoryRail into CompliancePage.tsx

At the top of CompliancePage.tsx, add the import below, and make sure useEffect, useRef, and useState are also imported from react:

import { HistoryRail } from './HistoryRail';

Inside the CompliancePage component, add state:

const [historyRefresh, setHistoryRefresh] = useState(0);

The analyze_stream SSE loop now emits a saved event after done. Handle it in useComplianceAnalysis.ts by adding this branch to the SSE parsing block, after the done handler:

} else if (j.type === 'saved') {
  setState(s => ({ ...s, analysisId: j.analysis_id ?? null }));
}

Back in CompliancePage.tsx, watch for state.analysisId changes and trigger history re-fetch:

const prevAnalysisIdRef = useRef<string | null>(null);
useEffect(() => {
  if (state.analysisId && state.analysisId !== prevAnalysisIdRef.current) {
    prevAnalysisIdRef.current = state.analysisId;
    setHistoryRefresh(n => n + 1);
  }
}, [state.analysisId]);

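Add a handleSelectHistory function that loads a historical record. It calls an authHeader() helper: export the one from HistoryRail.tsx or define the same helper in CompliancePage.tsx.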

async function handleSelectHistory(id: string) {
  const res = await fetch(`/api/v1/compliance/history/${id}`, { headers: authHeader() });
  if (!res.ok) return;
  const data = await res.json();
  // Reconstruct state from historical record (read-only view)
  setComplianceState({
    status: 'done',
    stageLabel: 'Complete',
    stageKey: 'concluding',
    meta: { title: data.doc_name, sourceType: 'doc', startedAt: data.created_at },
    sources: [],
    findings: (data.findings || []).map((f: any) => ({
      title: f.title,
      desc: f.description,
      status: f.status,
      clause_ref: f.clause_ref,
    })),
    done: {
      conclusion: data.conclusion,
      actions: data.actions,
      risk_score: data.risk_score,
      highlight_terms: data.highlight_terms,
      para_text: data.para_text,
    },
    errorText: '',
    analysisId: data.id,
    isReadOnly: true,
  });
}

Note: setComplianceState is available from usePageState() — add it to the destructuring alongside complianceState.

In the JSX return, wrap the existing 3-column workspace with a flex row that includes HistoryRail on the left:

<div style={{ display: 'flex', flex: 1, overflow: 'hidden' }}>
  <HistoryRail
    refreshTrigger={historyRefresh}
    onSelect={handleSelectHistory}
    selectedId={state.analysisId}
  />
  <div style={{ flex: 1, overflow: 'hidden' }}>
    {/* existing 3-column workspace JSX here */}
  </div>
</div>
  • Step 4: TypeScript check
cd frontend
npx tsc --noEmit

Expected: 0 errors.


Direction C Tasks

Task 8: build_finding_context + generate_suggestions in pipeline

Files:

  • Modify: backend/app/application/compliance/pipeline.py

  • Test: backend/tests/compliance/test_pipeline.py

  • Step 1: Write failing tests

Add to backend/tests/compliance/test_pipeline.py:

from app.domain.compliance.ports import AnalysisRecord, FindingRecord
from datetime import datetime


def _sample_analysis() -> AnalysisRecord:
    return AnalysisRecord(
        id="a1", created_at=datetime(2026, 6, 8), created_by="u",
        doc_name="doc.pdf", standard_name="EU AI Act",
        risk_score=72, conclusion="Gaps found.", actions=[], para_text="para",
        highlight_terms=[], findings=[],
    )


def _sample_finding(status: str = "risk") -> FindingRecord:
    return FindingRecord(
        id="f1", analysis_id="a1", seq=0,
        title="Missing CSMS", description="No CSMS certification.",
        status=status, clause_ref="Art.9.1",
    )


def test_build_finding_context_contains_required_fields():
    from app.application.compliance.pipeline import build_finding_context
    ctx = build_finding_context(_sample_finding(), _sample_analysis())
    assert "doc.pdf" in ctx
    assert "EU AI Act" in ctx
    assert "Missing CSMS" in ctx
    assert "Art.9.1" in ctx


def test_generate_suggestions_returns_three_questions():
    from app.application.compliance.pipeline import generate_suggestions
    client = _make_mock_client(
        '{"questions": ["Q1?", "Q2?", "Q3?"]}'
    )
    questions = generate_suggestions(_sample_finding("risk"), _sample_analysis(), client)
    assert len(questions) == 3
    assert all(isinstance(q, str) for q in questions)


def test_generate_suggestions_falls_back_on_error():
    from app.application.compliance.pipeline import generate_suggestions
    bad_client = MagicMock()
    bad_resp = MagicMock()
    bad_resp.is_success = False
    bad_client.chat.return_value = bad_resp
    questions = generate_suggestions(_sample_finding(), _sample_analysis(), bad_client)
    assert len(questions) == 3  # fallback always returns 3
  • Step 2: Run to verify failure
cd backend
python -m pytest tests/compliance/test_pipeline.py::test_build_finding_context_contains_required_fields -v

Expected: ImportError: cannot import name 'build_finding_context'

  • Step 3: Implement both functions in pipeline.py

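Add the ports import to the top of pipeline.py alongside the existing imports, then append the constants and functions below to the end of the file: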

from app.domain.compliance.ports import AnalysisRecord, FindingRecord

_SUGGESTION_FOCUS = {
    "risk": "Focus on remediation steps, required certifications, and timeline to resolve.",
    "warn": "Focus on identifying the specific compliance gap and how to close it.",
    "ok": "Focus on maintaining compliance evidence and monitoring future changes.",
}

_SUGGESTION_FALLBACK = {
    "risk": [
        "What specific certifications or documents are required to remediate this finding?",
        "What is the typical remediation timeline for this type of non-compliance?",
        "Which regulation clause defines the exact requirement?",
    ],
    "warn": [
        "What is the exact gap between the current state and the requirement?",
        "What evidence would demonstrate partial compliance?",
        "Which regulation clause applies to this warning?",
    ],
    "ok": [
        "What documentation should be maintained to evidence this compliance?",
        "How should this area be monitored as regulations evolve?",
        "Are there related clauses that may affect this compliant area?",
    ],
}


def build_finding_context(finding: "FindingRecord", analysis: "AnalysisRecord") -> str:
    """Build a grounded system context string for a finding chat thread.

    Combines finding details with analysis metadata so the LLM has full
    context without relying on the frontend to pass segment_context.
    """
    return (
        f"Document: {analysis.doc_name}\n"
        f"Standard: {analysis.standard_name}\n"
        f"Finding [{finding.seq + 1}]: {finding.title}\n"
        f"Status: {finding.status}\n"
        f"Clause reference: {finding.clause_ref or 'N/A'}\n"
        f"Description: {finding.description}\n"
        f"Overall conclusion: {analysis.conclusion}\n"
    )


def generate_suggestions(
    finding: "FindingRecord",
    analysis: "AnalysisRecord",
    client: "BaseLLMClient",
) -> list[str]:
    """Generate 3 context-aware follow-up questions for a finding chat thread.

    Returns exactly 3 question strings. Falls back to static templates on error.
    """
    fallback = _SUGGESTION_FALLBACK.get(finding.status, _SUGGESTION_FALLBACK["warn"])
    context = build_finding_context(finding, analysis)
    focus = _SUGGESTION_FOCUS.get(finding.status, _SUGGESTION_FOCUS["warn"])
    prompt = (
        f"{context}\n\n"
        f"Task: {focus}\n"
        "Generate exactly 3 concise follow-up questions a compliance analyst would ask.\n"
        'Return JSON: {"questions": ["question 1", "question 2", "question 3"]}\n'
        "Return ONLY the JSON object."
    )
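    try:
        response = client.chat([{"role": "user", "content": prompt}], max_tokens=300)
    except Exception as exc:  # provider/network errors must not break the drawer
        logger.warning("generate_suggestions LLM call failed: {}", exc)
        return list(fallback)
    if not response.is_success:
        return list(fallback)
    try:
        result = _extract_json(response.content)
        questions = result.get("questions", []) if isinstance(result, dict) else []
        if isinstance(questions, list) and len(questions) >= 3:
            return [str(q) for q in questions[:3]]
    except (ValueError, TypeError) as exc:
        logger.warning("generate_suggestions JSON parse failed: {}", exc)
    # Return a copy so callers cannot mutate the module-level fallback lists
    return list(fallback)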
  • Step 4: Run all pipeline tests
cd backend
python -m pytest tests/compliance/test_pipeline.py -v

Expected: all pass.
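generate_suggestions relies on the existing _extract_json helper, whose exact tolerance is defined elsewhere in pipeline.py. For comparison, here is a hypothetical, stricter standalone parser (parse_questions is an illustrative name, not project code) that accepts a JSON object optionally wrapped in a Markdown code fence, keeps exactly three non-empty questions, and otherwise returns a copy of the fallback:

```python
import json
import re

# Matches an opening ```/```json fence at the start or a closing fence at the end.
_FENCE = re.compile(r"^`{3}(?:json)?\s*|\s*`{3}$", re.IGNORECASE)


def parse_questions(raw: str, fallback: list[str], n: int = 3) -> list[str]:
    """Extract exactly `n` non-empty question strings from an LLM reply."""
    text = _FENCE.sub("", raw.strip())
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        return list(fallback)
    questions = payload.get("questions") if isinstance(payload, dict) else None
    if not isinstance(questions, list):
        return list(fallback)
    cleaned = [str(q).strip() for q in questions if str(q).strip()]
    return cleaned[:n] if len(cleaned) >= n else list(fallback)


fb = ["Fallback 1?", "Fallback 2?", "Fallback 3?"]
print(parse_questions('{"questions": ["Q1?", "Q2?", "Q3?", "Q4?"]}', fb))
# ['Q1?', 'Q2?', 'Q3?']
```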


Task 9: Finding chat API endpoints

Files:

  • Modify: backend/app/api/routes/compliance.py

  • Step 1: Add three finding-chat endpoints

Add these routes after the history download endpoint:

@router.get("/analyses/{analysis_id}/findings/{finding_id}/chat")
async def get_finding_chat_history(
    analysis_id: str,
    finding_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Return persisted chat messages for a finding thread, oldest first."""
    from app.shared.bootstrap import get_compliance_repository
    try:
        repo = get_compliance_repository()
        messages = await asyncio.to_thread(repo.get_messages, finding_id)
        return messages
    except NotImplementedError:
        return []


@router.post("/analyses/{analysis_id}/findings/{finding_id}/suggestions")
async def get_finding_suggestions(
    analysis_id: str,
    finding_id: str,
    current_user: UserClaims = Depends(get_current_user),
):
    """Generate 3 LLM-powered follow-up question suggestions for a finding."""
    from app.application.compliance.pipeline import generate_suggestions
    from app.shared.bootstrap import get_compliance_repository
    from app.services.llm.llm_factory import get_llm_client
    from fastapi import HTTPException

    repo = get_compliance_repository()
    analysis = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not analysis:
        raise HTTPException(status_code=404, detail="Analysis not found")

    finding = next((f for f in analysis.findings if f.id == finding_id), None)
    if not finding:
        raise HTTPException(status_code=404, detail="Finding not found")

    client = get_llm_client(provider=settings.llm_provider, model=settings.llm_model)
    questions = await asyncio.to_thread(generate_suggestions, finding, analysis, client)
    return {"questions": questions}


@router.post("/analyses/{analysis_id}/findings/{finding_id}/chat")
async def finding_chat(
    analysis_id: str,
    finding_id: str,
    request: ComplianceChatRequest,
    current_user: UserClaims = Depends(get_current_user),
):
    """Stream a grounded chat response for a specific finding.

    Loads the finding and analysis from DB to build grounded context.
    Persists both user message and assistant response to finding_chat_messages.
    """
    from app.application.compliance.pipeline import build_finding_context
    from app.shared.bootstrap import get_compliance_repository
    from fastapi import HTTPException

    repo = get_compliance_repository()
    analysis = await asyncio.to_thread(repo.get_analysis, analysis_id)
    if not analysis:
        raise HTTPException(status_code=404, detail="Analysis not found")
    finding = next((f for f in analysis.findings if f.id == finding_id), None)
    if not finding:
        raise HTTPException(status_code=404, detail="Finding not found")

    # Persist user message
    await asyncio.to_thread(
        repo.save_message, analysis_id, finding_id, "user", request.query
    )

    # Fold recent turns into the query: stream_chat() is called with a single
    # query string, so prior messages only take effect if included as text.
    # history[-1] is the user message persisted above; keep up to 10 earlier
    # messages (5 turns) before it.
    history = await asyncio.to_thread(repo.get_messages, finding_id)
    prior = history[:-1][-10:]
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in prior)

    # Build grounded system context
    system_context = build_finding_context(finding, analysis)
    full_query = f"[Compliance Finding Context]\n{system_context}\n\n"
    if transcript:
        full_query += f"[Previous conversation]\n{transcript}\n\n"
    full_query += f"User question: {request.query}"

    # Mutated in place by generate(), so no nonlocal declaration is needed
    assistant_buffer: list[str] = []

    async def generate() -> AsyncGenerator[str, None]:
        try:
            _, event_stream = get_agent_conversation_service().stream_chat(
                query=full_query,
                top_k=5,
                prompt_template="compliance_qa",
            )
            for event in event_stream:
                event_type = event.get("event", "")
                if event_type == "content":
                    text = event.get("data", "")
                    if text:
                        assistant_buffer.append(text)
                        yield _sse({"type": "chunk", "text": text})
                elif event_type == "done":
                    yield _sse({"type": "done"})
                await asyncio.sleep(0)
        except Exception as exc:
            logger.exception("finding_chat stream error")
            yield _sse({"type": "error", "text": str(exc)})
        finally:
            # Persist assistant response after stream completes
            full_response = "".join(assistant_buffer)
            if full_response:
                try:
                    await asyncio.to_thread(
                        repo.save_message, analysis_id, finding_id, "assistant", full_response
                    )
                except Exception as exc:
                    logger.warning("Failed to persist assistant message: {}", exc)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
    )
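The endpoint streams three event shapes (`chunk`, `done`, `error`). The following is a minimal TypeScript sketch of that contract for the frontend side, assuming `_sse()` writes each payload as a single `data: <json>` line followed by a blank line (the drawer in Task 10 parses the same framing). The names `FindingChatEvent` and `parseFindingChatFrame` are illustrative, not existing code.

```typescript
// Events emitted by the finding-chat endpoint, mirroring the dicts passed to _sse().
type FindingChatEvent =
  | { type: 'chunk'; text: string }
  | { type: 'done' }
  | { type: 'error'; text: string };

// Parse one SSE block (the text between two blank lines) into a typed event.
// Returns null for comments, keep-alives, malformed JSON, or unknown event types.
function parseFindingChatFrame(block: string): FindingChatEvent | null {
  const dataLine = block.split('\n').find(l => l.startsWith('data: '));
  if (!dataLine) return null;

  let payload: unknown;
  try {
    payload = JSON.parse(dataLine.slice('data: '.length));
  } catch {
    return null;
  }
  if (typeof payload !== 'object' || payload === null) return null;

  const p = payload as { type?: unknown; text?: unknown };
  if (p.type === 'done') return { type: 'done' };

  // chunk and error events both carry a text field
  const text = p.text;
  if (typeof text !== 'string') return null;
  if (p.type === 'chunk') return { type: 'chunk', text };
  if (p.type === 'error') return { type: 'error', text };
  return null;
}
```

Returning `null` instead of throwing lets a stream reader skip keep-alive comments or a malformed frame without aborting the whole response.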
- [ ] **Step 2: Smoke test**

```bash
# Get suggestions for a finding (replace the IDs with real values from a previous analysis)
curl -X POST \
  "http://localhost:8000/api/v1/compliance/analyses/<analysis_id>/findings/<finding_id>/suggestions" \
  -H "Authorization: Bearer <token>"
# Expected: {"questions": ["...", "...", "..."]}

# Get chat history (empty until the first message is sent)
curl "http://localhost:8000/api/v1/compliance/analyses/<analysis_id>/findings/<finding_id>/chat" \
  -H "Authorization: Bearer <token>"
# Expected: []
```
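When scripting this smoke test, or before rendering suggestion chips, a runtime check keeps an error body such as `{"detail": "..."}` from being mistaken for a suggestions payload. A minimal sketch, assuming the response shape shown by the curl example above; `isSuggestionsResponse` is a hypothetical helper name.

```typescript
// Expected shape of POST .../findings/{finding_id}/suggestions.
interface SuggestionsResponse {
  questions: string[];
}

// Runtime guard: true only for an object whose `questions` is an array of strings.
function isSuggestionsResponse(value: unknown): value is SuggestionsResponse {
  if (typeof value !== 'object' || value === null) return false;
  const questions = (value as { questions?: unknown }).questions;
  return Array.isArray(questions) && questions.every(q => typeof q === 'string');
}
```

A smoke script could then assert `isSuggestionsResponse(body) && body.questions.length === 3` on the parsed JSON.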

### Task 10: FindingChatDrawer component

**Files:**
- Create: `frontend/src/pages/Compliance/FindingChatDrawer.tsx`
- Modify: `frontend/src/pages/Compliance/CompliancePage.tsx`
- Modify: `frontend/src/contexts/PageStateContext.tsx`

- [ ] **Step 1: Add `activeFindingId` to `PageStateContext.tsx`**

In the `ComplianceState` interface (already updated in Task 7), add:

```tsx
activeFindingId: string | null;   // finding UUID from the DB (not the list index)
```

In `COMPLIANCE_INIT`, add:

```tsx
activeFindingId: null,
```
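Updates to this field can be kept as pure functions so they are easy to test. The following is a sketch only: `ComplianceStateSlice`, `openFinding` and `closeFinding` are hypothetical names, and the rule that a finding cannot be opened before the analysis is saved is an assumption based on finding UUIDs coming from the database record.

```typescript
// Hypothetical slice of ComplianceState: only the fields this sketch needs.
interface ComplianceStateSlice {
  analysisId: string | null;
  activeFindingId: string | null; // finding UUID from the DB, not a list index
}

// Open a finding's chat thread. No-op until the analysis has been saved,
// because finding UUIDs only exist once the record is persisted.
function openFinding<S extends ComplianceStateSlice>(state: S, findingId: string): S {
  if (state.analysisId === null) return state;
  return { ...state, activeFindingId: findingId };
}

// Close the thread; returns the same object when nothing changes,
// so React can skip a re-render.
function closeFinding<S extends ComplianceStateSlice>(state: S): S {
  return state.activeFindingId === null ? state : { ...state, activeFindingId: null };
}
```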
- [ ] **Step 2: Create `FindingChatDrawer.tsx`**

```tsx
// frontend/src/pages/Compliance/FindingChatDrawer.tsx
import { useEffect, useRef, useState } from 'react';
import { X, Send } from 'lucide-react';

const TOKEN_KEY = 'auth_token';
function authHeader(): Record<string, string> {
  const t = localStorage.getItem(TOKEN_KEY);
  return t ? { Authorization: `Bearer ${t}` } : {};
}

const STATUS_COLOR: Record<string, string> = {
  risk: 'var(--danger, #dc143c)',
  warn: 'var(--warning, #ff8c00)',
  ok: 'var(--success, #228b22)',
};

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

interface Props {
  analysisId: string;
  findingId: string;
  finding: { title: string; desc: string; status: string; clause_ref?: string };
  onClose: () => void;
}

export function FindingChatDrawer({ analysisId, findingId, finding, onClose }: Props) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [suggestions, setSuggestions] = useState<string[]>([]);
  const [input, setInput] = useState('');
  const [loading, setLoading] = useState(false);
  const [loadingHistory, setLoadingHistory] = useState(true);
  const abortRef = useRef<AbortController | null>(null);
  const bottomRef = useRef<HTMLDivElement>(null);

  // Load history when the drawer opens or switches to another finding;
  // fetch suggestions only while the thread is still empty
  useEffect(() => {
    const base = `/api/v1/compliance/analyses/${analysisId}/findings/${findingId}`;
    const ctrl = new AbortController();
    setMessages([]);
    setSuggestions([]);
    setLoadingHistory(true);

    fetch(`${base}/chat`, { headers: authHeader(), signal: ctrl.signal })
      .then(r => {
        if (!r.ok) throw new Error(`HTTP ${r.status}`);
        return r.json();
      })
      .then((data: Message[]) => {
        if (ctrl.signal.aborted) return; // stale response for a previous finding
        setMessages(data.map(m => ({ id: m.id, role: m.role, content: m.content })));
        setLoadingHistory(false);
        if (!data.length) {
          fetch(`${base}/suggestions`, { method: 'POST', headers: authHeader(), signal: ctrl.signal })
            .then(r => r.json())
            .then(d => { if (Array.isArray(d?.questions)) setSuggestions(d.questions); })
            .catch(() => {});
        }
      })
      .catch(() => {
        // An aborted request belongs to a previous finding; leave the current state alone
        if (!ctrl.signal.aborted) setLoadingHistory(false);
      });

    return () => {
      ctrl.abort();              // cancel in-flight history/suggestion requests
      abortRef.current?.abort(); // stop any response that is still streaming
    };
  }, [analysisId, findingId]);

  // Auto-scroll to the newest message
  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  async function send(text?: string) {
    const q = (text ?? input).trim();
    if (!q || loading) return;
    setInput('');
    setSuggestions([]); // hide the chips once the first message is sent

    const now = Date.now();
    const assistantId = `ast-${now}`;
    setMessages(prev => [
      ...prev,
      { id: `usr-${now}`, role: 'user', content: q },
      { id: assistantId, role: 'assistant', content: '' },
    ]);
    setLoading(true);

    // Append text to the streaming assistant placeholder
    const appendAssistant = (delta: string) =>
      setMessages(prev =>
        prev.map(m => (m.id === assistantId ? { ...m, content: m.content + delta } : m))
      );

    const ctrl = new AbortController();
    abortRef.current = ctrl;

    try {
      const res = await fetch(
        `/api/v1/compliance/analyses/${analysisId}/findings/${findingId}/chat`,
        {
          method: 'POST',
          headers: { 'Content-Type': 'application/json', ...authHeader() },
          body: JSON.stringify({ query: q }),
          signal: ctrl.signal,
        }
      );
      if (!res.ok || !res.body) {
        appendAssistant(`Request failed (HTTP ${res.status}).`);
        return; // `finally` still clears the loading flag
      }
      const reader = res.body.getReader();
      const dec = new TextDecoder();
      let buf = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buf += dec.decode(value, { stream: true });
        // SSE events are separated by a blank line; keep the trailing partial block
        const blocks = buf.split('\n\n');
        buf = blocks.pop() ?? '';
        for (const block of blocks) {
          const dl = block.split('\n').find(l => l.startsWith('data: '));
          if (!dl) continue;
          try {
            const j = JSON.parse(dl.slice(6));
            if (j.type === 'chunk' && j.text) {
              appendAssistant(j.text);
            } else if (j.type === 'error') {
              appendAssistant(`\n\n[Error: ${j.text ?? 'unknown error'}]`);
            }
          } catch { /* ignore malformed frames */ }
        }
      }
    } catch (e: unknown) {
      if (e instanceof Error && e.name !== 'AbortError') {
        appendAssistant('Error reaching server.');
      }
    } finally {
      setLoading(false);
    }
  }

  return (
    <div
      style={{
        position: 'fixed', right: 0, top: 0, bottom: 0, width: 420,
        background: 'var(--surface)', borderLeft: '1px solid var(--border)',
        display: 'flex', flexDirection: 'column', zIndex: 200,
        boxShadow: '-4px 0 16px rgba(0,0,0,0.12)',
      }}
    >
      {/* Header */}
      <div style={{
        padding: '14px 16px', borderBottom: '1px solid var(--border)',
        display: 'flex', alignItems: 'flex-start', gap: 10,
      }}>
        <div style={{ flex: 1, minWidth: 0 }}>
          <div style={{ fontSize: 11, color: STATUS_COLOR[finding.status] || 'var(--muted)', fontWeight: 600, marginBottom: 2 }}>
            {finding.status.toUpperCase()}
            {finding.clause_ref && <span style={{ fontWeight: 400, marginLeft: 6, color: 'var(--muted)' }}>{finding.clause_ref}</span>}
          </div>
          <div style={{ fontSize: 13, fontWeight: 600, overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>
            {finding.title}
          </div>
          <div style={{ fontSize: 11, color: 'var(--muted)', marginTop: 2, lineHeight: 1.4 }}>
            {finding.desc.length > 100 ? finding.desc.slice(0, 100) + '…' : finding.desc}
          </div>
        </div>
        <button className="btn icon-btn" onClick={onClose} style={{ flexShrink: 0 }}>
          <X size={14} />
        </button>
      </div>

      {/* Suggestion chips: shown only before the first user message */}
      {suggestions.length > 0 && (
        <div style={{ padding: '10px 16px', borderBottom: '1px solid var(--border)' }}>
          <div style={{ fontSize: 11, color: 'var(--muted)', marginBottom: 6 }}>Suggested questions</div>
          <div style={{ display: 'flex', flexDirection: 'column', gap: 6 }}>
            {suggestions.map((q, i) => (
              <button
                key={i}
                className="chip"
                style={{ textAlign: 'left', whiteSpace: 'normal', height: 'auto', padding: '6px 10px' }}
                onClick={() => send(q)}
              >
                {q}
              </button>
            ))}
          </div>
        </div>
      )}

      {/* Messages */}
      <div style={{ flex: 1, overflowY: 'auto', padding: '12px 16px', display: 'flex', flexDirection: 'column', gap: 10 }}>
        {loadingHistory && (
          <p style={{ fontSize: 12, color: 'var(--muted)', textAlign: 'center' }}>Loading history…</p>
        )}
        {messages.map(msg => (
          <div key={msg.id} className={`message msg-${msg.role}`} style={{ maxWidth: '100%' }}>
            {msg.role === 'assistant' && <div className="msg-avatar">AI</div>}
            <div className="msg-bubble" style={{ fontSize: 13, whiteSpace: 'pre-wrap' }}>
              {msg.content || (loading ? '…' : '')}
            </div>
            {msg.role === 'user' && <div className="msg-avatar user-av">You</div>}
          </div>
        ))}
        <div ref={bottomRef} />
      </div>

      {/* Composer */}
      <div style={{ padding: '10px 16px', borderTop: '1px solid var(--border)', display: 'flex', gap: 8 }}>
        <textarea
          className="composer-input"
          placeholder="Ask about this finding…"
          value={input}
          rows={2}
          style={{ flex: 1, fontSize: 13 }}
          onChange={e => setInput(e.target.value)}
          onKeyDown={e => {
            // Enter sends, Shift+Enter inserts a newline; ignore Enter while an IME is composing
            if (e.key === 'Enter' && !e.shiftKey && !e.nativeEvent.isComposing) {
              e.preventDefault();
              send();
            }
          }}
        />
        <button
          className="btn primary"
          disabled={!input.trim() || loading}
          onClick={() => send()}
          style={{ alignSelf: 'flex-end' }}
        >
          <Send size={14} />
        </button>
      </div>
    </div>
  );
}
```
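The stream-reading loop in `send()` splits on blank lines and carries any partial block over to the next `reader.read()`. If that logic were pulled out for unit testing, it could look like the sketch below. `SseBlockBuffer` is a hypothetical name, and the CRLF normalization is an extra assumption that only matters if a proxy or server emits `\r\n\r\n` separators.

```typescript
// Accumulates decoded text from successive reads and returns only complete SSE
// blocks, keeping any trailing partial block for the next push().
class SseBlockBuffer {
  private buf = '';

  push(text: string): string[] {
    // Normalize CRLF after concatenating, so a "\r" at the end of one read and
    // the "\n" at the start of the next are still joined correctly
    this.buf = (this.buf + text).replace(/\r\n/g, '\n');
    const blocks = this.buf.split('\n\n');
    this.buf = blocks.pop() ?? '';
    return blocks.filter(b => b.trim() !== '');
  }
}
```

With this helper, the body of the `while` loop reduces to `for (const block of sse.push(dec.decode(value, { stream: true }))) { ... }`.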
- [ ] **Step 3: Wire `FindingChatDrawer` into `CompliancePage.tsx`**

Add the import:

```tsx
import { FindingChatDrawer } from './FindingChatDrawer';
```

The Chat button stays keyed by the finding's position in `state.findings`. A small wrapper (below) resolves that index plus `state.analysisId` to the finding's database UUID before opening the drawer, because only saved findings have UUIDs.

Find the findings list map in `CompliancePage.tsx` (around lines 380-420) and locate the Chat button that currently calls `chat.openFor(i, f)`. Remove the inline `useFindingChat()` chat panel logic and replace it with the drawer:

```tsx
// At the top of the CompliancePage component body:
const [drawerFindingIdx, setDrawerFindingIdx] = useState<number | null>(null);

// Inside the findings list map, replace the existing chat button with:
<button
  className="btn sm"
  onClick={() => setDrawerFindingIdx(i)}
  style={{ marginTop: 6 }}
>
  💬 Chat
</button>
```

At the bottom of the CompliancePage JSX return (before the closing `</div>`), render the drawer:

```tsx
{drawerFindingIdx !== null && state.analysisId && (
  <_FindingChatDrawerWrapper
    analysisId={state.analysisId}
    findingIndex={drawerFindingIdx}
    finding={state.findings[drawerFindingIdx]}
    onClose={() => setDrawerFindingIdx(null)}
  />
)}
```

Because finding UUIDs only exist in the saved database record, add a wrapper component to `CompliancePage.tsx` that looks them up from the history endpoint. Make sure `useEffect`, `useState` and the `ComplianceFindingEvent` type are imported, and that an `authHeader()` helper is in scope (the one in `FindingChatDrawer.tsx` is not exported):

```tsx
function _FindingChatDrawerWrapper({
  analysisId, findingIndex, finding, onClose,
}: {
  analysisId: string;
  findingIndex: number;
  finding: ComplianceFindingEvent;
  onClose: () => void;
}) {
  const [findingId, setFindingId] = useState<string | null>(null);

  useEffect(() => {
    let cancelled = false;
    setFindingId(null); // don't show the previous finding's thread while resolving
    fetch(`/api/v1/compliance/history/${analysisId}`, { headers: authHeader() })
      .then(r => r.json())
      .then(data => {
        // Relies on `seq` being the finding's 0-based position in state.findings
        const match = (data.findings || []).find(
          (f: { id?: string; seq?: number }) => f.seq === findingIndex
        );
        if (!cancelled && match?.id) setFindingId(match.id);
      })
      .catch(() => {});
    return () => { cancelled = true; };
  }, [analysisId, findingIndex]);

  if (!findingId) return null;
  return (
    <FindingChatDrawer
      analysisId={analysisId}
      findingId={findingId}
      finding={finding}
      onClose={onClose}
    />
  );
}
```
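The index-to-UUID lookup inside the wrapper's effect is the piece most likely to break if the history payload changes, so it can be isolated as a pure function and unit-tested. This sketch assumes the history record exposes `findings` items with `id` and a 0-based `seq`, which is what the wrapper's `f.seq === findingIndex` comparison already relies on; `HistoryRecord` and `resolveFindingId` are hypothetical names.

```typescript
// Assumed subset of GET /api/v1/compliance/history/{id}: only the fields used here.
interface HistoryFinding {
  id: string;
  seq: number; // assumed 0-based position, matching the index in state.findings
}
interface HistoryRecord {
  findings?: HistoryFinding[];
}

// Map a finding's list index to its DB UUID, or null when it isn't in the record.
function resolveFindingId(record: HistoryRecord, index: number): string | null {
  const match = (record.findings ?? []).find(f => f.seq === index);
  return match?.id ?? null;
}
```

The wrapper's `.then` callback could then become `const id = resolveFindingId(data, findingIndex); if (!cancelled && id) setFindingId(id);`, and a test can pin the `seq` convention.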
- [ ] **Step 4: TypeScript check**

```bash
cd frontend
npx tsc --noEmit
```

Expected: 0 errors.

- [ ] **Step 5: End-to-end smoke test**

1. Start the backend and the frontend.
2. Run a compliance analysis on text that produces at least 2 findings.
3. Verify that the `saved` SSE event arrives and the History Rail shows the new record.
4. Click the "💬 Chat" button on any finding.
5. Verify that the Drawer opens with 3 suggestion chips.
6. Click one suggestion and verify that the response streams in.
7. Close and reopen the Drawer; verify that the conversation history is restored.
8. Click [↓] in the History Rail; verify that a DOCX file downloads.
9. Click [×] in the History Rail, confirm the dialog, and verify that the record disappears.

## Verification Checklist

- [ ] Direction A: the SSE stream emits one `stage:analyzing` event (not one per clause), all findings still appear, and `highlight_terms` in the `done` event is non-empty
- [ ] Direction A: `PassThroughReranker` is importable from `app.infrastructure.vectorstore.pass_through_reranker`
- [ ] Direction B: running an analysis emits a `{"type":"saved","analysis_id":"<uuid>"}` SSE event
- [ ] Direction B: `GET /api/v1/compliance/history` returns a list containing the saved record
- [ ] Direction B: `GET /api/v1/compliance/history/<id>/download` returns a valid DOCX file
- [ ] Direction B: `DELETE /api/v1/compliance/history/<id>` removes the record
- [ ] Direction B: the History Rail renders in the UI and updates after an analysis completes
- [ ] Direction C: the suggestions endpoint returns 3 questions for a valid finding
- [ ] Direction C: the chat endpoint streams a grounded response that references the finding context
- [ ] Direction C: reopening the Drawer shows the previous conversation
- [ ] All backend tests pass: `python -m pytest tests/compliance/ -v`
- [ ] TypeScript check passes: `npx tsc --noEmit` (0 errors)
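The Direction B `saved` event check can be automated in a frontend test or a smoke script. A minimal sketch, assuming `analysis_id` is a canonical hyphenated UUID string, as the `<uuid>` placeholder in the checklist suggests; `isSavedEvent` is a hypothetical helper name.

```typescript
// Shape of the "saved" SSE event emitted once an analysis has been persisted.
interface SavedEvent {
  type: 'saved';
  analysis_id: string;
}

// Canonical 8-4-4-4-12 hex UUID, case-insensitive.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// True only for {"type":"saved","analysis_id":"<uuid>"} payloads.
function isSavedEvent(value: unknown): value is SavedEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as { type?: unknown; analysis_id?: unknown };
  return v.type === 'saved' && typeof v.analysis_id === 'string' && UUID_RE.test(v.analysis_id);
}
```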