diff --git a/scripts/05_init_db.sh b/scripts/05_init_db.sh index 73539b3..faaabb2 100644 --- a/scripts/05_init_db.sh +++ b/scripts/05_init_db.sh @@ -38,42 +38,80 @@ info "Step 2/3:初始化 Milvus Collections..." if docker compose ps milvus | grep -q "healthy"; then docker compose run --rm --no-deps compliance-backend \ python3 -c " -import asyncio from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility connections.connect(host='milvus', port='19530') print('Milvus 连接成功') -def create_collection(name, description): +def create_regulation_chunks(): + ''' + regulation_chunks:法规专用 collection,带 sparse 向量和条款号字段。 + 开发阶段每次运行均强制重建(保持 schema 最新)。 + ''' + name = 'regulation_chunks' + if utility.has_collection(name): + utility.drop_collection(name) + print(f' 已删除旧 collection {name}(schema 升级)') + + fields = [ + FieldSchema(name='pk', dtype=DataType.VARCHAR, is_primary=True, max_length=128), + FieldSchema(name='file_id', dtype=DataType.VARCHAR, max_length=128), + FieldSchema(name='workspace_id', dtype=DataType.VARCHAR, max_length=128), + FieldSchema(name='chunk_idx', dtype=DataType.INT64), + FieldSchema(name='content', dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name='dense_vec', dtype=DataType.FLOAT_VECTOR, dim=1024), + FieldSchema(name='sparse_vec', dtype=DataType.SPARSE_FLOAT_VECTOR), + FieldSchema(name='clause_no', dtype=DataType.VARCHAR, max_length=64, default_value=''), + FieldSchema(name='article_no', dtype=DataType.VARCHAR, max_length=128, default_value=''), + FieldSchema(name='regulation_name', dtype=DataType.VARCHAR, max_length=512, default_value=''), + FieldSchema(name='metadata', dtype=DataType.JSON), + ] + schema = CollectionSchema(fields, description='法规条款向量库(含稀疏向量和条款号)') + col = Collection(name, schema) + + col.create_index('dense_vec', { + 'metric_type': 'COSINE', + 'index_type': 'HNSW', + 'params': {'M': 16, 'efConstruction': 200} + }) + col.create_index('sparse_vec', { + 'metric_type': 'IP', + 'index_type': 'SPARSE_INVERTED_INDEX', + 'params': {'drop_ratio_build': 0.2} + }) + col.load() + print(f' Collection {name} 创建完成(dense+sparse 双索引)') + + +def create_simple_collection(name, description): + '''doc_chunks / case_library:通用 collection,仅 dense 向量。''' if utility.has_collection(name): print(f' Collection {name} 已存在,跳过') return fields = [ - FieldSchema(name='id', dtype=DataType.VARCHAR, is_primary=True, max_length=128), - FieldSchema(name='file_id', dtype=DataType.VARCHAR, max_length=128), + FieldSchema(name='pk', dtype=DataType.VARCHAR, is_primary=True, max_length=128), + FieldSchema(name='file_id', dtype=DataType.VARCHAR, max_length=128), FieldSchema(name='workspace_id', dtype=DataType.VARCHAR, max_length=128), - FieldSchema(name='chunk_idx', dtype=DataType.INT64), - FieldSchema(name='content', dtype=DataType.VARCHAR, max_length=65535), - FieldSchema(name='dense_vec', dtype=DataType.FLOAT_VECTOR, dim=1024), # BGE-M3 dense - FieldSchema(name='metadata', dtype=DataType.JSON), + FieldSchema(name='chunk_idx', dtype=DataType.INT64), + FieldSchema(name='content', dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name='dense_vec', dtype=DataType.FLOAT_VECTOR, dim=1024), + FieldSchema(name='metadata', dtype=DataType.JSON), ] schema = CollectionSchema(fields, description=description) col = Collection(name, schema) - - # 创建向量索引(HNSW,适合调研阶段) - index_params = { + col.create_index('dense_vec', { 'metric_type': 'COSINE', 'index_type': 'HNSW', 'params': {'M': 16, 'efConstruction': 200} - } - col.create_index('dense_vec', index_params) + }) col.load() print(f' Collection {name} 创建完成') -create_collection('regulation_chunks', '法规条款向量库') -create_collection('doc_chunks', '企业文档向量库') -create_collection('case_library', '行业案例库') + +create_regulation_chunks() +create_simple_collection('doc_chunks', '企业文档向量库') +create_simple_collection('case_library', '行业案例库') print('Milvus 初始化完成') " 2>&1 diff --git a/services/compliance-backend/app/core/llm.py b/services/compliance-backend/app/core/llm.py index 1a99b89..5243c69 100644 --- a/services/compliance-backend/app/core/llm.py +++ b/services/compliance-backend/app/core/llm.py @@ -30,10 +30,13 @@ RAG_SYSTEM_PROMPT = """你是一位专业的汽车行业合规专家,具备深 回答规则: 1. 仅基于提供的参考文献回答,不添加不在文献中的信息 -2. 每个关键陈述必须标注来源(格式:[来源:文件名,第X页]) +2. 每个关键陈述必须标注来源,优先使用条款号格式:【法规名称·第X条】 + 若无条款号则使用:【法规名称·页码X】 3. 如果参考文献不足以回答问题,明确说明 4. 使用专业但清晰的语言,适合工程师和法务人员阅读 -5. 对于数值要求(如绝缘电阻值、时间限制等),精确引用原文""" +5. 对于数值要求(如绝缘电阻值、时间限制等),精确引用原文 + +引用格式示例:根据《道路运输车辆动态监督管理办法》【第十五条】规定,道路运输经营者应当……""" COMPLIANCE_CHECK_PROMPT = """你是一位专业的汽车合规审查专家。 diff --git a/services/compliance-backend/app/services/rag.py b/services/compliance-backend/app/services/rag.py index c85e0ef..fdfaad1 100644 --- a/services/compliance-backend/app/services/rag.py +++ b/services/compliance-backend/app/services/rag.py @@ -32,19 +32,25 @@ async def hybrid_search( param={"metric_type": "COSINE", "params": {"ef": 100}}, limit=top_k, expr=expr, - output_fields=["content", "metadata", "file_id", "chunk_idx"], + output_fields=[ + "content", "metadata", "file_id", "chunk_idx", + "clause_no", "article_no", "regulation_name", + ], ) chunks = [] for hits in results: for hit in hits: chunks.append({ - "id": hit.id, - "content": hit.entity.get("content", ""), - "score": float(hit.score), - "file_id": hit.entity.get("file_id", ""), - "chunk_idx": hit.entity.get("chunk_idx", 0), - "metadata": hit.entity.get("metadata", {}), + "id": hit.id, + "content": hit.entity.get("content", ""), + "score": float(hit.score), + "file_id": hit.entity.get("file_id", ""), + "chunk_idx": hit.entity.get("chunk_idx", 0), + "clause_no": hit.entity.get("clause_no", ""), + "article_no": hit.entity.get("article_no", ""), + "regulation_name": hit.entity.get("regulation_name", ""), + "metadata": hit.entity.get("metadata", {}), }) return chunks @@ -59,12 +65,13 @@ async def generate_answer(query: str, chunks: list[dict]) -> dict: if not chunks: return {"answer": "未找到相关法规内容,请上传相关法规文档后重试。", "sources": []} - # 构建 RAG 上下文 + # 构建 RAG 上下文(带法规名称和条款号,方便 LLM 生成引文锚定) context_parts = [] for i, chunk in enumerate(chunks, 1): - meta = chunk.get("metadata", {}) - source_info = f"[来源 {i}:{meta.get('filename', '未知文件')},第 {meta.get('page', '?')} 页]" - context_parts.append(f"{source_info}\n{chunk['content']}") + reg_name = chunk.get("regulation_name", "") or chunk.get("metadata", {}).get("filename", "未知文件") + clause_no = chunk.get("clause_no", "") + source_tag = f"[{reg_name}·{clause_no}]" if clause_no else f"[{reg_name}]" + context_parts.append(f"{source_tag}\n{chunk['content']}") context = "\n\n---\n\n".join(context_parts) user_prompt = f"参考文献:\n\n{context}\n\n问题:{query}\n\n请基于以上参考文献回答,并标注来源。" @@ -81,11 +88,14 @@ async def generate_answer(query: str, chunks: list[dict]) -> dict: sources = [ { - "content": c["content"][:300], - "file_id": c.get("file_id", ""), - "chunk_idx": c.get("chunk_idx", 0), - "score": c.get("score", 0), - "metadata": c.get("metadata", {}), + "content": c["content"][:300], + "file_id": c.get("file_id", ""), + "chunk_idx": c.get("chunk_idx", 0), + "score": c.get("score", 0), + "clause_no": c.get("clause_no", ""), + "article_no": c.get("article_no", ""), + "regulation_name": c.get("regulation_name", ""), + "metadata": c.get("metadata", {}), } for c in chunks ] diff --git a/services/compliance-backend/app/services/regulation_parser.py b/services/compliance-backend/app/services/regulation_parser.py new file mode 100644 index 0000000..519f4f3 --- /dev/null +++ b/services/compliance-backend/app/services/regulation_parser.py @@ -0,0 +1,205 @@ +import re +from dataclasses import dataclass + +# ── 正则模式 ──────────────────────────────────────────── +_CHAPTER = re.compile( + r'(?m)^(?:#{1,3}\s*)?(第[一二三四五六七八九十百零〇\d]+章\s*[^\n]{0,40})' +) +_CLAUSE = re.compile( + r'(?m)^(第[一二三四五六七八九十百零〇\d]+条)\s+(.{1,200})' +) +_DOC_NUM = re.compile( + r'[((]\d{4}[))][^\s(())]{0,10}第?\d+号' +) +_EFF_DATE = re.compile( + r'自(\d{4}年\d{1,2}月\d{1,2}日)起(?:施行|实施|执行)' +) +_ISSUER = re.compile( + r'^([一-鿿]{2,10}(?:部|局|委|厅|院|会|办)[一-鿿]{0,8}?)\s*$', + re.M, +) +_REG_TYPE_MAP = [ + ("法律", ["人民代表大会", "主席令", "全国人大常委会"]), + ("行政法规", ["国务院令", "国务院颁布"]), + ("部门规章", ["部令", "局令", "委令", "管理办法", "管理规定"]), + ("地方法规", ["省人民代表大会", "市人民代表大会", "自治区"]), + ("国家标准", ["GB ", "GB/T", "GBT", "国家标准"]), + ("行业标准", ["行业标准", "QC/T", "ISO ", "SAE "]), + ("规范指南", ["规范", "指南", "导则", "技术要求"]), +] + + +@dataclass +class RegulationMeta: + regulation_name: str = "" + issuing_authority: str = "" + doc_number: str = "" + effective_date: str = "" + regulation_type: str = "" + + +def extract_regulation_meta(markdown: str) -> RegulationMeta: + """ + 从解析后的 Markdown 中提取法规元数据。 + 仅扫描前 3000 字符以提升效率。 + """ + meta = RegulationMeta() + head = markdown[:3000] + + # 法规名称:取第一个 # 标题行 + title_m = re.search(r'^#{1,3}\s*(.+)$', head, re.M) + if title_m: + meta.regulation_name = title_m.group(1).strip().strip('《》') + + # 文号(括号内含年份+数字,如"(2023)第36号") + num_m = _DOC_NUM.search(head) + if num_m: + meta.doc_number = num_m.group(0) + + # 施行日期 + date_m = _EFF_DATE.search(head) + if date_m: + meta.effective_date = date_m.group(1) + + # 发布机关(前 500 字内独占一行的机关名称) + issuer_m = _ISSUER.search(markdown[:500]) + if issuer_m: + meta.issuing_authority = issuer_m.group(1) + + # 法规类型(关键词顺序匹配,取第一个命中) + for reg_type, keywords in _REG_TYPE_MAP: + if any(kw in head for kw in keywords): + meta.regulation_type = reg_type + break + + return meta + + +# ── 分块 ──────────────────────────────────────────────── + +def legal_chunk( + markdown: str, + meta: RegulationMeta, + chunk_size: int = 512, + overlap: int = 64, +) -> list[dict]: + """ + 法规专用分块:优先按条款(第X条)边界分割,保留条款号和章节信息。 + 超长条款按段落/句子二次分块。 + + 返回 list[dict],每个元素包含: + idx int chunk 序号(从 0 开始) + content str 分块文本 + clause_no str 条款号,如"第十二条"(无则空字符串) + article_no str 所在章,如"第三章 生产管理"(无则空字符串) + regulation_name str 法规名称(来自 meta) + """ + chars_per_chunk = chunk_size * 2 # 中文约 2 字符/token + chars_overlap = overlap * 2 + + segments = _split_by_structure(markdown) + + chunks: list[dict] = [] + current_chapter = "" + idx = 0 + + for seg in segments: + if seg["type"] == "chapter": + current_chapter = seg["heading"] + continue + + text = seg["content"] + clause = seg.get("clause_no", "") + + if not text.strip(): + continue + + if len(text) <= chars_per_chunk: + chunks.append(_make_chunk(idx, text, clause, current_chapter, meta)) + idx += 1 + else: + # 条款文本过长:二次字符分块 + sub_texts = _char_split(text, chars_per_chunk, chars_overlap) + for sub_i, sub_text in enumerate(sub_texts): + sub_clause = f"{clause}({sub_i + 1})" if clause else "" + chunks.append(_make_chunk(idx, sub_text, sub_clause, current_chapter, meta)) + idx += 1 + + # 如果结构化分块失败(无任何条款),降级为通用字符分块 + if not chunks: + for text in _char_split(markdown, chars_per_chunk, chars_overlap): + chunks.append(_make_chunk(idx, text, "", "", meta)) + idx += 1 + + return chunks + + +def _make_chunk(idx, content, clause_no, article_no, meta: RegulationMeta) -> dict: + return { + "idx": idx, + "content": content.strip(), + "clause_no": clause_no, + "article_no": article_no, + "regulation_name": meta.regulation_name, + } + + +def _split_by_structure(markdown: str) -> list[dict]: + """将 Markdown 按章标题和条款号拆分成有序段落列表。""" + segments: list[dict] = [] + lines = markdown.splitlines(keepends=True) + buffer: list[str] = [] + current_clause = "" + + def flush_buffer(): + content = "".join(buffer).strip() + if content: + segments.append({ + "type": "clause", + "clause_no": current_clause, + "content": content, + }) + + for line in lines: + stripped = line.strip() + + # 检测章标题 + chapter_m = _CHAPTER.match(stripped) + if chapter_m: + flush_buffer() + buffer.clear() + current_clause = "" # 章标题后重置条款号 + segments.append({"type": "chapter", "heading": chapter_m.group(1).strip()}) + continue + + # 检测条款起始(第X条 ...) + clause_m = _CLAUSE.match(stripped) + if clause_m: + flush_buffer() + buffer.clear() + current_clause = clause_m.group(1) # 如"第十二条" + buffer.append(line) + continue + + buffer.append(line) + + flush_buffer() + return segments + + +def _char_split(text: str, size: int, overlap: int) -> list[str]: + """按字符数分块,在段落/句子边界处截断。""" + results: list[str] = [] + start = 0 + while start < len(text): + end = min(start + size, len(text)) + # 尝试在语义边界回退截断点 + if end < len(text): + for sep in ("\n\n", "\n", "。", ";", ";", ".", " "): + pos = text.rfind(sep, start, end) + if pos > start: + end = pos + len(sep) + break + results.append(text[start:end]) + start = max(start + 1, end - overlap) + return results diff --git a/services/compliance-backend/app/worker.py b/services/compliance-backend/app/worker.py index 8c09687..bc37917 100644 --- a/services/compliance-backend/app/worker.py +++ b/services/compliance-backend/app/worker.py @@ -46,8 +46,9 @@ async def _process_file(file_id: str, task_id: str, workspace_id: str): from sqlalchemy import select from .core.deps import AsyncSessionLocal, get_milvus_collection from .models.db import File, Task - from .services.parse import parse_document, chunk_text + from .services.parse import parse_document from .services.embed import embed_texts + from .services.regulation_parser import extract_regulation_meta, legal_chunk async with AsyncSessionLocal() as db: # 查找文件记录 @@ -68,7 +69,7 @@ async def _process_file(file_id: str, task_id: str, workspace_id: str): task.progress = 10 await db.commit() - # Step 1:解析文档 + # Step 1:解析文档(调用 mcp-server) file_content = Path(file_record.storage_path).read_bytes() parse_result = await parse_document(file_content, file_record.original_name) markdown = parse_result.get("markdown", "") @@ -76,16 +77,27 @@ async def _process_file(file_id: str, task_id: str, workspace_id: str): if not markdown.strip(): raise ValueError("文档解析结果为空") + # Step 2:提取法规元数据(发布机关/文号/施行日期/法规类型) + reg_meta = extract_regulation_meta(markdown) file_record.status = "parsed" + file_record.metadata = { + "regulation_name": reg_meta.regulation_name, + "issuing_authority": reg_meta.issuing_authority, + "doc_number": reg_meta.doc_number, + "effective_date": reg_meta.effective_date, + "regulation_type": reg_meta.regulation_type, + "parser": parse_result.get("parser", ""), + "page_count": parse_result.get("page_count", 0), + } if task: task.progress = 40 await db.commit() - # Step 2:分块 - chunks = chunk_text(markdown, chunk_size=512, overlap=64) - logger.info(f"文件 {file_id} 分割为 {len(chunks)} 块") + # Step 3:法规专用分块(按章/条边界分割,保留条款号) + chunks = legal_chunk(markdown, reg_meta, chunk_size=512, overlap=64) + logger.info(f"文件 {file_id} 分割为 {len(chunks)} 块,法规:{reg_meta.regulation_name!r}") - # Step 3:向量化(分批处理) + # Step 4:向量化并写入 Milvus(分批处理) batch_size = 16 col = get_milvus_collection("regulation_chunks") @@ -93,16 +105,26 @@ async def _process_file(file_id: str, task_id: str, workspace_id: str): batch = chunks[i:i + batch_size] texts = [c["content"] for c in batch] embed_result = await embed_texts(texts, batch_size=batch_size) - dense_vecs = embed_result["dense"] + + dense_vecs = embed_result["dense"] # list[list[float]], 1024维 + sparse_vecs = embed_result.get("sparse", [{}] * len(batch)) # list[dict[str,float]] entities = [ - [f"{file_id}_{c['idx']}" for c in batch], - [file_id] * len(batch), - [workspace_id] * len(batch), - [c["idx"] for c in batch], - [c["content"] for c in batch], - dense_vecs, - [{"filename": file_record.original_name, "page": c.get("page", 0)} for c in batch], + [f"{file_id}_{c['idx']}" for c in batch], # pk + [file_id] * len(batch), # file_id + [workspace_id] * len(batch), # workspace_id + [c["idx"] for c in batch], # chunk_idx + [c["content"] for c in batch], # content + dense_vecs, # dense_vec + sparse_vecs, # sparse_vec + [c["clause_no"] for c in batch], # clause_no + [c["article_no"] for c in batch], # article_no + [c["regulation_name"] for c in batch], # regulation_name + [{ # metadata + "filename": file_record.original_name, + "page": c.get("page", 0), + "doc_number": reg_meta.doc_number, + } for c in batch], ] col.insert(entities) @@ -119,7 +141,7 @@ async def _process_file(file_id: str, task_id: str, workspace_id: str): task.progress = 100 task.completed_at = datetime.now(timezone.utc) await db.commit() - logger.info(f"文件 {file_id} 处理完成") + logger.info(f"文件 {file_id} 处理完成,共 {len(chunks)} 个向量块") except Exception as e: logger.error(f"文件 {file_id} 处理失败:{e}")