Files

371 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Compliance analysis pipeline helpers.
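All functions except ``run_clauses_parallel`` are synchronous — call them via
asyncio.to_thread() in async SSE generators. ``run_clauses_parallel`` is a
coroutine that performs that thread fan-out itself and must be awaited.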
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import tempfile
from typing import TYPE_CHECKING
from loguru import logger
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
# Shared retry policy for LLM calls: at most 3 attempts, with an exponential
# back-off clamped to 1-4 s between attempts. Only ValueError, TimeoutError and
# ConnectionError trigger a retry; with reraise=True the final failure surfaces
# as the original exception.
_llm_retry = retry(
    retry=retry_if_exception_type((ValueError, TimeoutError, ConnectionError)),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    stop=stop_after_attempt(3),
    reraise=True,
)
if TYPE_CHECKING:
from app.application.knowledge import KnowledgeRetrievalService
from app.domain.retrieval import RetrievedChunk
from app.domain.compliance.ports import AnalysisRecord, FindingRecord
from app.services.llm.base_client import BaseLLMClient
def _extract_json(text: str):
"""Extract JSON from LLM response, tolerating markdown wrappers."""
stripped = text.strip()
match = re.search(r"```(?:json)?\s*([\s\S]*?)```", stripped)
if match:
stripped = match.group(1).strip()
try:
return json.loads(stripped)
except json.JSONDecodeError:
pass
for pattern in (r"(\[[\s\S]*\])", r"(\{[\s\S]*\})"):
m = re.search(pattern, stripped)
if m:
try:
return json.loads(m.group(1))
except json.JSONDecodeError:
continue
raise ValueError(f"No valid JSON found in LLM response: {text[:300]}")
def extract_text_from_doc_id(doc_id: str) -> str:
    """Assemble analysis text for a stored document from retrieved chunks.

    Looks the document up through the document query service, runs a
    retrieval with the document's name as the query (top 30 hits), and joins
    the text of up to 15 chunks with blank lines.

    Chunks whose ``doc_id`` matches are preferred.
    NOTE(review): when none match, the first 15 unfiltered hits are used
    instead, which may belong to other documents — confirm this is intended.

    Raises:
        ValueError: If the document query service returns nothing for
            ``doc_id``.
    """
    from app.shared.bootstrap import get_document_query_service, get_retrieval_service

    document = get_document_query_service().get(doc_id)
    if not document:
        raise ValueError(f"Document '{doc_id}' not found")

    hits = get_retrieval_service().retrieve(query=document.doc_name, top_k=30)
    own_hits = [hit for hit in hits if hit.doc_id == doc_id]
    selected = own_hits[:15] if own_hits else hits[:15]
    return "\n\n".join(hit.text for hit in selected)
def extract_text_from_file(content: bytes, filename: str) -> str:
    """Parse an uploaded file and return at most 4000 characters of its text.

    The bytes are written to a temporary file whose suffix is taken from
    ``filename`` (``.pdf`` when there is none), then handed to the parser of
    the document command service. The parser's ``raw_text`` is preferred;
    when it is empty, the non-empty ``text`` entries of the first 30 semantic
    blocks are joined with newlines instead.

    Returns:
        The extracted text truncated to 4000 characters, or ``""`` if any
        step raised (the error is logged as a warning, never propagated).
    """
    from app.shared.bootstrap import get_document_command_service

    _, extension = os.path.splitext(filename or "doc.pdf")
    temp_name = ""
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=extension or ".pdf") as handle:
            handle.write(content)
            temp_name = handle.name
        # The handle is closed here, so the parser reads fully flushed bytes.
        command_service = get_document_command_service()
        document = command_service.parser.parse(
            file_path=temp_name, doc_id="tmp_analysis", doc_name=filename
        )
        if not document.raw_text:
            block_texts = (
                block.get("text", "")
                for block in document.semantic_blocks[:30]
                if block.get("text")
            )
            return "\n".join(block_texts)[:4000]
        return document.raw_text[:4000]
    except Exception as exc:
        logger.warning("File text extraction failed: {}", exc)
        return ""
    finally:
        # delete=False above means cleanup is our job; best-effort only.
        if temp_name:
            try:
                os.unlink(temp_name)
            except OSError:
                pass
def split_into_clauses(text: str, client: "BaseLLMClient") -> list[str]:
    """Split free text into compliance clauses, preferring the LLM's split.

    The first 2000 characters of ``text`` are sent to the LLM, which is asked
    for a JSON array of 3-8 clause strings. Non-empty entries of a returned
    list are stripped and capped at 8.

    If the call is unsuccessful, the reply is not a usable JSON list, or the
    list is empty, the text is split on ``. ? ! ;`` and newlines instead, and
    the first 6 fragments longer than 20 characters are returned.
    """
    instructions = (
        "You are a compliance analysis expert. Split the following text into 3-8 "
        "semantically complete compliance clauses. Each clause should be an independent "
        "compliance requirement or technical statement.\n"
        "Return as JSON array of strings, e.g.:\n"
        '["Clause one...", "Clause two..."]\n'
        "Return ONLY the JSON array.\n\n"
        f"Text:\n{text[:2000]}"
    )
    reply = client.chat([{"role": "user", "content": instructions}], max_tokens=1000)

    if reply.is_success:
        try:
            parsed = _extract_json(reply.content)
            if isinstance(parsed, list):
                stripped_items = (str(item).strip() for item in parsed)
                cleaned = [item for item in stripped_items if item]
                if cleaned:
                    return cleaned[:8]
        except (ValueError, TypeError):
            logger.warning("Clause split JSON parse failed, using fallback")

    # Punctuation-based fallback; short fragments are treated as noise.
    fragments = (piece.strip() for piece in re.split(r"[.?!;\n]+", text))
    return [fragment for fragment in fragments if len(fragment) > 20][:6]
def retrieve_for_clause(
clause: str,
retrieval_service: "KnowledgeRetrievalService",
top_k: int = 5,
domains: str | None = None,
) -> list["RetrievedChunk"]:
return retrieval_service.retrieve(query=clause, top_k=top_k, filters=domains)
def process_single_clause(
    clause: str,
    index: int,
    retrieval_service: "KnowledgeRetrievalService",
    client: "BaseLLMClient",
    top_k: int = 5,
    domains: str | None = None,
) -> dict:
    """Run the retrieve-then-check pipeline for a single clause.

    Synchronous by design so it can be dispatched with asyncio.to_thread().

    Returns:
        ``{"index": index, "chunks": <retrieved chunks>, "finding": <dict or
        None>}`` where ``finding`` is whatever ``check_clause_compliance``
        produced for the clause and its chunks.
    """
    retrieved = retrieve_for_clause(clause, retrieval_service, top_k, domains)
    return {
        "index": index,
        "chunks": retrieved,
        "finding": check_clause_compliance(clause, retrieved, client),
    }
async def run_clauses_parallel(
clauses: list[str],
retrieval_service: "KnowledgeRetrievalService",
client: "BaseLLMClient",
top_k: int = 5,
domains: str | None = None,
) -> list[dict]:
"""Run all clauses through retrieve+gap-check in parallel.
Results are returned in the original clause order even though processing
is concurrent. Exceptions in individual clauses are caught and returned as
dicts with finding=None so the stream continues for remaining clauses.
Both retrieval_service and client must be thread-safe — they are shared
across all asyncio.to_thread() calls without locking.
"""
tasks = [
asyncio.to_thread(
process_single_clause,
clause, i, retrieval_service, client, top_k, domains,
)
for i, clause in enumerate(clauses)
]
raw = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for i, r in enumerate(raw):
if isinstance(r, Exception):
logger.warning("Clause {} processing failed: {}", i, r)
results.append({"index": i, "chunks": [], "finding": None})
else:
results.append(r)
return results
def check_clause_compliance(
    clause: str,
    chunks: list["RetrievedChunk"],
    client: "BaseLLMClient",
) -> dict | None:
    """Ask the LLM whether a clause complies with the retrieved regulations.

    Up to five chunks are rendered as numbered context lines (document title,
    optional section title, first 300 characters of text). The LLM call goes
    through the shared retry policy; an unsuccessful response counts as a
    retryable failure.

    Returns:
        ``{"title", "desc", "status", "clause_ref"}`` built from the LLM's
        JSON object, or ``None`` when the call fails after retries, the reply
        is not parseable JSON, or the parsed value is not an object with a
        ``"status"`` key. ``status`` and ``clause_ref`` are passed through
        as returned by the LLM.
    """
    if chunks:
        reg_context = "\n".join(
            f"[{position}] {chunk.doc_title} {chunk.section_title or ''}: {chunk.text[:300]}"
            for position, chunk in enumerate(chunks[:5], start=1)
        )
    else:
        reg_context = "(no regulatory context retrieved)"

    prompt = (
        "You are a compliance expert. Judge whether the following business clause "
        "complies with the retrieved regulations.\n\n"
        f"Business clause:\n{clause}\n\n"
        f"Retrieved regulations:\n{reg_context}\n\n"
        "Return JSON:\n"
        "{\n"
        ' "status": "ok" | "warn" | "risk",\n'
        ' "title": "Short finding title (max 30 chars)",\n'
        ' "desc": "Description (50-120 chars)",\n'
        ' "clause_ref": "Regulation clause reference e.g. Art.9.1 or Sec.3.1"\n'
        "}\n"
        "status: ok=compliant, warn=gap exists, risk=critical/missing\n"
        "Return ONLY the JSON object."
    )

    def _request_verdict():
        reply = client.chat([{"role": "user", "content": prompt}], max_tokens=500)
        # Raising ValueError makes a non-success reply eligible for retry.
        if not reply.is_success:
            raise ValueError("LLM returned non-success for gap check")
        return reply

    try:
        guarded_request = _llm_retry(_request_verdict)
        response = guarded_request()
    except Exception as exc:
        logger.warning("check_clause_compliance LLM call failed after retries: {}", exc)
        return None

    try:
        verdict = _extract_json(response.content)
    except (ValueError, TypeError) as exc:
        logger.warning("Gap check JSON parse failed: {}", exc)
        return None

    if not isinstance(verdict, dict) or "status" not in verdict:
        return None
    return {
        "title": str(verdict.get("title", "Compliance finding")),
        "desc": str(verdict.get("desc", "")),
        "status": verdict.get("status", "info"),
        "clause_ref": verdict.get("clause_ref"),
    }
def synthesize_conclusion(
    para_text: str,
    findings: list[dict],
    client: "BaseLLMClient",
) -> dict:
    """Summarise the findings into a conclusion, actions and a risk score.

    With no findings a fixed low-risk result (``risk_score`` 10) is returned
    without calling the LLM. Otherwise the findings and the first 600
    characters of ``para_text`` are sent to the LLM through the shared retry
    policy, and its JSON object is normalised field by field.

    Normalisation of the LLM answer:
        * ``conclusion`` / ``para_text``: missing or empty values fall back
          to the static conclusion / the first 800 characters of
          ``para_text``.
        * ``actions`` / ``highlight_terms``: anything that is not a list is
          replaced by the fallback actions / an empty list.
        * ``risk_score``: coerced to ``int`` and clamped to 0-100; a value
          that cannot be coerced becomes 60 instead of discarding the whole
          answer.

    Returns:
        A dict with keys ``conclusion``, ``actions``, ``risk_score``,
        ``highlight_terms`` and ``para_text``. A static fallback
        (``risk_score`` 60) is returned when the LLM call fails after
        retries or its reply is not a JSON object.
    """
    if not findings:
        return {
            "conclusion": "No significant compliance gaps found. Continue monitoring regulation updates.",
            "actions": [{"label": "Next action", "value": "Monitor regulation updates"}],
            "risk_score": 10,
            "highlight_terms": [],
            "para_text": para_text[:800],
        }

    # "status" reaches this function unvalidated from the LLM, so it may not
    # be a string; coerce before upper-casing instead of crashing the report.
    findings_text = "\n".join(
        f"- [{str(f.get('status', '')).upper()}] {f.get('title', '')}: {f.get('desc', '')}"
        for f in findings
    )
    prompt = (
        "You are a compliance analysis expert. Generate a summary report "
        "based on the following compliance findings.\n\n"
        f"Original text (first 600 chars):\n{para_text[:600]}\n\n"
        f"Findings:\n{findings_text}\n\n"
        "Return JSON:\n"
        "{\n"
        ' "conclusion": "Overall compliance conclusion (100-200 chars)",\n'
        ' "actions": [\n'
        ' {"label": "Action label", "value": "Description"},\n'
        ' {"label": "Priority", "value": "High/Medium/Low", "risk": true}\n'
        ' ],\n'
        ' "risk_score": 0-100 (integer, higher=riskier),\n'
        ' "highlight_terms": ["term1", "term2"], // up to 10 key technical/legal terms actually present in the text\n'
        ' "para_text": "Original text or summary (max 600 chars)"\n'
        "}\n"
        "Return ONLY the JSON object."
    )
    fallback = {
        "conclusion": "Compliance analysis complete. Review findings and create remediation plan.",
        "actions": [
            {"label": "Next action", "value": "Review critical findings"},
            {"label": "Escalation", "value": "Legal review required", "risk": True},
        ],
        "risk_score": 60,
        "highlight_terms": [],
        "para_text": para_text[:800],
    }

    def _do_synthesize():
        resp = client.chat([{"role": "user", "content": prompt}], max_tokens=1200)
        # Raising ValueError makes a non-success reply eligible for retry.
        if not resp.is_success:
            raise ValueError("LLM returned non-success for synthesis")
        return resp

    try:
        response = _llm_retry(_do_synthesize)()
    except Exception as exc:
        logger.warning("synthesize_conclusion LLM call failed after retries: {}", exc)
        return fallback

    try:
        result = _extract_json(response.content)
    except (ValueError, TypeError) as exc:
        logger.warning("Conclusion synthesis JSON parse failed: {}", exc)
        return fallback
    if not isinstance(result, dict):
        return fallback

    # A malformed score should not throw away an otherwise usable answer.
    # OverflowError covers int(float("inf")), which json.loads can produce.
    try:
        risk_score = int(result.get("risk_score", fallback["risk_score"]))
    except (TypeError, ValueError, OverflowError) as exc:
        logger.warning("Conclusion synthesis returned unusable risk_score: {}", exc)
        risk_score = fallback["risk_score"]
    risk_score = max(0, min(100, risk_score))

    actions = result.get("actions")
    if not isinstance(actions, list):
        actions = fallback["actions"]

    highlight_terms = result.get("highlight_terms")
    if not isinstance(highlight_terms, list):
        highlight_terms = []

    return {
        "conclusion": str(result.get("conclusion") or fallback["conclusion"]),
        "actions": actions,
        "risk_score": risk_score,
        "highlight_terms": highlight_terms,
        "para_text": str(result.get("para_text") or para_text[:800]),
    }
# Per-status instruction inserted as the "Task:" line of the prompt built in
# generate_suggestions. Keyed by finding status; unknown statuses use "warn".
_SUGGESTION_FOCUS = {
    "risk": "Focus on remediation steps, required certifications, and timeline to resolve.",
    "warn": "Focus on identifying the specific compliance gap and how to close it.",
    "ok": "Focus on maintaining compliance evidence and monitoring future changes.",
}
# Static follow-up questions (three per status) returned by
# generate_suggestions when the LLM cannot supply usable ones. Keyed by
# finding status; unknown statuses use the "warn" list.
_SUGGESTION_FALLBACK = {
    "risk": [
        "What specific certifications or documents are required to remediate this finding?",
        "What is the typical remediation timeline for this type of non-compliance?",
        "Which regulation clause defines the exact requirement?",
    ],
    "warn": [
        "What is the exact gap between the current state and the requirement?",
        "What evidence would demonstrate partial compliance?",
        "Which regulation clause applies to this warning?",
    ],
    "ok": [
        "What documentation should be maintained to evidence this compliance?",
        "How should this area be monitored as regulations evolve?",
        "Are there related clauses that may affect this compliant area?",
    ],
}
def build_finding_context(finding: "FindingRecord", analysis: "AnalysisRecord") -> str:
    """Render a finding plus its analysis metadata as LLM context text.

    Produces seven newline-terminated ``Label: value`` lines (document,
    standard, 1-based finding number and title, status, clause reference,
    description, overall conclusion) so a finding chat thread is grounded
    without the frontend having to supply segment context. A missing clause
    reference is shown as ``N/A``.
    """
    lines = [
        f"Document: {analysis.doc_name}\n",
        f"Standard: {analysis.standard_name}\n",
        f"Finding [{finding.seq + 1}]: {finding.title}\n",
        f"Status: {finding.status}\n",
        f"Clause reference: {finding.clause_ref or 'N/A'}\n",
        f"Description: {finding.description}\n",
        f"Overall conclusion: {analysis.conclusion}\n",
    ]
    return "".join(lines)
def generate_suggestions(
    finding: "FindingRecord",
    analysis: "AnalysisRecord",
    client: "BaseLLMClient",
) -> list[str]:
    """Generate 3 context-aware follow-up questions for a finding chat thread.

    The prompt combines the finding context with a status-specific focus
    line and asks the LLM for ``{"questions": [...]}``. The first three
    entries are returned as strings.

    The static template for the finding's status (``warn`` for unknown
    statuses) is returned instead whenever the LLM call raises, reports
    non-success, or yields anything other than a JSON object whose
    ``questions`` list has at least three entries. No retry is attempted.

    Returns:
        A new list of exactly 3 question strings.
    """
    # Copy the template so a caller mutating the result cannot corrupt the
    # shared module-level fallback lists.
    fallback = list(_SUGGESTION_FALLBACK.get(finding.status, _SUGGESTION_FALLBACK["warn"]))
    context = build_finding_context(finding, analysis)
    focus = _SUGGESTION_FOCUS.get(finding.status, _SUGGESTION_FOCUS["warn"])
    prompt = (
        f"{context}\n\n"
        f"Task: {focus}\n"
        "Generate exactly 3 concise follow-up questions a compliance analyst would ask.\n"
        'Return JSON: {"questions": ["question 1", "question 2", "question 3"]}\n'
        "Return ONLY the JSON object."
    )

    # Suggestions are optional UI sugar: a transport error must degrade to
    # the static template rather than fail the caller.
    try:
        response = client.chat([{"role": "user", "content": prompt}], max_tokens=300)
    except Exception as exc:
        logger.warning("generate_suggestions LLM call failed: {}", exc)
        return fallback
    if not response.is_success:
        return fallback

    try:
        result = _extract_json(response.content)
    except (ValueError, TypeError, AttributeError) as exc:
        # AttributeError covers a response whose content is not a string.
        logger.warning("generate_suggestions JSON parse failed: {}", exc)
        return fallback

    # The LLM may answer with a bare array or scalar; only an object has
    # the "questions" key we asked for.
    if not isinstance(result, dict):
        logger.warning(
            "generate_suggestions JSON parse failed: {}",
            f"expected a JSON object, got {type(result).__name__}",
        )
        return fallback

    questions = result.get("questions", [])
    if isinstance(questions, list) and len(questions) >= 3:
        return [str(q) for q in questions[:3]]
    return fallback