import json
import os
from os import makedirs
import re
import time
from typing import Any, List

from langchain_text_splitters import MarkdownHeaderTextSplitter, MarkdownTextSplitter, RecursiveCharacterTextSplitter

from entity_models import Document, ChunkingResult
from hierarchy_fix import HierarchyFixer
from third_level_service import get_recommended_hash_count_simple
from utils import TOKEN_ESTIMATOR, custom_serializer

# Compile once for efficiency
_specific_comments = re.compile(
    r"""
    # closing
    """,
    flags=re.VERBOSE,
)


def remove_specific_comments(text: str) -> str:
    return _specific_comments.sub('', text)


def infer_level_from_number():
    pass


def chunk_docs_by_section(extracted_doc: Document, num_tokens: int, token_overlap: int, tmp_path: str) -> List[Document]:
    headers_to_split_on = [
        ("#", "h1"),
        ("##", "h2"),
        ("###", "h3"),
        ("####", "h4"),
        ("#####", "h5"),
        ("######", "h6"),
    ]
    filepath: str = extracted_doc.filepath if extracted_doc.filepath else ""
    extracted_content: str = extracted_doc.content or ""
    merged_content: str = extracted_content

    if os.getenv("header_fix", "false").lower() == "true":
        # Merge the content of all extracted docs into one string and repair the header hierarchy
        fixer = HierarchyFixer()
        fix_result: dict[str, Any] = fixer.fix_hierarchy(content=extracted_content)
        # If a fix exists, the fix report is saved per file
        merged_content = fix_result["fixed_content"]

        makedirs(tmp_path + f"/.extracted/{filepath}", exist_ok=True)
        if tmp_path and fix_result["fixes_applied"] > 0:
            with open(tmp_path + f"/.extracted/{filepath}/hierarchy_fix_log.json", "a", encoding="utf-8") as log_file:
                json.dump(fix_result, log_file, default=custom_serializer, ensure_ascii=False)

        # Dynamically get the number of '#' characters used for level 3 headers
        third_level_counts: int = get_recommended_hash_count_simple(merged_content)['recommendation']
        headers_to_split_on = [("#" * i, f"h{i}") for i in range(1, third_level_counts + 1)]

        with open(tmp_path + f"/.extracted/{filepath}/get_recommended_hash_count.txt", "a", encoding="utf-8") as md_file:
            md_file.write(str(headers_to_split_on))
        with open(tmp_path + f"/.extracted/{filepath}/new_merged_hierarchy.md", "a", encoding="utf-8") as md_file:
            md_file.write(merged_content)

    # Markdown header splits
    markdown_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=headers_to_split_on,
        strip_headers=False,
    )
    md_header_splits = markdown_splitter.split_text(merged_content)

    chunk_size = num_tokens
    chunk_overlap = token_overlap
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    splits = text_splitter.split_documents(md_header_splits)

    pre_document = extracted_doc
    chunked_docs: List[Document] = []
    for i, split in enumerate(splits):
        # Sections that fit within 1.5x the target size are kept as a single chunk
        if TOKEN_ESTIMATOR.estimate_tokens(split.page_content) < num_tokens * 1.5:
            chunked_doc = Document(
                document_schema=pre_document.document_schema,
                main_title=pre_document.main_title,
                sub_title=pre_document.sub_title,
                publisher=pre_document.publisher,
                document_code=pre_document.document_code,
                document_category=pre_document.document_category,
                main_title_sec_language=pre_document.main_title_sec_language,
                sub_title_sec_language=pre_document.sub_title_sec_language,
                primary_language=pre_document.primary_language,
                secondary_language=pre_document.secondary_language,
                title=pre_document.title,
                doc_metadata=pre_document.doc_metadata,
                filepath=pre_document.filepath,
            )
            chunked_doc.copy_dynamic_attrs(pre_document)
            chunked_doc.content = split.page_content
            chunked_doc.h1 = split.metadata.get("h1", "")
            chunked_doc.h2 = split.metadata.get("h2", "")
            chunked_doc.h3 = split.metadata.get("h3", "")
            chunked_doc.h4 = split.metadata.get("h4", "")
            chunked_doc.h5 = split.metadata.get("h5", "")
            chunked_doc.h6 = split.metadata.get("h6", "")
            chunked_doc.h7 = split.metadata.get("h7", "")
            chunked_doc.full_headers = "||".join(
                h for h in [chunked_doc.h6, chunked_doc.h5, chunked_doc.h4,
                            chunked_doc.h3, chunked_doc.h2, chunked_doc.h1] if h
            )
            chunked_doc.id = chunked_doc.filepath + f"_{i}"
            chunked_docs.append(chunked_doc)
        else:
            # Oversized sections are further split by token count
            splitter = MarkdownTextSplitter.from_tiktoken_encoder(
                chunk_size=num_tokens,
                chunk_overlap=token_overlap,
            )
            chunked_content_list = splitter.split_text(split.page_content)  # chunk the original content
            for j, chunked_content in enumerate(chunked_content_list):
                chunked_doc = Document(
                    document_schema=pre_document.document_schema,
                    main_title=pre_document.main_title,
                    sub_title=pre_document.sub_title,
                    publisher=pre_document.publisher,
                    document_code=pre_document.document_code,
                    document_category=pre_document.document_category,
                    main_title_sec_language=pre_document.main_title_sec_language,
                    sub_title_sec_language=pre_document.sub_title_sec_language,
                    primary_language=pre_document.primary_language,
                    secondary_language=pre_document.secondary_language,
                    title=pre_document.title,
                    doc_metadata=pre_document.doc_metadata,
                    filepath=pre_document.filepath,
                )
                chunked_doc.copy_dynamic_attrs(pre_document)
                chunked_doc.content = chunked_content
                chunked_doc.h1 = split.metadata.get("h1", "")
                chunked_doc.h2 = split.metadata.get("h2", "")
                chunked_doc.h3 = split.metadata.get("h3", "")
                chunked_doc.h4 = split.metadata.get("h4", "")
                chunked_doc.h5 = split.metadata.get("h5", "")
                chunked_doc.h6 = split.metadata.get("h6", "")
                chunked_doc.h7 = split.metadata.get("h7", "")
                chunked_doc.full_headers = "||".join(
                    h for h in [chunked_doc.h6, chunked_doc.h5, chunked_doc.h4,
                                chunked_doc.h3, chunked_doc.h2, chunked_doc.h1] if h
                )
                chunked_doc.id = chunked_doc.filepath + f"_{i}_{j}"
                chunked_docs.append(chunked_doc)
    return chunked_docs


def chunk_di_doc(extracted_doc: Document, data_config: dict[str, Any], tmp_path: str) -> ChunkingResult:
    """
    Chunk the document.

    Args:
        extracted_doc: The document object to be processed.
        data_config: Processing configuration.
        tmp_path: Directory for intermediate debug artifacts.

    Returns:
        ChunkingResult: The result containing the list of chunks and total files.
    """
    num_tokens: int = data_config.get("chunk_size", 1024)
    token_overlap: int = data_config.get("token_overlap", 128)
    print({"index_name": extracted_doc.filepath, "num_tokens": num_tokens, "token_overlap": token_overlap})

    extracted_doc.content = remove_specific_comments(text=extracted_doc.content or "")
    chunked_docs: List[Document] = chunk_docs_by_section(
        extracted_doc=extracted_doc,
        num_tokens=num_tokens,
        token_overlap=token_overlap,
        tmp_path=tmp_path,
    )
    time.sleep(0.1)
    return ChunkingResult(chunks=chunked_docs, total_files=1)