"""智能分块器 - 章节级+条款级双粒度切割""" import re from typing import List, Dict, Optional, Tuple from dataclasses import dataclass, field from loguru import logger @dataclass class ChunkMetadata: """分块元数据""" doc_id: str = "" doc_name: str = "" chunk_id: str = "" section_number: str = "" # 章节编号(如 "第一章") section_title: str = "" # 章节标题 clause_number: str = "" # 条款编号(如 "第一条") page_number: int = 0 start_position: int = 0 # 在原文中的起始位置 end_position: int = 0 # 在原文中的结束位置 regulation_type: str = "" # 法规类型 version: str = "" @dataclass class TextChunk: """文本分块""" content: str metadata: ChunkMetadata token_count: int = 0 # 估算的token数量 class RegulationChunker: """ 法规文档智能分块器 实现章节级/条款级双粒度切割,适配国标GB文档结构: - 国标文档通常有明确的层级结构:章 > 节 > 条 - 每个条款应作为一个独立的语义单元 - 保留条款完整性,避免跨条款截断 """ # 法规标题模式 CHAPTER_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+章\s+[^\n]+') SECTION_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+节\s+[^\n]+') CLAUSE_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+条\s') # 条款子项模式 SUB_ITEM_PATTERN = re.compile(r'^[\((][一二三四五六七八九十]+[\))]\s') NUMBER_ITEM_PATTERN = re.compile(r'^[\d]+[\.、]\s') def __init__( self, chunk_size: int = 512, chunk_overlap: int = 50, max_chunk_size: int = 2048, min_chunk_size: int = 100 ): """ 初始化分块器 Args: chunk_size: 默认分块大小(字符数) chunk_overlap: 分块重叠大小 max_chunk_size: 最大分块大小(防止单个条款过长) min_chunk_size: 最小分块大小(防止碎片化) """ self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.max_chunk_size = max_chunk_size self.min_chunk_size = min_chunk_size def chunk_document( self, markdown_text: str, doc_id: str = "", doc_name: str = "", regulation_type: str = "", version: str = "" ) -> List[TextChunk]: """ 对法规文档进行智能分块 Args: markdown_text: Markdown格式的文档内容 doc_id: 文档ID doc_name: 文档名称 regulation_type: 法规类型 version: 文档版本 Returns: List[TextChunk]: 分块列表 """ logger.info(f"开始分块文档: {doc_name}") # 1. 按章节分割(一级分块) sections = self._split_by_sections(markdown_text) # 2. 在每个章节内按条款分割(二级分块) chunks = [] global_position = 0 for section_num, section_title, section_content, section_start in sections: # 在章节内按条款分割 clause_chunks = self._split_by_clauses( section_content, section_num, section_title, section_start + global_position ) for chunk_content, clause_num, clause_title, start_pos, end_pos in clause_chunks: # 处理过长的条款(进一步细分) if len(chunk_content) > self.max_chunk_size: sub_chunks = self._split_long_clause( chunk_content, clause_num, clause_title ) for sub_content, sub_start, sub_end in sub_chunks: chunk = self._create_chunk( sub_content, doc_id, doc_name, section_num, section_title, clause_num, sub_start + start_pos, sub_end + start_pos, regulation_type, version ) chunks.append(chunk) else: chunk = self._create_chunk( chunk_content, doc_id, doc_name, section_num, section_title, clause_num, start_pos, end_pos, regulation_type, version ) chunks.append(chunk) logger.success(f"分块完成,共{len(chunks)}个chunk") return chunks def _split_by_sections(self, markdown_text: str) -> List[Tuple[str, str, str, int]]: """ 按章节分割文档 Returns: List of (section_number, section_title, section_content, start_position) """ sections = [] lines = markdown_text.split('\n') current_section_num = "" current_section_title = "" current_section_content = [] current_section_start = 0 for i, line in enumerate(lines): # 检测章节标题 chapter_match = self.CHAPTER_PATTERN.match(line.strip()) section_match = self.SECTION_PATTERN.match(line.strip()) if chapter_match or section_match: # 保存上一个章节 if current_section_content: content = '\n'.join(current_section_content) sections.append(( current_section_num, current_section_title, content, current_section_start )) # 开始新章节 current_section_start = sum(len(l) + 1 for l in lines[:i]) current_section_content = [] if chapter_match: current_section_num = line.strip() current_section_title = self._extract_title(line.strip()) else: current_section_num = line.strip() current_section_title = self._extract_title(line.strip()) current_section_content.append(line) # 保存最后一个章节 if current_section_content: content = '\n'.join(current_section_content) sections.append(( current_section_num, current_section_title, content, current_section_start )) # 如果没有检测到章节,将整个文档作为一个大章节 if not sections: sections.append(( "", "全文", markdown_text, 0 )) return sections def _split_by_clauses( self, section_content: str, section_num: str, section_title: str, section_start: int ) -> List[Tuple[str, str, str, int, int]]: """ 在章节内按条款分割 Returns: List of (content, clause_number, clause_title, start_position, end_position) """ clauses = [] lines = section_content.split('\n') current_clause_num = "" current_clause_title = "" current_clause_content = [] current_clause_start = section_start for i, line in enumerate(lines): # 检测条款标题 clause_match = self.CLAUSE_PATTERN.match(line.strip()) if clause_match: # 保存上一个条款 if current_clause_content: content = '\n'.join(current_clause_content) end_pos = current_clause_start + len(content) clauses.append(( content, current_clause_num, current_clause_title, current_clause_start, end_pos )) # 开始新条款 current_clause_start = section_start + sum(len(l) + 1 for l in lines[:i]) current_clause_content = [] current_clause_num = self._extract_clause_number(line.strip()) current_clause_title = line.strip() current_clause_content.append(line) # 保存最后一个条款 if current_clause_content: content = '\n'.join(current_clause_content) end_pos = current_clause_start + len(content) clauses.append(( content, current_clause_num, current_clause_title, current_clause_start, end_pos )) # 如果没有检测到条款,将整个章节作为一个条款 if not clauses: clauses.append(( section_content, "", section_title, section_start, section_start + len(section_content) )) return clauses def _split_long_clause( self, content: str, clause_num: str, clause_title: str ) -> List[Tuple[str, int, int]]: """ 分割过长的条款内容 按条款子项或段落分割,保持语义完整性 """ sub_chunks = [] lines = content.split('\n') # 检测是否有子项结构 has_sub_items = any( self.SUB_ITEM_PATTERN.match(line.strip()) or self.NUMBER_ITEM_PATTERN.match(line.strip()) for line in lines ) if has_sub_items: # 按子项分割 current_sub_content = [] current_sub_start = 0 for i, line in enumerate(lines): is_sub_item = ( self.SUB_ITEM_PATTERN.match(line.strip()) or self.NUMBER_ITEM_PATTERN.match(line.strip()) ) if is_sub_item and current_sub_content: sub_content = '\n'.join(current_sub_content) sub_end = current_sub_start + len(sub_content) if len(sub_content) >= self.min_chunk_size: sub_chunks.append((sub_content, current_sub_start, sub_end)) current_sub_content = [] current_sub_start = sum(len(l) + 1 for l in lines[:i]) current_sub_content.append(line) # 保存最后一个子项 if current_sub_content: sub_content = '\n'.join(current_sub_content) sub_end = current_sub_start + len(sub_content) sub_chunks.append((sub_content, current_sub_start, sub_end)) else: # 按段落分割(滑动窗口) paragraphs = [] current_para = [] for line in lines: if line.strip(): current_para.append(line) else: if current_para: paragraphs.append('\n'.join(current_para)) current_para = [] if current_para: paragraphs.append('\n'.join(current_para)) # 合并段落直到达到chunk_size current_chunk = [] current_length = 0 chunk_start = 0 for para in paragraphs: if current_length + len(para) > self.chunk_size and current_chunk: chunk_content = '\n'.join(current_chunk) chunk_end = chunk_start + len(chunk_content) sub_chunks.append((chunk_content, chunk_start, chunk_end)) current_chunk = [] current_length = 0 chunk_start = chunk_end current_chunk.append(para) current_length += len(para) # 保存最后一个chunk if current_chunk: chunk_content = '\n'.join(current_chunk) chunk_end = chunk_start + len(chunk_content) sub_chunks.append((chunk_content, chunk_start, chunk_end)) return sub_chunks def _extract_title(self, header_line: str) -> str: """从标题行提取标题内容""" # 移除"第X章"、"第X节"前缀 title = re.sub(r'^第[一二三四五六七八九十百]+[章节]\s+', '', header_line) return title.strip() def _extract_clause_number(self, clause_line: str) -> str: """从条款行提取条款编号""" match = self.CLAUSE_PATTERN.match(clause_line) if match: return match.group(0).strip() return "" def _create_chunk( self, content: str, doc_id: str, doc_name: str, section_num: str, section_title: str, clause_num: str, start_pos: int, end_pos: int, regulation_type: str, version: str ) -> TextChunk: """创建文本分块""" # 清理内容 content = content.strip() # 计算估算token数(中文约1.5字符/token) token_count = int(len(content) * 0.7) # 简化估算 # 生成chunk_id chunk_id = f"{doc_id}_{section_num}_{clause_num}_{start_pos}" metadata = ChunkMetadata( doc_id=doc_id, doc_name=doc_name, chunk_id=chunk_id, section_number=section_num, section_title=section_title, clause_number=clause_num, start_position=start_pos, end_position=end_pos, regulation_type=regulation_type, version=version ) return TextChunk( content=content, metadata=metadata, token_count=token_count ) def chunk_regulation_document( markdown_text: str, doc_id: str = "", doc_name: str = "", regulation_type: str = "", version: str = "", chunk_size: int = 512 ) -> List[TextChunk]: """便捷函数:对法规文档进行分块""" chunker = RegulationChunker(chunk_size=chunk_size) return chunker.chunk_document( markdown_text, doc_id, doc_name, regulation_type, version )