import json
import logging
import os
import random
import shutil
import string
from dataclasses import fields
from datetime import datetime
from decimal import Decimal
from typing import Any, List, Optional, Union

import tiktoken
from PIL import Image
from PIL.Image import Resampling

from entity_models import Document, FigureFlat


class TokenEstimator:
    """Estimate and truncate token counts using the GPT-2 BPE tokenizer."""

    GPT2_TOKENIZER = tiktoken.get_encoding("gpt2")

    def estimate_tokens(self, text: str) -> int:
        """Return the number of GPT-2 BPE tokens in `text`."""
        return len(self.GPT2_TOKENIZER.encode(text, allowed_special="all"))

    def construct_tokens_with_size(self, tokens: str, num_of_tokens: int) -> str:
        """Truncate `tokens` to at most `num_of_tokens` tokens and decode back to a string."""
        new_tokens = self.GPT2_TOKENIZER.decode(
            self.GPT2_TOKENIZER.encode(tokens, allowed_special="all")[:num_of_tokens]
        )
        return new_tokens


TOKEN_ESTIMATOR = TokenEstimator()
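
# A quick sketch of the estimator (token counts come from the GPT-2 BPE encoding,
# so exact values depend on the text; "hello world" encodes to two tokens):
# TOKEN_ESTIMATOR.estimate_tokens("hello world")                # -> 2
# TOKEN_ESTIMATOR.construct_tokens_with_size("hello world", 1)  # -> "hello"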


def generate_random_name(length: int = 12) -> str:
    """Return a random folder name of `length` characters drawn from letters and digits."""
    characters = string.ascii_letters + string.digits
    return ''.join(random.choices(characters, k=length))
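
# Illustrative call (output is random; the value shown is just an example):
# generate_random_name(8)   # -> e.g. "aZ3kP0qL" (letters and digits only)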


def asdict_with_dynamic(obj: Any) -> dict[str, Any]:
    """Return a dict of the dataclass's predefined fields plus any dynamically added attributes."""
    # Use the predefined dataclass fields as the basis
    result = {f.name: getattr(obj, f.name) for f in fields(obj)}
    # Add dynamic attributes
    predefined_attrs = {f.name for f in fields(obj)}
    for attr in dir(obj):
        # Skip dunder attributes, methods, and predefined fields
        if (
            not attr.startswith("__")
            and not callable(getattr(obj, attr))
            and attr not in predefined_attrs
        ):
            result[attr] = getattr(obj, attr)
    return result
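
# A minimal sketch of what asdict_with_dynamic adds over dataclasses.asdict
# (the Point dataclass here is hypothetical, defined only for illustration):
#
#     from dataclasses import dataclass
#
#     @dataclass
#     class Point:
#         x: int = 1
#
#     p = Point()
#     p.label = "origin"        # attribute added after construction
#     asdict_with_dynamic(p)    # -> {"x": 1, "label": "origin"}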


def write_log(message: str):
    """Write log message (INFO level) to data_preparation logger."""
    logging.getLogger("data_preparation").info(msg=message)


def init_current_data_directory(base_path: str) -> str:
    """Create a uniquely named working directory under base_path and return its path."""
    folder_name = generate_random_name(10)
    if base_path == "":
        base_path = os.path.expanduser("~")
    # Create the directory path
    local_data_folder = os.path.join(base_path, "doc-extractor", folder_name)
    os.makedirs(local_data_folder, exist_ok=True)
    return local_data_folder
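
# Usage sketch (the base path below is hypothetical):
# work_dir = init_current_data_directory("/tmp")
# # -> e.g. "/tmp/doc-extractor/aZ3kP0qL9x" (10 random characters)
# work_dir = init_current_data_directory("")   # falls back to the home directory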


def write_content(content: str, directory_path: str, file_name: str):
    """Write merged content to a markdown file in the .extracted directory."""
    output_folder = os.path.join(directory_path, ".extracted", file_name)
    os.makedirs(output_folder, exist_ok=True)
    output_file = os.path.join(output_folder, "_merged.md")
    with open(output_file, "w", encoding="utf-8") as file:
        file.write(content)
    print(f"Merged Saved: {output_file}")


def write_object(obj: Any, directory_path: str, file_name: str):
    """Serialize an object to a JSON file in the .extracted directory."""
    output_folder = os.path.join(directory_path, ".extracted", file_name)
    os.makedirs(output_folder, exist_ok=True)
    output_file = os.path.join(output_folder, "_merged.json")
    with open(output_file, "w", encoding="utf-8") as file:
        json.dump(obj, file, indent=4, ensure_ascii=False, default=custom_serializer)
    print(f"Dict Saved: {output_file}")


def write_document(documents: list[Document], file_path: str, directory_path: str, rel_file_path: str):
    """Write the parsed document chunks to a JSON file in the .chunked directory."""
    chunks_save = []
    for chunk_idx, chunk_doc in enumerate(documents):
        chunk_doc.filepath = rel_file_path
        chunk_doc.metadata = json.dumps({"chunk_id": str(chunk_idx)})
        chunk_doc.image_mapping = json.dumps(chunk_doc.image_mapping) if chunk_doc.image_mapping else None
        chunks_save.append(asdict_with_dynamic(chunk_doc))
    output_folder = os.path.join(directory_path, ".chunked")
    os.makedirs(output_folder, exist_ok=True)
    output_file = os.path.join(output_folder, f"{rel_file_path}.json")
    with open(output_file, "w", encoding="utf-8") as file:
        file.write(json.dumps(chunks_save, indent=4, ensure_ascii=False))
    print(f"Processed {file_path} to {len(documents)} chunks. Document Schema: {documents[0].document_schema}")
    print(f"Saved Result: {output_file}")


# Custom serializer for json.dump / json.dumps `default=` hooks
def custom_serializer(obj: Any) -> Any:
    """Handle types that the json module cannot serialize natively."""
    if isinstance(obj, datetime):
        return obj.isoformat()  # Convert to an ISO 8601 string
    elif isinstance(obj, Decimal):
        return float(obj)  # Decimal to float
    elif hasattr(obj, '__dict__'):
        return obj.__dict__  # Class instance to dict
    else:
        raise TypeError(f"Type {type(obj)} cannot be JSON serialized")
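
# A short sketch of the hook in action (values are illustrative):
# json.dumps(
#     {"when": datetime(2025, 1, 1, 12, 0), "price": Decimal("9.99")},
#     default=custom_serializer,
# )
# # -> '{"when": "2025-01-01T12:00:00", "price": 9.99}'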


def keep_latest(data_list: list[dict[str, Any]], id_key: str, timestamp_key: Optional[str] = None) -> list[dict[str, Any]]:
    """
    Keep only the latest record for each entity.
    Args:
        data_list: List of dictionaries containing records
        id_key: Key that identifies the entity
        timestamp_key: Timestamp key (optional; if not provided, the last occurrence wins)
    Returns:
        List of the latest record for each entity
    """
    latest_dict = {}
    for idx, record in enumerate(data_list):
        entity_id = record[id_key]
        # Without a usable timestamp, keep the last occurrence by position
        if timestamp_key is None or timestamp_key not in record:
            # Record the index to handle records sharing the same id
            latest_dict[entity_id] = (idx, record)
            continue
        current_time = record[timestamp_key]
        # If the current record is newer, update
        if entity_id not in latest_dict or current_time > latest_dict[entity_id][1][timestamp_key]:
            latest_dict[entity_id] = (idx, record)
    # Sort back into the original order of appearance
    return [record for _, record in sorted(latest_dict.values(), key=lambda x: x[0])]
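
# A minimal sketch (ids and timestamps are made up for illustration):
# records = [
#     {"id": "a", "ts": 1},
#     {"id": "a", "ts": 3},
#     {"id": "b", "ts": 2},
#     {"id": "a", "ts": 2},
# ]
# keep_latest(records, "id", "ts")
# # -> [{"id": "a", "ts": 3}, {"id": "b", "ts": 2}]
# # (ordered by the position of each kept record in the input)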


def max_datetime_safe(
    dt1: Union[datetime, None],
    dt2: Union[datetime, None]
) -> Union[datetime, None]:
    """
    Safely get the maximum of two datetimes, handling None values
    Args:
        dt1: First datetime (may be None)
        dt2: Second datetime (may be None)
    Returns:
        The maximum datetime, or None if both are None
    """
    if dt1 is None:
        return dt2
    if dt2 is None:
        return dt1
    return max(dt1, dt2)


def min_datetime_safe(
    dt1: Union[datetime, None],
    dt2: Union[datetime, None]
) -> Union[datetime, None]:
    """
    Safely get the minimum of two datetimes, handling None values
    Rules:
        - If both datetimes are None, return None
        - If one datetime is None, return the other
        - If neither is None, return the smaller one
    Args:
        dt1: First datetime (may be None)
        dt2: Second datetime (may be None)
    Returns:
        The minimum datetime, or None if both are None
    """
    if dt1 is None:
        return dt2
    if dt2 is None:
        return dt1
    return min(dt1, dt2)
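
# A quick sketch of the None-safe helpers (dates are illustrative):
# a, b = datetime(2024, 1, 1), datetime(2025, 1, 1)
# max_datetime_safe(a, b)        # -> datetime(2025, 1, 1)
# min_datetime_safe(a, None)     # -> datetime(2024, 1, 1)
# max_datetime_safe(None, None)  # -> None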


def write_json_to_file(data: list[dict], filename: str):
    """Write data to a JSON file, creating the parent directory if needed."""
    parent_dir = os.path.dirname(filename)
    if parent_dir:  # os.makedirs("") would raise, so only create a non-empty parent
        os.makedirs(parent_dir, exist_ok=True)
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False, default=custom_serializer)
    print(f"JSON file saved: {filename}")


def write_grouped_index_files(to_upload_dicts: list[dict[str, Any]], index_name: str, base_directory: str = ""):
    """
    Write grouped JSON files into the .index directory, grouping to_upload_dicts by their filepath field.
    Args:
        to_upload_dicts: List of dictionaries to upload
        index_name: Name of the index, used as a suffix in each output filename
        base_directory: Base directory path
    """
    if not to_upload_dicts:
        print("No data to write.")
        return
    # Group by the filepath field
    grouped_data = {}
    for item in to_upload_dicts:
        filepath = item.get("filepath", "unknown")
        if filepath not in grouped_data:
            grouped_data[filepath] = []
        grouped_data[filepath].append(item)
    # Create the .index directory
    index_dir = os.path.join(base_directory, ".index")
    os.makedirs(index_dir, exist_ok=True)
    # Write one JSON file per filepath
    for filepath, items in grouped_data.items():
        # Convert the filepath to a safe filename
        safe_filename = filepath.replace("/", "_").replace("\\", "_").replace(":", "_")
        if safe_filename.endswith(".pdf"):
            safe_filename = safe_filename[:-4]  # Drop the .pdf extension
        json_filename = f"{safe_filename}.{index_name}.json"
        json_filepath = os.path.join(index_dir, json_filename)
        with open(json_filepath, "w", encoding="utf-8") as file:
            json.dump(items, file, indent=4, ensure_ascii=False, default=custom_serializer)
        print(f"Grouped index file saved: {json_filepath} (contains {len(items)} items)")
    print(f"Total {len(grouped_data)} files written to .index directory")


def replace_urls_in_content(content: str, replacements: List[FigureFlat]) -> str:
    """
    Replace spans of the content with markdown image links built from the replacement list.
    :param content: Original text content
    :param replacements: Replacement list; each element carries:
        - 'url': Image URL
        - 'offset': Offset of the span in the original content
        - 'length': Length of the text to be replaced
    :return: New content with the replacements applied
    """
    if not replacements:
        return content
    # Sort by offset in descending order (process back to front)
    sorted_replacements = sorted(replacements, key=lambda x: x.offset, reverse=True)
    # Text fragments, collected back to front
    fragments = []
    current_index = len(content)  # Current position (start from the end)
    for item in sorted_replacements:
        url = f"![{item.content}]({item.url})"
        offset = item.offset
        length = item.length
        # Skip offsets that overlap a span already processed
        if offset >= current_index:
            continue
        # Clamp the end of the replaced span to the current position
        end_pos = min(offset + length, current_index)
        # 1. Keep the text between the end of this span and the current position
        fragments.append(content[end_pos:current_index])
        # 2. Insert the markdown image link in place of the original span
        fragments.append(url)
        # Move the current position to the start of this span
        current_index = offset
    # Keep the remaining head of the content
    fragments.append(content[:current_index])
    # Fragments were collected back to front, so join them in reverse
    return ''.join(fragments[::-1])
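
# A minimal sketch; SimpleNamespace stands in for FigureFlat, which only needs
# content/url/offset/length attributes here (values are made up):
#
#     from types import SimpleNamespace
#
#     text = "See figure 1 for details."
#     fig = SimpleNamespace(content="figure 1", url="fig1.png", offset=4, length=8)
#     replace_urls_in_content(text, [fig])
#     # -> "See ![figure 1](fig1.png) for details."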


def resize_image(input_path: str, output_path: Optional[str] = None, max_size: int = 10000) -> str:
    """Proportionally scale a PNG image so that neither its width nor its height exceeds max_size pixels."""
    with Image.open(input_path) as img:
        # Calculate the scaling ratio
        ratio = min(max_size / max(img.size), 1.0)
        if ratio >= 1:  # No scaling required
            return input_path
        # Calculate the new dimensions (maintaining the aspect ratio)
        new_size = tuple(round(dim * ratio) for dim in img.size)
        # Use a high-quality scaling algorithm
        resized_img = img.resize(new_size, Resampling.LANCZOS)
        # Work out the output path
        if not output_path:
            filename, ext = os.path.splitext(input_path)
            output_path = f"{filename}_resized{ext}"
        # Save the resized image (preserving PNG features)
        resized_img.save(output_path, format="PNG", optimize=True)
        print(f"Image resized: {img.size} -> {new_size} | saved to: {output_path}")
    return output_path
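
# Usage sketch (the path is hypothetical; images already within the limit are returned unchanged):
# path = resize_image("page_001.png", max_size=2048)
# # -> "page_001_resized.png" if either dimension exceeded 2048, else "page_001.png"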


def file_rename(input_path: str) -> str:
    """Copy a .doc file to a .docx path (extension change only, no format conversion) and return the new path."""
    filename, ext = os.path.splitext(input_path)
    if ext.lower() == ".doc":
        new_path = f"{filename}.docx"
        shutil.copy2(input_path, new_path)
        print("file renamed to ", new_path)
        return new_path
    return input_path