"""Compliance analysis pipeline helpers. All functions are synchronous — call them via asyncio.to_thread() in async SSE generators. """ from __future__ import annotations import asyncio import json import os import re import tempfile from typing import TYPE_CHECKING from loguru import logger from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential # Shared retry policy for LLM calls: 3 attempts, exponential back-off 1–4 s. _llm_retry = retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=4), retry=retry_if_exception_type((ValueError, TimeoutError, ConnectionError)), reraise=True, ) if TYPE_CHECKING: from app.application.knowledge import KnowledgeRetrievalService from app.domain.retrieval import RetrievedChunk from app.domain.compliance.ports import AnalysisRecord, FindingRecord from app.services.llm.base_client import BaseLLMClient def _extract_json(text: str): """Extract JSON from LLM response, tolerating markdown wrappers.""" stripped = text.strip() match = re.search(r"```(?:json)?\s*([\s\S]*?)```", stripped) if match: stripped = match.group(1).strip() try: return json.loads(stripped) except json.JSONDecodeError: pass for pattern in (r"(\[[\s\S]*\])", r"(\{[\s\S]*\})"): m = re.search(pattern, stripped) if m: try: return json.loads(m.group(1)) except json.JSONDecodeError: continue raise ValueError(f"No valid JSON found in LLM response: {text[:300]}") def extract_text_from_doc_id(doc_id: str) -> str: from app.shared.bootstrap import get_document_query_service, get_retrieval_service doc = get_document_query_service().get(doc_id) if not doc: raise ValueError(f"Document '{doc_id}' not found") service = get_retrieval_service() chunks = service.retrieve(query=doc.doc_name, top_k=30) doc_chunks = [c for c in chunks if c.doc_id == doc_id] if not doc_chunks: doc_chunks = chunks[:15] return "\n\n".join(c.text for c in doc_chunks[:15]) def extract_text_from_file(content: bytes, filename: str) -> str: from app.shared.bootstrap import get_document_command_service suffix = os.path.splitext(filename or "doc.pdf")[1] or ".pdf" tmp_path = "" try: with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp.write(content) tmp_path = tmp.name service = get_document_command_service() parsed = service.parser.parse(file_path=tmp_path, doc_id="tmp_analysis", doc_name=filename) if parsed.raw_text: return parsed.raw_text[:4000] return "\n".join( b.get("text", "") for b in parsed.semantic_blocks[:30] if b.get("text") )[:4000] except Exception as exc: logger.warning("File text extraction failed: {}", exc) return "" finally: if tmp_path: try: os.unlink(tmp_path) except OSError: pass def split_into_clauses(text: str, client: "BaseLLMClient") -> list[str]: prompt = ( "You are a compliance analysis expert. Split the following text into 3-8 " "semantically complete compliance clauses. Each clause should be an independent " "compliance requirement or technical statement.\n" "Return as JSON array of strings, e.g.:\n" '["Clause one...", "Clause two..."]\n' "Return ONLY the JSON array.\n\n" f"Text:\n{text[:2000]}" ) response = client.chat([{"role": "user", "content": prompt}], max_tokens=1000) if response.is_success: try: result = _extract_json(response.content) if isinstance(result, list): clauses = [str(c).strip() for c in result if str(c).strip()] if clauses: return clauses[:8] except (ValueError, TypeError): logger.warning("Clause split JSON parse failed, using fallback") sentences = re.split(r"[.?!;\n]+", text) return [s.strip() for s in sentences if len(s.strip()) > 20][:6] def retrieve_for_clause( clause: str, retrieval_service: "KnowledgeRetrievalService", top_k: int = 5, domains: str | None = None, ) -> list["RetrievedChunk"]: return retrieval_service.retrieve(query=clause, top_k=top_k, filters=domains) def process_single_clause( clause: str, index: int, retrieval_service: "KnowledgeRetrievalService", client: "BaseLLMClient", top_k: int = 5, domains: str | None = None, ) -> dict: """Process one clause: retrieve relevant regulations then check compliance. Returns a dict with keys: index, chunks, finding (may be None on LLM failure). Designed to run inside asyncio.to_thread() for parallel execution. """ chunks = retrieve_for_clause(clause, retrieval_service, top_k, domains) finding = check_clause_compliance(clause, chunks, client) return {"index": index, "chunks": chunks, "finding": finding} async def run_clauses_parallel( clauses: list[str], retrieval_service: "KnowledgeRetrievalService", client: "BaseLLMClient", top_k: int = 5, domains: str | None = None, ) -> list[dict]: """Run all clauses through retrieve+gap-check in parallel. Results are returned in the original clause order even though processing is concurrent. Exceptions in individual clauses are caught and returned as dicts with finding=None so the stream continues for remaining clauses. Both retrieval_service and client must be thread-safe — they are shared across all asyncio.to_thread() calls without locking. """ tasks = [ asyncio.to_thread( process_single_clause, clause, i, retrieval_service, client, top_k, domains, ) for i, clause in enumerate(clauses) ] raw = await asyncio.gather(*tasks, return_exceptions=True) results = [] for i, r in enumerate(raw): if isinstance(r, Exception): logger.warning("Clause {} processing failed: {}", i, r) results.append({"index": i, "chunks": [], "finding": None}) else: results.append(r) return results def check_clause_compliance( clause: str, chunks: list["RetrievedChunk"], client: "BaseLLMClient", ) -> dict | None: reg_context = "\n".join( f"[{i+1}] {c.doc_title} {c.section_title or ''}: {c.text[:300]}" for i, c in enumerate(chunks[:5]) ) if chunks else "(no regulatory context retrieved)" prompt = ( "You are a compliance expert. Judge whether the following business clause " "complies with the retrieved regulations.\n\n" f"Business clause:\n{clause}\n\n" f"Retrieved regulations:\n{reg_context}\n\n" "Return JSON:\n" "{\n" ' "status": "ok" | "warn" | "risk",\n' ' "title": "Short finding title (max 30 chars)",\n' ' "desc": "Description (50-120 chars)",\n' ' "clause_ref": "Regulation clause reference e.g. Art.9.1 or Sec.3.1"\n' "}\n" "status: ok=compliant, warn=gap exists, risk=critical/missing\n" "Return ONLY the JSON object." ) def _do_check(): resp = client.chat([{"role": "user", "content": prompt}], max_tokens=500) if not resp.is_success: raise ValueError("LLM returned non-success for gap check") return resp try: response = _llm_retry(_do_check)() except Exception as exc: logger.warning("check_clause_compliance LLM call failed after retries: {}", exc) return None try: result = _extract_json(response.content) if isinstance(result, dict) and "status" in result: return { "title": str(result.get("title", "Compliance finding")), "desc": str(result.get("desc", "")), "status": result.get("status", "info"), "clause_ref": result.get("clause_ref"), } except (ValueError, TypeError) as exc: logger.warning("Gap check JSON parse failed: {}", exc) return None def synthesize_conclusion( para_text: str, findings: list[dict], client: "BaseLLMClient", ) -> dict: if not findings: return { "conclusion": "No significant compliance gaps found. Continue monitoring regulation updates.", "actions": [{"label": "Next action", "value": "Monitor regulation updates"}], "risk_score": 10, "highlight_terms": [], "para_text": para_text[:800], } findings_text = "\n".join( f"- [{f['status'].upper()}] {f['title']}: {f['desc']}" for f in findings ) prompt = ( "You are a compliance analysis expert. Generate a summary report " "based on the following compliance findings.\n\n" f"Original text (first 600 chars):\n{para_text[:600]}\n\n" f"Findings:\n{findings_text}\n\n" "Return JSON:\n" "{\n" ' "conclusion": "Overall compliance conclusion (100-200 chars)",\n' ' "actions": [\n' ' {"label": "Action label", "value": "Description"},\n' ' {"label": "Priority", "value": "High/Medium/Low", "risk": true}\n' ' ],\n' ' "risk_score": 0-100 (integer, higher=riskier),\n' ' "highlight_terms": ["term1", "term2"], // up to 10 key technical/legal terms actually present in the text\n' ' "para_text": "Original text or summary (max 600 chars)"\n' "}\n" "Return ONLY the JSON object." ) fallback = { "conclusion": "Compliance analysis complete. Review findings and create remediation plan.", "actions": [ {"label": "Next action", "value": "Review critical findings"}, {"label": "Escalation", "value": "Legal review required", "risk": True}, ], "risk_score": 60, "highlight_terms": [], "para_text": para_text[:800], } def _do_synthesize(): resp = client.chat([{"role": "user", "content": prompt}], max_tokens=1200) if not resp.is_success: raise ValueError("LLM returned non-success for synthesis") return resp try: response = _llm_retry(_do_synthesize)() except Exception as exc: logger.warning("synthesize_conclusion LLM call failed after retries: {}", exc) return fallback try: result = _extract_json(response.content) if isinstance(result, dict): return { "conclusion": str(result.get("conclusion", fallback["conclusion"])), "actions": result.get("actions", fallback["actions"]), "risk_score": int(result.get("risk_score", 60)), "highlight_terms": result.get("highlight_terms", []), "para_text": str(result.get("para_text", para_text[:800])), } except (ValueError, TypeError) as exc: logger.warning("Conclusion synthesis JSON parse failed: {}", exc) return fallback _SUGGESTION_FOCUS = { "risk": "Focus on remediation steps, required certifications, and timeline to resolve.", "warn": "Focus on identifying the specific compliance gap and how to close it.", "ok": "Focus on maintaining compliance evidence and monitoring future changes.", } _SUGGESTION_FALLBACK = { "risk": [ "What specific certifications or documents are required to remediate this finding?", "What is the typical remediation timeline for this type of non-compliance?", "Which regulation clause defines the exact requirement?", ], "warn": [ "What is the exact gap between the current state and the requirement?", "What evidence would demonstrate partial compliance?", "Which regulation clause applies to this warning?", ], "ok": [ "What documentation should be maintained to evidence this compliance?", "How should this area be monitored as regulations evolve?", "Are there related clauses that may affect this compliant area?", ], } def build_finding_context(finding: "FindingRecord", analysis: "AnalysisRecord") -> str: """Build a grounded system context string for a finding chat thread. Combines finding details with analysis metadata so the LLM has full context without relying on the frontend to pass segment_context. """ return ( f"Document: {analysis.doc_name}\n" f"Standard: {analysis.standard_name}\n" f"Finding [{finding.seq + 1}]: {finding.title}\n" f"Status: {finding.status}\n" f"Clause reference: {finding.clause_ref or 'N/A'}\n" f"Description: {finding.description}\n" f"Overall conclusion: {analysis.conclusion}\n" ) def generate_suggestions( finding: "FindingRecord", analysis: "AnalysisRecord", client: "BaseLLMClient", ) -> list[str]: """Generate 3 context-aware follow-up questions for a finding chat thread. Returns exactly 3 question strings. Falls back to static templates on error. """ fallback = _SUGGESTION_FALLBACK.get(finding.status, _SUGGESTION_FALLBACK["warn"]) context = build_finding_context(finding, analysis) focus = _SUGGESTION_FOCUS.get(finding.status, _SUGGESTION_FOCUS["warn"]) prompt = ( f"{context}\n\n" f"Task: {focus}\n" "Generate exactly 3 concise follow-up questions a compliance analyst would ask.\n" 'Return JSON: {"questions": ["question 1", "question 2", "question 3"]}\n' "Return ONLY the JSON object." ) response = client.chat([{"role": "user", "content": prompt}], max_tokens=300) if not response.is_success: return fallback try: result = _extract_json(response.content) questions = result.get("questions", []) if isinstance(questions, list) and len(questions) >= 3: return [str(q) for q in questions[:3]] except (ValueError, TypeError) as exc: logger.warning("generate_suggestions JSON parse failed: {}", exc) return fallback