Files
AIRegulation-DocAnalysis/backend/app/services/embedding/text_chunker.py
2026-05-14 15:07:34 +08:00

449 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# backend/app/services/embedding/text_chunker.py
"""Smart chunker: two-granularity splitting at chapter/section level and clause level."""
import re
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
from loguru import logger
@dataclass
class ChunkMetadata:
    """Metadata describing where a chunk came from.

    Every field has a default, so an instance can be created with no
    arguments and filled in afterwards.
    """

    # Identity of the source document and of the chunk itself.
    doc_id: str = ""
    doc_name: str = ""
    chunk_id: str = ""

    # Position in the document outline.
    section_number: str = ""  # chapter/section number (e.g. "第一章")
    section_title: str = ""  # chapter/section title
    clause_number: str = ""  # clause number (e.g. "第一条")
    page_number: int = 0

    # Character offsets in the original text.
    start_position: int = 0  # start position in the original text
    end_position: int = 0  # end position in the original text

    # Classification of the source document.
    regulation_type: str = ""  # regulation type
    version: str = ""
@dataclass
class TextChunk:
    """One chunk of document text together with its metadata."""

    content: str
    metadata: ChunkMetadata
    token_count: int = 0  # estimated number of tokens
class RegulationChunker:
    """Two-granularity chunker for regulation documents.

    The document is first split on chapter/section headings and each of
    those parts is then split on clause headings, matching the layered
    structure (chapter > section > clause) of GB-standard documents.
    A clause is kept whole unless it is longer than ``max_chunk_size``, in
    which case it is subdivided on sub-items or on blank-line-separated
    paragraphs.

    All patterns are matched against the stripped text of a single line.
    """

    # Heading patterns.
    CHAPTER_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+章\s+[^\n]+')
    SECTION_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+节\s+[^\n]+')
    CLAUSE_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+条\s')
    # Sub-item patterns. Both ASCII "(" ")" and full-width U+FF08 / U+FF09
    # parentheses are accepted around the numeral; the full-width ones are
    # written as escapes to keep confusable characters out of the source.
    SUB_ITEM_PATTERN = re.compile(r'^[(\uff08][一二三四五六七八九十]+[)\uff09]\s')
    NUMBER_ITEM_PATTERN = re.compile(r'^[\d]+[\.、]\s')

    def __init__(
        self,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        max_chunk_size: int = 2048,
        min_chunk_size: int = 100
    ):
        """Store the chunking parameters.

        Args:
            chunk_size: Target size, in characters, used when paragraphs of
                an over-long clause are merged.
            chunk_overlap: Overlap size. Stored only; no method of this
                class reads it.
            max_chunk_size: A clause longer than this many characters is
                subdivided.
            min_chunk_size: Sub-item groups shorter than this many
                characters are merged into the following group.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.max_chunk_size = max_chunk_size
        self.min_chunk_size = min_chunk_size

    def chunk_document(
        self,
        markdown_text: str,
        doc_id: str = "",
        doc_name: str = "",
        regulation_type: str = "",
        version: str = ""
    ) -> List["TextChunk"]:
        """Split a regulation document into chunks.

        Args:
            markdown_text: Document content in Markdown form.
            doc_id: Document identifier copied into every chunk.
            doc_name: Document name copied into every chunk.
            regulation_type: Regulation type copied into every chunk.
            version: Document version copied into every chunk.

        Returns:
            The chunks in document order. Pieces that contain only
            whitespace are skipped, so an empty document yields ``[]``.
        """
        logger.info(f"开始分块文档: {doc_name}")
        chunks = []
        # Level 1: chapters/sections. Level 2: clauses inside each of them.
        for section_num, section_title, section_content, section_start in \
                self._split_by_sections(markdown_text):
            clause_chunks = self._split_by_clauses(
                section_content,
                section_num,
                section_title,
                section_start
            )
            for chunk_content, clause_num, clause_title, start_pos, end_pos in clause_chunks:
                if len(chunk_content) > self.max_chunk_size:
                    # Sub-chunk offsets are relative to the clause text;
                    # shift them to document coordinates.
                    pieces = [
                        (sub_content, sub_start + start_pos, sub_end + start_pos)
                        for sub_content, sub_start, sub_end in self._split_long_clause(
                            chunk_content,
                            clause_num,
                            clause_title
                        )
                    ]
                else:
                    pieces = [(chunk_content, start_pos, end_pos)]
                for piece_content, piece_start, piece_end in pieces:
                    if not piece_content.strip():
                        # A whitespace-only piece would become an empty chunk.
                        continue
                    chunks.append(self._create_chunk(
                        piece_content,
                        doc_id,
                        doc_name,
                        section_num,
                        section_title,
                        clause_num,
                        piece_start,
                        piece_end,
                        regulation_type,
                        version
                    ))
        logger.success(f"分块完成,共{len(chunks)}个chunk")
        return chunks

    def _split_by_sections(self, markdown_text: str) -> List[Tuple[str, str, str, int]]:
        """Split the document on chapter and section headings.

        Text that precedes the first heading is returned as a leading part
        with empty number and title. When no heading is found at all, the
        whole text is returned as one part titled "全文".

        Returns:
            List of (section_number, section_title, section_content,
            start_position), where start_position is the character offset
            of the part in ``markdown_text``.
        """
        sections = []
        current_num = ""
        current_title = ""
        current_lines = []
        current_start = 0
        found_heading = False
        offset = 0  # character offset of the current line in markdown_text
        for line in markdown_text.split('\n'):
            stripped = line.strip()
            if self.CHAPTER_PATTERN.match(stripped) or self.SECTION_PATTERN.match(stripped):
                found_heading = True
                if current_lines:
                    sections.append((
                        current_num,
                        current_title,
                        '\n'.join(current_lines),
                        current_start
                    ))
                current_start = offset
                current_lines = []
                # NOTE(review): the number field receives the full heading
                # line (number plus title). Left as is because chunk ids are
                # built from this value.
                current_num = stripped
                current_title = self._extract_title(stripped)
            current_lines.append(line)
            offset += len(line) + 1  # +1 for the '\n' removed by split
        if not found_heading:
            return [("", "全文", markdown_text, 0)]
        sections.append((
            current_num,
            current_title,
            '\n'.join(current_lines),
            current_start
        ))
        return sections

    def _split_by_clauses(
        self,
        section_content: str,
        section_num: str,
        section_title: str,
        section_start: int
    ) -> List[Tuple[str, str, str, int, int]]:
        """Split one chapter/section on clause headings.

        Text before the first clause heading (typically the chapter heading
        line itself) is returned as a leading entry with an empty clause
        number. When no clause heading is found, the whole section is
        returned as a single entry titled with ``section_title``.

        Args:
            section_content: Text of the section.
            section_num: Section number; not used by this method.
            section_title: Title used for the single fallback entry.
            section_start: Offset of the section in the document; added to
                every returned position.

        Returns:
            List of (content, clause_number, clause_title, start_position,
            end_position).
        """
        clauses = []
        current_num = ""
        current_title = ""
        current_lines = []
        current_start = section_start
        found_clause = False
        offset = 0  # character offset of the current line in section_content
        for line in section_content.split('\n'):
            stripped = line.strip()
            if self.CLAUSE_PATTERN.match(stripped):
                found_clause = True
                if current_lines:
                    content = '\n'.join(current_lines)
                    clauses.append((
                        content,
                        current_num,
                        current_title,
                        current_start,
                        current_start + len(content)
                    ))
                current_start = section_start + offset
                current_lines = []
                current_num = self._extract_clause_number(stripped)
                current_title = stripped
            current_lines.append(line)
            offset += len(line) + 1
        if not found_clause:
            return [(
                section_content,
                "",
                section_title,
                section_start,
                section_start + len(section_content)
            )]
        content = '\n'.join(current_lines)
        clauses.append((
            content,
            current_num,
            current_title,
            current_start,
            current_start + len(content)
        ))
        return clauses

    def _is_sub_item(self, line: str) -> bool:
        """Return True when the stripped line starts a sub-item."""
        stripped = line.strip()
        return bool(
            self.SUB_ITEM_PATTERN.match(stripped) or
            self.NUMBER_ITEM_PATTERN.match(stripped)
        )

    def _split_long_clause(
        self,
        content: str,
        clause_num: str,
        clause_title: str
    ) -> List[Tuple[str, int, int]]:
        """Subdivide an over-long clause.

        If any line starts a sub-item, the clause is cut in front of
        sub-item lines; a group shorter than ``min_chunk_size`` is not
        emitted on its own but carried into the next group, so no text is
        lost. Otherwise the clause is cut into blank-line-separated
        paragraphs, which are merged until adding another one would exceed
        ``chunk_size``. A single paragraph is never cut.

        Args:
            content: Text of the clause.
            clause_num: Not used by this method.
            clause_title: Not used by this method.

        Returns:
            List of (sub_content, start, end) with offsets relative to
            ``content``. In the paragraph case ``end - start`` can be larger
            than ``len(sub_content)`` because blank lines between merged
            paragraphs are not part of the sub-content.
        """
        sub_chunks = []
        lines = content.split('\n')
        if any(self._is_sub_item(line) for line in lines):
            pending = []
            pending_start = 0
            offset = 0
            for line in lines:
                if pending and self._is_sub_item(line):
                    text = '\n'.join(pending)
                    if len(text) >= self.min_chunk_size:
                        sub_chunks.append((text, pending_start, pending_start + len(text)))
                        pending = []
                        pending_start = offset
                    # Otherwise keep accumulating: the short group is merged
                    # with the item that starts on this line.
                pending.append(line)
                offset += len(line) + 1
            if pending:
                text = '\n'.join(pending)
                sub_chunks.append((text, pending_start, pending_start + len(text)))
            return sub_chunks

        # No sub-items: collect paragraphs as (text, start, end).
        paragraphs = []
        para_lines = []
        para_start = 0
        offset = 0
        for line in lines:
            if line.strip():
                if not para_lines:
                    para_start = offset
                para_lines.append(line)
            elif para_lines:
                text = '\n'.join(para_lines)
                paragraphs.append((text, para_start, para_start + len(text)))
                para_lines = []
            offset += len(line) + 1
        if para_lines:
            text = '\n'.join(para_lines)
            paragraphs.append((text, para_start, para_start + len(text)))

        # Merge consecutive paragraphs up to chunk_size characters.
        group = []
        group_length = 0
        for paragraph in paragraphs:
            if group and group_length + len(paragraph[0]) > self.chunk_size:
                sub_chunks.append((
                    '\n'.join(text for text, _, _ in group),
                    group[0][1],
                    group[-1][2]
                ))
                group = []
                group_length = 0
            group.append(paragraph)
            group_length += len(paragraph[0])
        if group:
            sub_chunks.append((
                '\n'.join(text for text, _, _ in group),
                group[0][1],
                group[-1][2]
            ))
        return sub_chunks

    def _extract_title(self, header_line: str) -> str:
        """Return the heading text with its "第X章" / "第X节" prefix removed."""
        title = re.sub(r'^第[一二三四五六七八九十百]+[章节]\s+', '', header_line)
        return title.strip()

    def _extract_clause_number(self, clause_line: str) -> str:
        """Return the clause number (e.g. "第一条"), or "" if the line has none."""
        match = self.CLAUSE_PATTERN.match(clause_line)
        if match:
            # The pattern includes the whitespace after the number; drop it.
            return match.group(0).strip()
        return ""

    def _create_chunk(
        self,
        content: str,
        doc_id: str,
        doc_name: str,
        section_num: str,
        section_title: str,
        clause_num: str,
        start_pos: int,
        end_pos: int,
        regulation_type: str,
        version: str
    ) -> "TextChunk":
        """Build a TextChunk and its ChunkMetadata from the given values.

        The content is stripped of surrounding whitespace; the positions are
        stored as given and are not adjusted for the stripping.
        """
        content = content.strip()
        # Simplified token estimate; the original note assumes roughly
        # 1.5 Chinese characters per token.
        token_count = int(len(content) * 0.7)
        chunk_id = f"{doc_id}_{section_num}_{clause_num}_{start_pos}"
        metadata = ChunkMetadata(
            doc_id=doc_id,
            doc_name=doc_name,
            chunk_id=chunk_id,
            section_number=section_num,
            section_title=section_title,
            clause_number=clause_num,
            start_position=start_pos,
            end_position=end_pos,
            regulation_type=regulation_type,
            version=version
        )
        return TextChunk(
            content=content,
            metadata=metadata,
            token_count=token_count
        )
def chunk_regulation_document(
    markdown_text: str,
    doc_id: str = "",
    doc_name: str = "",
    regulation_type: str = "",
    version: str = "",
    chunk_size: int = 512
) -> List[TextChunk]:
    """Convenience function: chunk a regulation document in one call.

    Builds a RegulationChunker with the given ``chunk_size`` (all other
    chunker settings keep their defaults) and returns the result of its
    ``chunk_document`` method for the given document.
    """
    return RegulationChunker(chunk_size=chunk_size).chunk_document(
        markdown_text=markdown_text,
        doc_id=doc_id,
        doc_name=doc_name,
        regulation_type=regulation_type,
        version=version,
    )