From 06e0967128518990cc503671c597d58c63418819 Mon Sep 17 00:00:00 2001 From: wangwei Date: Fri, 5 Jun 2026 09:00:36 +0800 Subject: [PATCH] add --- backend/app/api/routes/compliance.py | 130 +- .../app/application/compliance/__init__.py | 1 + .../app/application/compliance/pipeline.py | 215 +++ backend/app/schemas/compliance.py | 28 +- .../2026-06-04-compliance-new-analysis.md | 1489 +++++++++++++++++ .../plans/2026-06-04-frontend-optimization.md | 1127 +++++++++++++ .../src/pages/Compliance/CompliancePage.tsx | 571 ++++++- .../src/pages/Compliance/NewAnalysisModal.tsx | 241 +++ .../pages/Compliance/useComplianceAnalysis.ts | 161 ++ frontend/src/pages/Docs/DocsPage.tsx | 265 ++- frontend/src/pages/Docs/UploadModal.tsx | 4 +- frontend/src/pages/Status/StatusPage.tsx | 361 ++-- frontend/src/styles/globals.css | 206 ++- 13 files changed, 4560 insertions(+), 239 deletions(-) create mode 100644 backend/app/application/compliance/__init__.py create mode 100644 backend/app/application/compliance/pipeline.py create mode 100644 docs/superpowers/plans/2026-06-04-compliance-new-analysis.md create mode 100644 docs/superpowers/plans/2026-06-04-frontend-optimization.md create mode 100644 frontend/src/pages/Compliance/NewAnalysisModal.tsx create mode 100644 frontend/src/pages/Compliance/useComplianceAnalysis.ts diff --git a/backend/app/api/routes/compliance.py b/backend/app/api/routes/compliance.py index 813a53a..1c61486 100644 --- a/backend/app/api/routes/compliance.py +++ b/backend/app/api/routes/compliance.py @@ -5,17 +5,19 @@ from __future__ import annotations import asyncio import json from pathlib import Path -from typing import AsyncGenerator +from typing import AsyncGenerator, Optional -from fastapi import APIRouter, File, UploadFile +from fastapi import APIRouter, File, Form, UploadFile from fastapi.responses import StreamingResponse +from loguru import logger from app.schemas.compliance import ( AnalyzeResponse, ComplianceChatRequest, ) from app.services.mock_data import 
generate_task_id, get_mock_compliance_result -from app.shared.bootstrap import get_agent_conversation_service +from app.shared.bootstrap import get_agent_conversation_service, get_retrieval_service +from app.config.settings import settings router = APIRouter(prefix="/compliance", tags=["合规分析"]) @@ -62,6 +64,128 @@ async def get_result(task_id: str): return task["result"] +def _sse(data: dict) -> str: + return f"event: message\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + +@router.post("/analyze-stream") +async def analyze_stream( + text: Optional[str] = Form(None), + doc_id: Optional[str] = Form(None), + file: Optional[UploadFile] = File(None), + domains: Optional[str] = Form(None), + title: Optional[str] = Form(None), +): + """Stream compliance analysis as SSE events. + + Stages: clause_split → retrieval (per clause) → gap_check → conclusion + Events: stage | source | finding | done | error + """ + from app.application.compliance.pipeline import ( + check_clause_compliance, + extract_text_from_doc_id, + extract_text_from_file, + retrieve_for_clause, + split_into_clauses, + synthesize_conclusion, + ) + from app.services.llm.llm_factory import get_llm_client + from app.shared.bootstrap import get_retrieval_service + + # Read file content eagerly (before async generator) + file_content: bytes | None = None + file_name: str | None = None + if file is not None: + file_content = await file.read() + file_name = file.filename + + async def generate() -> AsyncGenerator[str, None]: + try: + client = get_llm_client(provider=settings.llm_provider, model=settings.llm_model) + retrieval_service = get_retrieval_service() + + # ── Stage 1: extract text ───────────────────────────────────── + yield _sse({"type": "stage", "stage": "extracting", "label": "Extracting text…"}) + await asyncio.sleep(0) + + if text: + para_text = text.strip() + elif doc_id: + try: + para_text = await asyncio.to_thread(extract_text_from_doc_id, doc_id) + except Exception as exc: + yield 
_sse({"type": "error", "text": f"Document not found: {exc}"}) + return + elif file_content is not None: + para_text = await asyncio.to_thread( + extract_text_from_file, file_content, file_name or "upload" + ) + else: + yield _sse({"type": "error", "text": "No input provided"}) + return + + if not para_text.strip(): + yield _sse({"type": "error", "text": "Could not extract text from the provided input"}) + return + + # ── Stage 2: split into clauses ─────────────────────────────── + yield _sse({"type": "stage", "stage": "splitting", "label": "Splitting into clauses…"}) + await asyncio.sleep(0) + clauses: list[str] = await asyncio.to_thread(split_into_clauses, para_text, client) + + # ── Stage 3: retrieve + gap check per clause ────────────────── + findings: list[dict] = [] + + for i, clause in enumerate(clauses): + yield _sse({ + "type": "stage", + "stage": "analyzing", + "label": f"Analyzing clause {i + 1}/{len(clauses)}…", + }) + await asyncio.sleep(0) + + chunks = await asyncio.to_thread( + retrieve_for_clause, clause, retrieval_service, 5, domains or None + ) + + # Emit source events + for chunk in chunks[:3]: + yield _sse({ + "type": "source", + "standard": getattr(chunk, "doc_title", "") or getattr(chunk, "doc_name", ""), + "clause": getattr(chunk, "section_title", "") or "", + "score": round(float(getattr(chunk, "score", 0)), 3), + "status": "retrieved", + "full_content": (getattr(chunk, "text", "") or "")[:300], + }) + await asyncio.sleep(0) + + finding = await asyncio.to_thread(check_clause_compliance, clause, chunks, client) + if finding: + findings.append(finding) + yield _sse({"type": "finding", **finding}) + await asyncio.sleep(0) + + # ── Stage 4: synthesize conclusion ──────────────────────────── + yield _sse({"type": "stage", "stage": "concluding", "label": "Generating conclusion…"}) + await asyncio.sleep(0) + + conclusion_data = await asyncio.to_thread( + synthesize_conclusion, para_text, findings, client + ) + yield _sse({"type": "done", 
**conclusion_data}) + + except Exception as exc: + logger.exception("analyze-stream pipeline error") + yield _sse({"type": "error", "text": str(exc)}) + + return StreamingResponse( + generate(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}, + ) + + @router.post("/chat/{segment_id}") async def compliance_chat(segment_id: int, request: ComplianceChatRequest): """Stream compliance Q&A grounded in real vector retrieval.""" diff --git a/backend/app/application/compliance/__init__.py b/backend/app/application/compliance/__init__.py new file mode 100644 index 0000000..6f999c1 --- /dev/null +++ b/backend/app/application/compliance/__init__.py @@ -0,0 +1 @@ +"""Compliance application layer.""" diff --git a/backend/app/application/compliance/pipeline.py b/backend/app/application/compliance/pipeline.py new file mode 100644 index 0000000..92b406e --- /dev/null +++ b/backend/app/application/compliance/pipeline.py @@ -0,0 +1,215 @@ +"""Compliance analysis pipeline helpers. + +All functions are synchronous — call them via asyncio.to_thread() in async SSE generators. 
+""" + +from __future__ import annotations + +import json +import os +import re +import tempfile +from typing import TYPE_CHECKING + +from loguru import logger + +if TYPE_CHECKING: + from app.application.knowledge import KnowledgeRetrievalService + from app.domain.retrieval import RetrievedChunk + from app.services.llm.base_client import BaseLLMClient + + +def _extract_json(text: str): + """Extract JSON from LLM response, tolerating markdown wrappers.""" + stripped = text.strip() + match = re.search(r"```(?:json)?\s*([\s\S]*?)```", stripped) + if match: + stripped = match.group(1).strip() + try: + return json.loads(stripped) + except json.JSONDecodeError: + pass + for pattern in (r"(\[[\s\S]*\])", r"(\{[\s\S]*\})"): + m = re.search(pattern, stripped) + if m: + try: + return json.loads(m.group(1)) + except json.JSONDecodeError: + continue + raise ValueError(f"No valid JSON found in LLM response: {text[:300]}") + + +def extract_text_from_doc_id(doc_id: str) -> str: + from app.shared.bootstrap import get_document_query_service, get_retrieval_service + doc = get_document_query_service().get(doc_id) + if not doc: + raise ValueError(f"Document '{doc_id}' not found") + service = get_retrieval_service() + chunks = service.retrieve(query=doc.doc_name, top_k=30) + doc_chunks = [c for c in chunks if c.doc_id == doc_id] + if not doc_chunks: + doc_chunks = chunks[:15] + return "\n\n".join(c.text for c in doc_chunks[:15]) + + +def extract_text_from_file(content: bytes, filename: str) -> str: + from app.shared.bootstrap import get_document_command_service + suffix = os.path.splitext(filename or "doc.pdf")[1] or ".pdf" + tmp_path = "" + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: + tmp.write(content) + tmp_path = tmp.name + service = get_document_command_service() + parsed = service.parser.parse(file_path=tmp_path, doc_id="tmp_analysis", doc_name=filename) + if parsed.raw_text: + return parsed.raw_text[:4000] + return "\n".join( + b.get("text", "") 
for b in parsed.semantic_blocks[:30] if b.get("text") + )[:4000] + except Exception as exc: + logger.warning("File text extraction failed: {}", exc) + return "" + finally: + if tmp_path: + try: os.unlink(tmp_path) + except OSError: pass + + +def split_into_clauses(text: str, client: "BaseLLMClient") -> list[str]: + prompt = ( + "You are a compliance analysis expert. Split the following text into 3-8 " + "semantically complete compliance clauses. Each clause should be an independent " + "compliance requirement or technical statement.\n" + "Return as JSON array of strings, e.g.:\n" + '["Clause one...", "Clause two..."]\n' + "Return ONLY the JSON array.\n\n" + f"Text:\n{text[:2000]}" + ) + response = client.chat([{"role": "user", "content": prompt}], max_tokens=1000) + if response.is_success: + try: + result = _extract_json(response.content) + if isinstance(result, list): + clauses = [str(c).strip() for c in result if str(c).strip()] + if clauses: + return clauses[:8] + except (ValueError, TypeError): + logger.warning("Clause split JSON parse failed, using fallback") + sentences = re.split(r"[.?!;\n]+", text) + return [s.strip() for s in sentences if len(s.strip()) > 20][:6] + + +def retrieve_for_clause( + clause: str, + retrieval_service: "KnowledgeRetrievalService", + top_k: int = 5, + domains: str | None = None, +) -> list["RetrievedChunk"]: + return retrieval_service.retrieve(query=clause, top_k=top_k, filters=domains) + + +def check_clause_compliance( + clause: str, + chunks: list["RetrievedChunk"], + client: "BaseLLMClient", +) -> dict | None: + if not chunks: + return None + reg_context = "\n".join( + f"[{i+1}] {c.doc_title} {c.section_title or ''}: {c.text[:300]}" + for i, c in enumerate(chunks[:5]) + ) + prompt = ( + "You are a compliance expert. 
Judge whether the following business clause " + "complies with the retrieved regulations.\n\n" + f"Business clause:\n{clause}\n\n" + f"Retrieved regulations:\n{reg_context}\n\n" + "Return JSON:\n" + "{\n" + ' "status": "ok" | "warn" | "risk",\n' + ' "title": "Short finding title (max 30 chars)",\n' + ' "desc": "Description (50-120 chars)",\n' + ' "clause_ref": "Regulation clause reference e.g. Art.9.1 or Sec.3.1"\n' + "}\n" + "status: ok=compliant, warn=gap exists, risk=critical/missing\n" + "Return ONLY the JSON object." + ) + response = client.chat([{"role": "user", "content": prompt}], max_tokens=500) + if not response.is_success: + return None + try: + result = _extract_json(response.content) + if isinstance(result, dict) and "status" in result: + return { + "title": str(result.get("title", "Compliance finding")), + "desc": str(result.get("desc", "")), + "status": result.get("status", "info"), + "clause_ref": result.get("clause_ref"), + } + except (ValueError, TypeError) as exc: + logger.warning("Gap check JSON parse failed: {}", exc) + return None + + +def synthesize_conclusion( + para_text: str, + findings: list[dict], + client: "BaseLLMClient", +) -> dict: + if not findings: + return { + "conclusion": "No significant compliance gaps found. Continue monitoring regulation updates.", + "actions": [{"label": "Next action", "value": "Monitor regulation updates"}], + "risk_score": 10, + "highlight_terms": [], + "para_text": para_text[:800], + } + findings_text = "\n".join( + f"- [{f['status'].upper()}] {f['title']}: {f['desc']}" + for f in findings + ) + prompt = ( + "You are a compliance analysis expert. 
Generate a summary report " + "based on the following compliance findings.\n\n" + f"Original text (first 600 chars):\n{para_text[:600]}\n\n" + f"Findings:\n{findings_text}\n\n" + "Return JSON:\n" + "{\n" + ' "conclusion": "Overall compliance conclusion (100-200 chars)",\n' + ' "actions": [\n' + ' {"label": "Action label", "value": "Description"},\n' + ' {"label": "Priority", "value": "High/Medium/Low", "risk": true}\n' + ' ],\n' + ' "risk_score": 0-100 (integer, higher=riskier),\n' + ' "highlight_terms": ["Key terms to highlight, max 10 terms"],\n' + ' "para_text": "Original text or summary (max 600 chars)"\n' + "}\n" + "Return ONLY the JSON object." + ) + response = client.chat([{"role": "user", "content": prompt}], max_tokens=1200) + fallback = { + "conclusion": "Compliance analysis complete. Review findings and create remediation plan.", + "actions": [ + {"label": "Next action", "value": "Review critical findings"}, + {"label": "Escalation", "value": "Legal review required", "risk": True}, + ], + "risk_score": 60, + "highlight_terms": [], + "para_text": para_text[:800], + } + if not response.is_success: + return fallback + try: + result = _extract_json(response.content) + if isinstance(result, dict): + return { + "conclusion": str(result.get("conclusion", fallback["conclusion"])), + "actions": result.get("actions", fallback["actions"]), + "risk_score": int(result.get("risk_score", 60)), + "highlight_terms": result.get("highlight_terms", []), + "para_text": str(result.get("para_text", para_text[:800])), + } + except (ValueError, TypeError) as exc: + logger.warning("Conclusion synthesis JSON parse failed: {}", exc) + return fallback \ No newline at end of file diff --git a/backend/app/schemas/compliance.py b/backend/app/schemas/compliance.py index 3218a46..b4a0200 100644 --- a/backend/app/schemas/compliance.py +++ b/backend/app/schemas/compliance.py @@ -80,4 +80,30 @@ class ComplianceChatRequest(BaseModel): class AnalyzeResponse(BaseModel): """Define the Analyze 
Response API model.""" task_id: str - status: str = "processing" \ No newline at end of file + status: str = "processing" + + +class AnalyzeStreamSource(BaseModel): + """SSE source event payload for analyze-stream.""" + standard: str + clause: str + score: float + status: str + full_content: str + + +class AnalyzeStreamFinding(BaseModel): + """SSE finding event payload for analyze-stream.""" + title: str + desc: str + status: str + clause_ref: Optional[str] = None + + +class AnalyzeStreamDone(BaseModel): + """SSE done event payload for analyze-stream.""" + conclusion: str + actions: list[dict] + risk_score: int + highlight_terms: list[str] + para_text: str \ No newline at end of file diff --git a/docs/superpowers/plans/2026-06-04-compliance-new-analysis.md b/docs/superpowers/plans/2026-06-04-compliance-new-analysis.md new file mode 100644 index 0000000..a29d669 --- /dev/null +++ b/docs/superpowers/plans/2026-06-04-compliance-new-analysis.md @@ -0,0 +1,1489 @@ +# Compliance Analysis — New Analysis (Document Review Loop) + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Implement the full "New Analysis" Document Review Loop — user submits text/document/file, backend runs a 4-stage SSE pipeline (clause extraction → RAG retrieval → gap analysis → conclusion synthesis), and the three-column workspace renders real AI findings in real time. + +**Architecture:** New backend endpoint `POST /api/v1/compliance/analyze-stream` streams SSE events through 4 stages using the existing `KnowledgeRetrievalService` (Milvus BM25+dense) and `get_llm_client()` (DeepSeek/Qwen). Frontend: `NewAnalysisModal` (3 input tabs) + `useComplianceAnalysis` hook + refactored `CompliancePage` that drives the three-column workspace from SSE state instead of static mock data. 
+ +**Tech Stack:** FastAPI + Python asyncio (backend SSE), existing `get_llm_client()` / `get_retrieval_service()` / `get_document_query_service()`, React 19 + TypeScript + globals.css CSS variables (no Tailwind in new components). + +--- + +## File Map + +**CREATE:** +- `backend/app/application/compliance/__init__.py` +- `backend/app/application/compliance/pipeline.py` — LLM pipeline helpers (clause split, gap check, conclusion) +- `frontend/src/pages/Compliance/NewAnalysisModal.tsx` — 3-tab input modal +- `frontend/src/pages/Compliance/useComplianceAnalysis.ts` — SSE state hook + +**MODIFY:** +- `backend/app/schemas/compliance.py` — add SSE payload types (`AnalyzeStreamSource`, `AnalyzeStreamFinding`, `AnalyzeStreamDone`) +- `backend/app/api/routes/compliance.py` — add `POST /analyze-stream` +- `frontend/src/styles/globals.css` — add modal-tab, compliance-status-bar, analysis-empty styles +- `frontend/src/pages/Compliance/CompliancePage.tsx` — replace static mock with hook-driven state + +--- + +## Task 1: Backend — Compliance pipeline helpers + +**Files:** +- Create: `backend/app/application/compliance/__init__.py` +- Create: `backend/app/application/compliance/pipeline.py` + +### Context + +The pipeline module provides synchronous helpers (text extraction, clause splitting, per-clause retrieval, gap check, conclusion synthesis) called from the async SSE generator via `asyncio.to_thread()`. The LLM-backed helpers take a client obtained from `get_llm_client()` for structured LLM calls, and retrieval goes through the service returned by `get_retrieval_service()` for Milvus retrieval.
+ +LLM client interface (from `backend/app/services/llm/base_client.py`): +```python +client.chat(messages: List[Dict[str,str]], max_tokens: Optional[int] = None) -> LLMResponse +# LLMResponse.content: str — the text response +# LLMResponse.is_success: bool +``` + +Retrieval service interface (from `backend/app/application/knowledge/services.py`): +```python +retrieval_service.retrieve(*, query: str, top_k: int, filters: str | None = None) -> list[RetrievedChunk] +# RetrievedChunk fields: chunk_id, doc_id, doc_title, text, score, section_title, page_start +``` + +--- + +- [ ] **Step 1: Create `backend/app/application/compliance/__init__.py`** + +```python +"""Compliance application layer.""" +``` + +- [ ] **Step 2: Create `backend/app/application/compliance/pipeline.py`** + +```python +"""Compliance analysis pipeline helpers. + +All functions are synchronous — call them via asyncio.to_thread() in async SSE generators. +""" + +from __future__ import annotations + +import json +import os +import re +import tempfile +from typing import TYPE_CHECKING + +from loguru import logger + +if TYPE_CHECKING: + from app.application.knowledge import KnowledgeRetrievalService + from app.domain.retrieval import RetrievedChunk + from app.services.llm.base_client import BaseLLMClient + + +# ── JSON extraction ──────────────────────────────────────────────────────────── + +def _extract_json(text: str): + """Extract JSON from LLM response, tolerating markdown wrappers and extra prose.""" + # Strip markdown code fences + stripped = text.strip() + match = re.search(r"```(?:json)?\s*([\s\S]*?)```", stripped) + if match: + stripped = match.group(1).strip() + # Direct parse + try: + return json.loads(stripped) + except json.JSONDecodeError: + pass + # Find first JSON array or object + for pattern in (r"(\[[\s\S]*\])", r"(\{[\s\S]*\})"): + m = re.search(pattern, stripped) + if m: + try: + return json.loads(m.group(1)) + except json.JSONDecodeError: + continue + raise ValueError(f"No valid 
JSON found in LLM response: {text[:300]}") + + +# ── Text extraction ──────────────────────────────────────────────────────────── + +def extract_text_from_doc_id(doc_id: str) -> str: + """Retrieve document text by querying its chunks from Milvus.""" + from app.shared.bootstrap import get_document_query_service, get_retrieval_service + + doc = get_document_query_service().get(doc_id) + if not doc: + raise ValueError(f"Document '{doc_id}' not found") + + service = get_retrieval_service() + # Broad query using doc name to pull representative chunks + chunks = service.retrieve(query=doc.doc_name, top_k=30) + # Filter to this document only + doc_chunks = [c for c in chunks if c.doc_id == doc_id] + # Fallback: use all results if none matched (e.g., different doc_id in Milvus) + if not doc_chunks: + doc_chunks = chunks[:15] + return "\n\n".join(c.text for c in doc_chunks[:15]) + + +def extract_text_from_file(content: bytes, filename: str) -> str: + """Parse an uploaded file and return its raw text.""" + from app.shared.bootstrap import get_document_command_service + + suffix = os.path.splitext(filename or "doc.pdf")[1] or ".pdf" + tmp_path = "" + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: + tmp.write(content) + tmp_path = tmp.name + service = get_document_command_service() + parsed = service.parser.parse(file_path=tmp_path, doc_id="tmp_analysis", doc_name=filename) + # Prefer raw_text; fall back to joining semantic blocks + if parsed.raw_text: + return parsed.raw_text[:4000] + return "\n".join( + b.get("text", "") for b in parsed.semantic_blocks[:30] if b.get("text") + )[:4000] + except Exception as exc: + logger.warning("File text extraction failed: {}", exc) + return "" + finally: + if tmp_path: + try: + os.unlink(tmp_path) + except OSError: + pass + + +# ── Pipeline stage 1: clause extraction ─────────────────────────────────────── + +def split_into_clauses(text: str, client: "BaseLLMClient") -> list[str]: + """Split input text into 
3-8 semantic compliance clauses using LLM. + + Falls back to sentence-level splitting if LLM returns invalid JSON. + """ + prompt = ( + "你是合规分析专家。请将以下文本分割为3-8个语义完整的合规条款," + "每个条款应是一个独立的合规要求或技术声明。\n" + "以JSON数组格式返回,每个元素是一个字符串,例如:\n" + "[\"条款一的内容...\", \"条款二的内容...\"]\n" + "只返回JSON数组,不要有其他内容。\n\n" + f"文本:\n{text[:2000]}" + ) + response = client.chat([{"role": "user", "content": prompt}], max_tokens=1000) + + if response.is_success: + try: + result = _extract_json(response.content) + if isinstance(result, list): + clauses = [str(c).strip() for c in result if str(c).strip()] + if clauses: + return clauses[:8] + except (ValueError, TypeError): + logger.warning("Clause split JSON parse failed, using fallback") + + # Fallback: sentence-level split + sentences = re.split(r"[。;\n]+", text) + return [s.strip() for s in sentences if len(s.strip()) > 20][:6] + + +# ── Pipeline stage 2+3: retrieval + gap check ───────────────────────────────── + +def retrieve_for_clause( + clause: str, + retrieval_service: "KnowledgeRetrievalService", + top_k: int = 5, + domains: str | None = None, +) -> list["RetrievedChunk"]: + """Retrieve top-K regulation chunks relevant to a clause.""" + return retrieval_service.retrieve(query=clause, top_k=top_k, filters=domains) + + +def check_clause_compliance( + clause: str, + chunks: list["RetrievedChunk"], + client: "BaseLLMClient", +) -> dict | None: + """Check whether a clause complies with retrieved regulations. + + Returns a finding dict with keys: title, desc, status, clause_ref. + Returns None if the clause is fully compliant and no finding is needed. 
+ """ + if not chunks: + return None + + reg_context = "\n".join( + f"[{i+1}] {c.doc_title} {c.section_title or ''}: {c.text[:300]}" + for i, c in enumerate(chunks[:5]) + ) + + prompt = ( + "你是合规专家。请判断以下业务条款是否符合已检索的法规要求。\n\n" + f"业务条款:\n{clause}\n\n" + f"相关法规(已检索):\n{reg_context}\n\n" + "请以JSON格式返回判断结果:\n" + "{\n" + ' "status": "ok" | "warn" | "risk",\n' + ' "title": "简短发现标题(不超过30字)",\n' + ' "desc": "具体说明(50-120字)",\n' + ' "clause_ref": "引用的法规条款编号,如 Art.9.1 或 §3.1"\n' + "}\n" + "status含义:ok=已符合,warn=存在差距需关注,risk=严重缺失需立即整改\n" + "只返回JSON,不要其他内容。" + ) + response = client.chat([{"role": "user", "content": prompt}], max_tokens=500) + + if not response.is_success: + return None + try: + result = _extract_json(response.content) + if isinstance(result, dict) and "status" in result: + return { + "title": str(result.get("title", "Compliance finding")), + "desc": str(result.get("desc", "")), + "status": result.get("status", "info"), + "clause_ref": result.get("clause_ref"), + } + except (ValueError, TypeError) as exc: + logger.warning("Gap check JSON parse failed: {}", exc) + return None + + +# ── Pipeline stage 4: conclusion synthesis ──────────────────────────────────── + +def synthesize_conclusion( + para_text: str, + findings: list[dict], + client: "BaseLLMClient", +) -> dict: + """Synthesize overall compliance conclusion, action items, risk score, and highlight terms.""" + if not findings: + return { + "conclusion": "经过合规分析,未发现重大合规差距。建议持续跟踪相关法规更新。", + "actions": [{"label": "Next action", "value": "Monitor regulation updates"}], + "risk_score": 10, + "highlight_terms": [], + "para_text": para_text[:800], + } + + findings_text = "\n".join( + f"- [{f['status'].upper()}] {f['title']}: {f['desc']}" + for f in findings + ) + prompt = ( + "你是合规分析专家。请基于以下合规分析发现,生成总结报告。\n\n" + f"原文摘要(前600字):\n{para_text[:600]}\n\n" + f"发现列表:\n{findings_text}\n\n" + "请以JSON格式返回:\n" + "{\n" + ' "conclusion": "总体合规结论(100-200字)",\n' + ' "actions": [\n' + ' {"label": "下一步行动", "value": "具体描述"},\n' + ' 
{"label": "优先级", "value": "高/中/低", "risk": true}\n' + ' ],\n' + ' "risk_score": 整体风险评分(0-100的整数,越高风险越大),\n' + ' "highlight_terms": ["需要在原文中高亮的关键术语列表,最多10个"],\n' + ' "para_text": "用于显示的原文或摘要(不超过600字)"\n' + "}\n" + "只返回JSON,不要其他内容。" + ) + response = client.chat([{"role": "user", "content": prompt}], max_tokens=1200) + + fallback = { + "conclusion": "合规分析完成。请查看各发现项并制定整改计划。", + "actions": [ + {"label": "Next action", "value": "Review critical findings"}, + {"label": "Escalation", "value": "Legal review required", "risk": True}, + ], + "risk_score": 60, + "highlight_terms": [], + "para_text": para_text[:800], + } + + if not response.is_success: + return fallback + + try: + result = _extract_json(response.content) + if isinstance(result, dict): + return { + "conclusion": str(result.get("conclusion", fallback["conclusion"])), + "actions": result.get("actions", fallback["actions"]), + "risk_score": int(result.get("risk_score", 60)), + "highlight_terms": result.get("highlight_terms", []), + "para_text": str(result.get("para_text", para_text[:800])), + } + except (ValueError, TypeError) as exc: + logger.warning("Conclusion synthesis JSON parse failed: {}", exc) + return fallback +``` + +- [ ] **Step 3: Verify imports resolve** + +```powershell +cd backend +python -c "from app.application.compliance.pipeline import split_into_clauses, check_clause_compliance, synthesize_conclusion; print('OK')" +``` + +Expected: `OK` + +--- + +## Task 2: Backend — Schema additions + +**Files:** +- Modify: `backend/app/schemas/compliance.py` + +Add the SSE event payload types to the existing schema file. These are used for documentation/typing only — the SSE endpoint yields raw JSON strings. 
+ +- [ ] **Step 1: Append to `backend/app/schemas/compliance.py`** + +Add these classes at the end of the file (after the existing `AnalyzeResponse` class): + +```python +class AnalyzeStreamSource(BaseModel): + """Define the Analyze Stream Source API model.""" + standard: str + clause: str + score: float # 0.0–1.0 + status: str # "ok" | "warn" | "risk" + full_content: str + + +class AnalyzeStreamFinding(BaseModel): + """Define the Analyze Stream Finding API model.""" + title: str + desc: str + status: str # "ok" | "warn" | "risk" + clause_ref: Optional[str] = None + + +class AnalyzeStreamDone(BaseModel): + """Define the Analyze Stream Done API model.""" + conclusion: str + actions: list[dict] + risk_score: int + highlight_terms: list[str] + para_text: str +``` + +--- + +## Task 3: Backend — analyze-stream SSE endpoint + +**Files:** +- Modify: `backend/app/api/routes/compliance.py` + +Add `POST /api/v1/compliance/analyze-stream`. This endpoint accepts multipart/form-data with one of `text`, `doc_id`, or `file`, plus optional `domains` and `title`. It streams SSE events in the pattern: `stage → source* → stage → finding* → stage → done`. 
+ +- [ ] **Step 1: Add imports at the top of `compliance.py`** + +Replace the current import block with: + +```python +"""Define API routes for compliance.""" + +from __future__ import annotations + +import asyncio +import json +from pathlib import Path +from typing import AsyncGenerator, Optional + +from fastapi import APIRouter, File, Form, HTTPException, UploadFile +from fastapi.responses import StreamingResponse +from loguru import logger + +from app.application.compliance.pipeline import ( + check_clause_compliance, + extract_text_from_doc_id, + extract_text_from_file, + retrieve_for_clause, + split_into_clauses, + synthesize_conclusion, +) +from app.config.settings import settings +from app.schemas.compliance import ( + AnalyzeResponse, + ComplianceChatRequest, +) +from app.services.mock_data import generate_task_id, get_mock_compliance_result +from app.services.llm.llm_factory import get_llm_client +from app.shared.bootstrap import get_agent_conversation_service, get_retrieval_service +``` + +- [ ] **Step 2: Add `_sse` helper and `analyze_stream` endpoint at the end of `compliance.py`** + +Append after the existing `compliance_chat` function: + +```python +def _sse(data: dict) -> str: + """Format a dict as an SSE data line.""" + return f"event: message\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + +@router.post("/analyze-stream") +async def analyze_stream( + text: Optional[str] = Form(None), + doc_id: Optional[str] = Form(None), + file: Optional[UploadFile] = File(None), + domains: Optional[str] = Form(None), + title: Optional[str] = Form(None), +): + """Stream a 4-stage compliance analysis as SSE. + + Input priority: text → doc_id → file (first non-empty wins). 
+ SSE event types: stage | source | finding | done | error + """ + # ── Validate that at least one input is provided ────────────────────────── + if not text and not doc_id and not file: + raise HTTPException(status_code=400, detail="Provide text, doc_id, or file") + + file_content: bytes = b"" + file_name: str = "" + if file: + file_content = await file.read() + file_name = file.filename or "document" + + async def generate() -> AsyncGenerator[str, None]: + # ── Stage 1: Extract text ───────────────────────────────────────────── + yield _sse({"type": "stage", "stage": "Clause extraction", "pct": 5, + "message": "Extracting input text..."}) + await asyncio.sleep(0) + + try: + if text: + input_text = text + elif doc_id: + input_text = await asyncio.to_thread(extract_text_from_doc_id, doc_id) + else: + input_text = await asyncio.to_thread( + extract_text_from_file, file_content, file_name + ) + except Exception as exc: + logger.exception("Text extraction failed") + yield _sse({"type": "error", "message": f"Text extraction failed: {exc}"}) + return + + if not input_text.strip(): + yield _sse({"type": "error", "message": "No text could be extracted from the input"}) + return + + # ── Split into clauses ──────────────────────────────────────────────── + client = get_llm_client(provider=settings.llm_provider, model=settings.llm_model) + yield _sse({"type": "stage", "stage": "Clause extraction", "pct": 20, + "message": "Splitting into semantic clauses..."}) + await asyncio.sleep(0) + + clauses = await asyncio.to_thread(split_into_clauses, input_text, client) + yield _sse({"type": "stage", "stage": "Clause extraction", "pct": 100, + "message": f"Extracted {len(clauses)} clauses"}) + await asyncio.sleep(0) + + # ── Stage 2: Retrieve regulations per clause ────────────────────────── + yield _sse({"type": "stage", "stage": "Regulation retrieval", "pct": 10, + "message": "Retrieving relevant regulations from knowledge base..."}) + await asyncio.sleep(0) + + retrieval_service = 
get_retrieval_service() + seen_sources: set[str] = set() + clause_chunks: list[list] = [] + + for i, clause in enumerate(clauses): + pct = 10 + int(90 * (i + 1) / len(clauses)) + chunks = await asyncio.to_thread( + retrieve_for_clause, clause, retrieval_service, 5, domains + ) + clause_chunks.append(chunks) + + for chunk in chunks[:3]: + source_key = f"{chunk.doc_title}|{chunk.section_title}" + if source_key not in seen_sources: + seen_sources.add(source_key) + score = chunk.score if chunk.score <= 1.0 else chunk.score / 100.0 + status = "risk" if score >= 0.85 else "warn" if score >= 0.65 else "ok" + yield _sse({ + "type": "source", + "data": { + "standard": chunk.doc_title, + "clause": chunk.section_title or "—", + "score": round(score, 3), + "status": status, + "full_content": chunk.text[:400], + } + }) + await asyncio.sleep(0) + + yield _sse({"type": "stage", "stage": "Regulation retrieval", "pct": pct, + "message": f"Processed clause {i+1}/{len(clauses)}"}) + await asyncio.sleep(0) + + yield _sse({"type": "stage", "stage": "Regulation retrieval", "pct": 100, + "message": "Retrieval complete"}) + await asyncio.sleep(0) + + # ── Stage 3: Gap analysis per clause ───────────────────────────────── + yield _sse({"type": "stage", "stage": "Gap analysis", "pct": 5, + "message": "Analyzing compliance gaps..."}) + await asyncio.sleep(0) + + findings: list[dict] = [] + for i, (clause, chunks) in enumerate(zip(clauses, clause_chunks)): + pct = 5 + int(95 * (i + 1) / len(clauses)) + finding = await asyncio.to_thread(check_clause_compliance, clause, chunks, client) + if finding: + findings.append(finding) + yield _sse({"type": "finding", "data": finding}) + await asyncio.sleep(0) + yield _sse({"type": "stage", "stage": "Gap analysis", "pct": pct, + "message": f"Checked clause {i+1}/{len(clauses)}"}) + await asyncio.sleep(0) + + yield _sse({"type": "stage", "stage": "Gap analysis", "pct": 100, + "message": f"Found {len(findings)} issues"}) + await asyncio.sleep(0) + + # ── 
Stage 4: Conclusion synthesis ───────────────────────────────────── + yield _sse({"type": "stage", "stage": "Recommendation synthesis", "pct": 20, + "message": "Synthesizing recommendations..."}) + await asyncio.sleep(0) + + done_data = await asyncio.to_thread(synthesize_conclusion, input_text, findings, client) + + yield _sse({"type": "stage", "stage": "Recommendation synthesis", "pct": 100, + "message": "Analysis complete"}) + yield _sse({"type": "done", "data": done_data}) + + return StreamingResponse( + generate(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) +``` + +- [ ] **Step 3: Verify backend starts without import errors** + +```powershell +cd backend +python -c "from app.api.routes.compliance import router; print('compliance router OK')" +``` + +Expected: `compliance router OK` + +--- + +## Task 4: Frontend — CSS additions + +**Files:** +- Modify: `frontend/src/styles/globals.css` + +Add styles for the modal tab switcher, compliance status bar, analysis empty state, and stage-running animation. Append to the end of globals.css. 
+
+- [ ] **Step 1: Append CSS to the end of `frontend/src/styles/globals.css`**
+
+```css
+/* ── New Analysis Modal — Tab Switcher ────────────────────
+   Horizontal strip of tab buttons with a 1px rule underneath.
+   All var(--…) custom properties used below are not defined in this
+   snippet; they are expected to exist earlier in globals.css. */
+.modal-tabs {
+  display: flex;
+  gap: 0;
+  border-bottom: 1px solid var(--border);
+  margin: 12px 0 16px;
+  flex-shrink: 0;
+}
+.modal-tab {
+  height: 34px;
+  padding: 0 14px;
+  font-size: 12px;
+  font-weight: 500;
+  border: none; /* must stay before border-bottom: it resets all four sides */
+  background: none;
+  color: var(--muted);
+  cursor: pointer;
+  border-bottom: 2px solid transparent; /* reserves the indicator space so .active does not shift layout */
+  margin-bottom: -1px; /* presumably overlaps the strip's 1px rule — confirm visually */
+  transition: color 0.12s, border-color 0.12s;
+}
+.modal-tab:hover { color: var(--fg); }
+.modal-tab.active { color: var(--accent); border-bottom-color: var(--accent); font-weight: 600; }
+
+/* ── Compliance Status Bar ────────────────────────────────
+   Single-row flex bar holding the analysis title and a risk badge.
+   NOTE(review): the .risk-badge.high/.medium/.low modifiers are not
+   scoped under .compliance-status-bar, unlike the base badge rule, so
+   they apply to any .risk-badge element on the page. */
+.compliance-status-bar {
+  display: flex;
+  align-items: center;
+  gap: 12px;
+  padding: 10px 24px;
+  background: var(--surface);
+  border-bottom: 1px solid var(--border);
+  flex-shrink: 0;
+  font-size: 13px;
+}
+.compliance-status-bar .analysis-title {
+  font-weight: 600;
+  font-family: var(--font-display);
+  color: var(--fg);
+}
+.compliance-status-bar .risk-badge {
+  display: inline-flex;
+  align-items: center;
+  gap: 5px;
+  padding: 2px 10px;
+  border-radius: var(--radius-pill);
+  font-size: 11px;
+  font-weight: 700;
+  font-family: var(--font-mono);
+}
+.risk-badge.high { background: var(--danger-bg); color: var(--danger); }
+.risk-badge.medium { background: var(--warn-bg); color: var(--warn); }
+.risk-badge.low { background: var(--success-bg); color: var(--success); }
+
+/* ── Analysis Empty State ─────────────────────────────────
+   Centered column placeholder: a dashed 64px ring plus a short
+   paragraph capped at 280px wide. `flex: 1` only has an effect when
+   the parent is a flex container — assumed, confirm in CompliancePage. */
+.compliance-empty {
+  flex: 1;
+  display: flex;
+  flex-direction: column;
+  align-items: center;
+  justify-content: center;
+  gap: 14px;
+  color: var(--muted);
+  padding: 48px 24px;
+  text-align: center;
+}
+.compliance-empty .empty-ring {
+  width: 64px; height: 64px;
+  border-radius: 50%;
+  border: 2px dashed var(--border);
+}
+.compliance-empty p { font-size: 13px; max-width: 280px; line-height: 1.6; }
+
+/* ── Stage running animation ──────────────────────────────
+   Applied to a .stage-fill that also has .stage-running: accent
+   background whose opacity pulses 1 → 0.55 → 1 every 1.2s, forever. */
+.stage-fill.stage-running {
+  background: var(--accent);
+  animation: stage-pulse 1.2s ease-in-out infinite;
+}
+@keyframes stage-pulse {
+  0%, 100% { opacity: 1; }
+  50% { opacity: 0.55; }
+}
+
+/* ── Domain chips in modal ────────────────────────────────
+   Wrapping flex row; the chip styling itself is not part of this snippet. */
+.domain-chips { display: flex; gap: 6px; flex-wrap: wrap; margin-top: 10px; }
+
+/* ── Doc select in library tab ────────────────────────────
+   Vertical list of selectable rows that scrolls once it is taller
+   than 220px. */
+.doc-select-list {
+  display: flex;
+  flex-direction: column;
+  gap: 6px;
+  max-height: 220px;
+  overflow-y: auto;
+  margin-top: 8px;
+}
+.doc-select-item {
+  display: flex;
+  align-items: center;
+  gap: 10px;
+  padding: 10px 12px;
+  border: 1px solid var(--border);
+  border-radius: var(--radius-sm);
+  cursor: pointer;
+  transition: background 0.1s;
+  font-size: 13px;
+}
+.doc-select-item:hover { background: var(--bg); }
+.doc-select-item.selected {
+  border-color: var(--accent);
+  background: var(--accent-dim);
+  color: var(--accent);
+}
+.doc-select-name { font-weight: 500; flex: 1; min-width: 0; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } /* min-width: 0 lets the flex item shrink so the ellipsis can apply */
+.doc-select-type { font-size: 10px; color: var(--muted); font-family: var(--font-mono); }
+```
+
+---
+
+## Task 5: Frontend — useComplianceAnalysis hook
+
+**Files:**
+- Create: `frontend/src/pages/Compliance/useComplianceAnalysis.ts`
+
+This hook encapsulates all SSE state. It exposes `state`, `run(formData, title)`, and `reset()`. No JSX — pure TypeScript.
+
+- [ ] **Step 1: Create `frontend/src/pages/Compliance/useComplianceAnalysis.ts`**
+
+```typescript
+import { useState, useRef, useCallback } from 'react';
+
+// ── Types ─────────────────────────────────────────────────────────────────────
+
+/** Payload of a `source` SSE event: one retrieved regulation excerpt. */
+export interface AnalysisSource {
+  standard: string;
+  clause: string;
+  score: number; // 0–1
+  status: 'ok' | 'warn' | 'risk';
+  full_content: string;
+}
+
+/**
+ * Payload of a `finding` SSE event.
+ * NOTE(review): the backend builds this object outside this file; the
+ * field list below is what the UI expects — confirm against the pipeline.
+ */
+export interface AnalysisFinding {
+  title: string;
+  desc: string;
+  status: 'ok' | 'warn' | 'risk';
+  clause_ref?: string;
+}
+
+/** Progress of one of the four pipeline stages shown in the UI. */
+export interface AnalysisStage {
+  label: string;
+  pct: number;
+  status: 'idle' | 'running' | 'ok' | 'warn';
+}
+
+/** Final summary, filled in from the `done` SSE event. */
+export interface AnalysisConclusion {
+  text: string;
+  actions: Array<{ label: string; value: string; risk?: boolean }>;
+  riskScore: number;
+}
+
+/** Everything the Compliance page needs to render one analysis run. */
+export interface AnalysisState {
+  status: 'idle' | 'running' | 'done' | 'error';
+  title: string;
+  paraText: string;
+  highlightTerms: string[];
+  sources: AnalysisSource[];
+  stages: AnalysisStage[];
+  findings: AnalysisFinding[];
+  conclusion: AnalysisConclusion | null;
+  errorMessage: string;
+}
+
+/** Shape of one decoded SSE `data:` payload sent by the analyze-stream endpoint. */
+interface StreamEvent {
+  type: string;
+  stage?: string;
+  pct?: number;
+  data?: Record<string, unknown>;
+  message?: string;
+}
+
+// ── Constants ─────────────────────────────────────────────────────────────────
+
+const STAGE_LABELS = [
+  'Clause extraction',
+  'Regulation retrieval',
+  'Gap analysis',
+  'Recommendation synthesis',
+] as const;
+
+// Maps the `stage` string of a stage event to its position in STAGE_LABELS.
+const STAGE_INDEX: Record<string, number> = {
+  'Clause extraction': 0,
+  'Regulation retrieval': 1,
+  'Gap analysis': 2,
+  'Recommendation synthesis': 3,
+};
+
+/** Build a fresh list of stages, all idle at 0%. */
+function makeInitialStages(): AnalysisStage[] {
+  return STAGE_LABELS.map(label => ({ label, pct: 0, status: 'idle' as const }));
+}
+
+const INITIAL_STATE: AnalysisState = {
+  status: 'idle',
+  title: '',
+  paraText: '',
+  highlightTerms: [],
+  sources: [],
+  stages: makeInitialStages(),
+  findings: [],
+  conclusion: null,
+  errorMessage: '',
+};
+
+// ── Helpers ───────────────────────────────────────────────────────────────────
+
+/**
+ * Turn the body of a failed response into a readable message.
+ * The endpoint rejects bad input with a JSON body of the form
+ * `{"detail": "..."}`; when `detail` is a string it is returned on its
+ * own, otherwise the body is returned unchanged.
+ */
+function extractErrorDetail(body: string): string {
+  try {
+    const parsed: unknown = JSON.parse(body);
+    if (parsed && typeof parsed === 'object' && 'detail' in parsed) {
+      const detail = (parsed as { detail: unknown }).detail;
+      if (typeof detail === 'string') return detail;
+    }
+  } catch { /* body is not JSON — fall through */ }
+  return body;
+}
+
+/**
+ * Decode one SSE block (the text between two blank lines).
+ * Returns null when the block has no `data: ` line, the payload is empty,
+ * or the payload is not valid JSON — such blocks are skipped.
+ */
+function parseSseBlock(block: string): StreamEvent | null {
+  const dataLine = block.split('\n').find(l => l.startsWith('data: '));
+  if (!dataLine) return null;
+  const raw = dataLine.slice(6).trim();
+  if (!raw) return null;
+  try {
+    return JSON.parse(raw) as StreamEvent | null;
+  } catch {
+    return null; // skip malformed SSE data
+  }
+}
+
+/**
+ * Pure reducer: return the state that results from applying one stream
+ * event. Unknown event types and unknown stage names leave the state
+ * untouched (same object), so React skips the re-render.
+ */
+function applyEvent(s: AnalysisState, ev: StreamEvent): AnalysisState {
+  if (ev.type === 'stage' && ev.stage !== undefined) {
+    const idx = STAGE_INDEX[ev.stage] ?? -1;
+    if (idx < 0) return s;
+    const pct = ev.pct ?? 0;
+    const stages = s.stages.map((st, i) => {
+      // Stages before the reported one are necessarily finished.
+      if (i < idx) return { ...st, pct: 100, status: 'ok' as const };
+      if (i === idx) {
+        return { ...st, pct, status: pct >= 100 ? ('ok' as const) : ('running' as const) };
+      }
+      return st;
+    });
+    return { ...s, stages };
+  }
+
+  if (ev.type === 'source' && ev.data) {
+    const src = ev.data as unknown as AnalysisSource;
+    return { ...s, sources: [...s.sources, src] };
+  }
+
+  if (ev.type === 'finding' && ev.data) {
+    const finding = ev.data as unknown as AnalysisFinding;
+    return { ...s, findings: [...s.findings, finding] };
+  }
+
+  if (ev.type === 'done' && ev.data) {
+    const d = ev.data as {
+      conclusion: string;
+      actions: Array<{ label: string; value: string; risk?: boolean }>;
+      risk_score: number;
+      highlight_terms: string[];
+      para_text: string;
+    };
+    return {
+      ...s,
+      status: 'done',
+      paraText: d.para_text ?? '',
+      highlightTerms: d.highlight_terms ?? [],
+      conclusion: {
+        text: d.conclusion,
+        actions: d.actions ?? [],
+        riskScore: d.risk_score ?? 0,
+      },
+      stages: s.stages.map(st => ({ ...st, pct: 100, status: 'ok' as const })),
+    };
+  }
+
+  if (ev.type === 'error') {
+    return { ...s, status: 'error', errorMessage: ev.message ?? 'Analysis failed' };
+  }
+
+  return s;
+}
+
+// ── Hook ──────────────────────────────────────────────────────────────────────
+
+/**
+ * Drives one compliance analysis over the SSE endpoint.
+ *
+ * - `state`  — current AnalysisState.
+ * - `run(formData, title)` — aborts any run in flight, POSTs `formData`
+ *   to `/api/v1/compliance/analyze-stream` and folds the streamed events
+ *   into `state`.
+ * - `reset()` — aborts any run in flight and restores the idle state.
+ */
+export function useComplianceAnalysis() {
+  const [state, setState] = useState<AnalysisState>(INITIAL_STATE);
+  const abortRef = useRef<AbortController | null>(null);
+
+  const run = useCallback(async (formData: FormData, title: string) => {
+    // Cancel the previous run so its late events cannot leak into this one.
+    abortRef.current?.abort();
+    const ctrl = new AbortController();
+    abortRef.current = ctrl;
+
+    setState({
+      ...INITIAL_STATE,
+      status: 'running',
+      title,
+      stages: makeInitialStages(),
+    });
+
+    const dispatchBlock = (block: string) => {
+      const ev = parseSseBlock(block);
+      if (ev) setState(s => applyEvent(s, ev));
+    };
+
+    try {
+      const res = await fetch('/api/v1/compliance/analyze-stream', {
+        method: 'POST',
+        body: formData,
+        signal: ctrl.signal,
+      });
+
+      if (!res.ok) {
+        const errText = await res.text().catch(() => res.statusText);
+        if (ctrl.signal.aborted) return;
+        setState(s => ({ ...s, status: 'error', errorMessage: extractErrorDetail(errText) }));
+        return;
+      }
+
+      if (!res.body) throw new Error('No SSE stream body');
+      const reader = res.body.getReader();
+      const dec = new TextDecoder();
+      let buffer = '';
+
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) break;
+        buffer += dec.decode(value, { stream: true });
+
+        // SSE blocks are separated by double newlines; the last piece may
+        // be incomplete, so it is kept for the next chunk.
+        const blocks = buffer.split('\n\n');
+        buffer = blocks.pop() ?? '';
+        blocks.forEach(dispatchBlock);
+      }
+
+      // Flush bytes still held by the decoder and a final block that was
+      // not terminated by a blank line.
+      buffer += dec.decode();
+      if (buffer.trim()) dispatchBlock(buffer);
+
+      // The stream closed cleanly. If neither `done` nor `error` arrived,
+      // surface that instead of leaving the UI in `running` forever.
+      if (ctrl.signal.aborted) return;
+      setState(s =>
+        s.status === 'running'
+          ? { ...s, status: 'error', errorMessage: 'Analysis stream ended before completion' }
+          : s
+      );
+    } catch (e: unknown) {
+      // An abort is a deliberate cancellation (new run or reset), not a failure.
+      if (e instanceof Error && e.name === 'AbortError') return;
+      setState(s => ({
+        ...s,
+        status: 'error',
+        errorMessage: e instanceof Error ? e.message : 'Unknown error',
+      }));
+    }
+  }, []);
+
+  const reset = useCallback(() => {
+    abortRef.current?.abort();
+    setState(INITIAL_STATE);
+  }, []);
+
+  return { state, run, reset };
+}
+```
+
+---
+
+## Task 6: Frontend — NewAnalysisModal component
+
+**Files:**
+- Create: `frontend/src/pages/Compliance/NewAnalysisModal.tsx`
+
+Three-tab input modal: Tab 1 = paste text, Tab 2 = select from document library, Tab 3 = upload file. Shared: domain chips, optional title, Run button.
+ +- [ ] **Step 1: Create `frontend/src/pages/Compliance/NewAnalysisModal.tsx`** + +```tsx +import { useState, useEffect, useRef } from 'react'; +import { X, Upload } from 'lucide-react'; + +interface Props { + onClose: () => void; + onRun: (formData: FormData, title: string) => void; +} + +interface DocItem { + doc_id: string; + doc_name: string; + regulation_type: string; + status: string; +} + +const DOMAIN_OPTIONS = [ + { key: 'vehicle', label: 'Vehicle Safety' }, + { key: 'data_security', label: 'Data Security' }, + { key: 'ehs', label: 'EHS' }, + { key: 'carbon', label: 'Carbon' }, + { key: 'quality', label: 'Quality System' }, +]; + +type Tab = 'text' | 'library' | 'upload'; + +export function NewAnalysisModal({ onClose, onRun }: Props) { + const [tab, setTab] = useState('text'); + const [inputText, setInputText] = useState(''); + const [selectedDocId, setSelectedDocId] = useState(''); + const [uploadedFile, setUploadedFile] = useState(null); + const [domains, setDomains] = useState([]); + const [title, setTitle] = useState(''); + const [docs, setDocs] = useState([]); + const [dragging, setDragging] = useState(false); + const fileInputRef = useRef(null); + + // Load document list for Tab 2 + useEffect(() => { + fetch('/api/v1/documents/management-list') + .then(r => r.json()) + .then(d => { if (Array.isArray(d?.documents)) setDocs(d.documents); }) + .catch(() => {}); + }, []); + + function toggleDomain(key: string) { + setDomains(prev => + prev.includes(key) ? 
prev.filter(d => d !== key) : [...prev, key] + ); + } + + function isReady(): boolean { + if (tab === 'text') return inputText.trim().length > 20; + if (tab === 'library') return !!selectedDocId; + if (tab === 'upload') return !!uploadedFile; + return false; + } + + function handleRun() { + if (!isReady()) return; + const form = new FormData(); + if (tab === 'text') form.append('text', inputText.trim()); + if (tab === 'library') form.append('doc_id', selectedDocId); + if (tab === 'upload' && uploadedFile) form.append('file', uploadedFile); + if (domains.length) form.append('domains', domains.join(',')); + if (title.trim()) form.append('title', title.trim()); + const analysisTitle = title.trim() || ( + tab === 'library' + ? docs.find(d => d.doc_id === selectedDocId)?.doc_name ?? 'Document Analysis' + : tab === 'upload' ? uploadedFile?.name ?? 'File Analysis' : 'Text Analysis' + ); + onRun(form, analysisTitle); + onClose(); + } + + return ( +
+
e.stopPropagation()} + > + + +
+
New Analysis
+
Set up a compliance document review
+

Choose your input, select regulation domains to focus on, and run the AI pipeline.

+ + {/* Tab switcher */} +
+ {([ + { key: 'text', label: 'Paste text' }, + { key: 'library', label: 'From library' }, + { key: 'upload', label: 'Upload file' }, + ] as const).map(t => ( + + ))} +
+ + {/* Tab 1: Paste text */} + {tab === 'text' && ( +