import json
import logging
import os
import random
import shutil
import string
from dataclasses import fields
from datetime import datetime
from decimal import Decimal
from typing import Any, List, Optional, Union

import tiktoken
from PIL import Image
from PIL.Image import Resampling

from entity_models import Document, FigureFlat


class TokenEstimator(object):
    GPT2_TOKENIZER = tiktoken.get_encoding("gpt2")

    def estimate_tokens(self, text: str) -> int:
        """Return the number of GPT-2 tokens in the given text."""
        return len(self.GPT2_TOKENIZER.encode(text, allowed_special="all"))

    def construct_tokens_with_size(self, tokens: str, numofTokens: int) -> str:
        """Truncate the text to at most numofTokens GPT-2 tokens and return the decoded result."""
        newTokens = self.GPT2_TOKENIZER.decode(
            self.GPT2_TOKENIZER.encode(tokens, allowed_special="all")[:numofTokens]
        )
        return newTokens


TOKEN_ESTIMATOR = TokenEstimator()
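

# Illustrative usage sketch of TOKEN_ESTIMATOR; the sample sentence is made up
# for demonstration and this helper is not called anywhere in the module.
def _demo_token_estimator() -> None:
    sample = "This utility module prepares document chunks for indexing."
    print(TOKEN_ESTIMATOR.estimate_tokens(sample))                 # GPT-2 token count
    print(TOKEN_ESTIMATOR.construct_tokens_with_size(sample, 5))   # first 5 tokens, decoded back to text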


def generate_random_name(length: int = 12):
    # Characters to use: letters and digits
    characters = string.ascii_letters + string.digits
    # Randomly select `length` characters
    folder_name = ''.join(random.choices(characters, k=length))
    return folder_name


def asdict_with_dynamic(obj: Any) -> dict[str, Any]:
    """Return a dictionary of the object's predefined dataclass fields plus any dynamic attributes."""
    # Use the predefined dataclass fields as the basis
    result = {f.name: getattr(obj, f.name) for f in fields(obj)}
    # Add dynamic attributes
    all_attrs = dir(obj)
    predefined_attrs = [f.name for f in fields(obj)]
    for attr in all_attrs:
        # Skip dunder attributes, callables, and predefined fields
        if (
            not attr.startswith("__")
            and not callable(getattr(obj, attr))
            and attr not in predefined_attrs
        ):
            result[attr] = getattr(obj, attr)
    return result
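

# Illustrative sketch of asdict_with_dynamic; the Chunk dataclass below is a
# made-up stand-in, not something defined in entity_models.
def _demo_asdict_with_dynamic() -> None:
    from dataclasses import dataclass

    @dataclass
    class Chunk:
        content: str

    chunk = Chunk(content="hello")
    chunk.page_number = 3              # dynamic attribute set after construction
    print(asdict_with_dynamic(chunk))  # {'content': 'hello', 'page_number': 3}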


def write_log(message: str):
    """Write log message (INFO level) to the data_preparation logger."""
    logging.getLogger("data_preparation").info(msg=message)


def init_current_data_directory(base_path: str) -> str:
    """Initialize the current data directory and return its path."""
    folder_name = generate_random_name(10)
    if base_path == "":
        base_path = os.path.expanduser("~")
    # Create the directory path
    local_data_folder = os.path.join(base_path, "doc-extractor", folder_name)
    os.makedirs(local_data_folder, exist_ok=True)
    return local_data_folder
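

# Illustrative sketch of init_current_data_directory: passing "" falls back to
# the user's home directory, so this creates ~/doc-extractor/<random name>.
def _demo_init_current_data_directory() -> None:
    work_dir = init_current_data_directory("")
    print(f"Working directory: {work_dir}")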


def write_content(content: str, directory_path: str, file_name: str):
    """Write merged content to a markdown file in the .extracted directory."""
    output_folder = directory_path + "/.extracted/" + file_name
    os.makedirs(output_folder, exist_ok=True)
    with open(f"{output_folder}/_merged.md", "w", encoding="utf-8") as file:
        file.write(content)

    print(f"Merged Saved: {output_folder}/_merged.md")


def write_object(obj: Any, directory_path: str, file_name: str):
    """Write an object to a JSON file in the specified directory."""
    output_folder = directory_path + "/.extracted/" + file_name
    os.makedirs(output_folder, exist_ok=True)
    with open(f"{output_folder}/_merged.json", "w", encoding="utf-8") as file:
        json.dump(obj, file, indent=4, ensure_ascii=False, default=custom_serializer)
    print(f"Dict Saved: {output_folder}/_merged.json")


def write_document(documents: list[Document], file_path: str, directory_path: str, rel_file_path: str):
    """Write the parsed document list to a JSON file in the specified directory."""
    chunks_save = []
    for chunk_idx, chunk_doc in enumerate(documents):
        chunk_doc.filepath = rel_file_path
        chunk_doc.metadata = json.dumps({"chunk_id": str(chunk_idx)})
        chunk_doc.image_mapping = json.dumps(chunk_doc.image_mapping) if chunk_doc.image_mapping else None
        chunks_save.append(asdict_with_dynamic(chunk_doc))

    output_folder = directory_path + "/.chunked"
    os.makedirs(output_folder, exist_ok=True)
    with open(f"{output_folder}/{rel_file_path}.json", "w", encoding="utf-8") as file:
        file.write(json.dumps(chunks_save, indent=4, ensure_ascii=False))

    print(f"Processed {file_path} to {len(documents)} chunks. Document Schema: {documents[0].document_schema}")
    print(f"Saved Result: {output_folder}/{rel_file_path}.json")


# Custom serializer function
def custom_serializer(obj: Any) -> Any:
    """Handle types that cannot be serialized by JSON"""
    if isinstance(obj, datetime):
        return obj.isoformat()  # Convert to ISO 8601 string
    elif isinstance(obj, Decimal):
        return float(obj)  # Decimal to float
    elif hasattr(obj, '__dict__'):
        return obj.__dict__  # Class object to dict
    else:
        raise TypeError(f"Type {type(obj)} cannot be JSON serialized")
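

# Illustrative sketch showing custom_serializer as the `default` hook of
# json.dumps; the payload below is made up for demonstration.
def _demo_custom_serializer() -> None:
    payload = {"created_at": datetime(2024, 1, 1, 12, 0), "price": Decimal("9.99")}
    print(json.dumps(payload, default=custom_serializer))
    # → {"created_at": "2024-01-01T12:00:00", "price": 9.99}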


def keep_latest(data_list: list[dict[str, Any]], id_key: str, timestamp_key: Optional[str] = None) -> list[dict[str, Any]]:
    """
    Advanced method to keep the latest records

    Args:
        data_list: List of dictionaries containing records
        id_key: Key to identify the entity
        timestamp_key: Timestamp key (optional; if not provided, keep the last occurrence)

    Returns:
        List of the latest records for each entity
    """
    latest_dict = {}

    for idx, record in enumerate(data_list):
        entity_id = record[id_key]

        # If no timestamp, keep the last occurrence by position
        if timestamp_key is None or timestamp_key not in record:
            # Record the index to handle records sharing the same id
            latest_dict[entity_id] = (idx, record)
            continue

        current_time = record[timestamp_key]

        # If the current record is newer, update
        if entity_id not in latest_dict or current_time > latest_dict[entity_id][1][timestamp_key]:
            latest_dict[entity_id] = (idx, record)

    # Sort by original position (optional)
    return [record for _, record in sorted(latest_dict.values(), key=lambda x: x[0])]
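

# Illustrative sketch of keep_latest; the records below are made up.
def _demo_keep_latest() -> None:
    records = [
        {"id": "a", "updated": datetime(2024, 1, 1), "value": 1},
        {"id": "a", "updated": datetime(2024, 2, 1), "value": 2},
        {"id": "b", "updated": datetime(2024, 1, 15), "value": 3},
    ]
    latest = keep_latest(records, id_key="id", timestamp_key="updated")
    print([r["value"] for r in latest])  # [2, 3]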


def max_datetime_safe(
    dt1: Union[datetime, None],
    dt2: Union[datetime, None]
) -> Union[datetime, None]:
    """
    Safely get the maximum of two datetimes, handling None values

    Args:
        dt1: First datetime (may be None)
        dt2: Second datetime (may be None)

    Returns:
        The maximum datetime, or None if both are None
    """
    if dt1 is None:
        return dt2
    if dt2 is None:
        return dt1
    return max(dt1, dt2)


def min_datetime_safe(
    dt1: Union[datetime, None],
    dt2: Union[datetime, None]
) -> Union[datetime, None]:
    """
    Safely get the minimum of two datetimes, handling None values

    Rules:
    - Both datetimes are None → return None
    - One datetime is None → return the other
    - Both datetimes are not None → return the smaller one

    Args:
        dt1: First datetime (may be None)
        dt2: Second datetime (may be None)

    Returns:
        The minimum datetime, or None if both are None
    """
    if dt1 is None:
        return dt2
    if dt2 is None:
        return dt1
    return min(dt1, dt2)
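

# Illustrative sketch of the None-safe datetime helpers; the dates are made up.
def _demo_datetime_safe() -> None:
    earlier = datetime(2024, 1, 1)
    later = datetime(2024, 6, 1)
    print(max_datetime_safe(earlier, later))  # 2024-06-01 00:00:00
    print(min_datetime_safe(None, later))     # 2024-06-01 00:00:00 (None is ignored)
    print(max_datetime_safe(None, None))      # None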


def write_json_to_file(data: list[dict], filename: str):
    """Write data to a JSON file."""
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False, default=custom_serializer)
    print(f"JSON file saved: {filename}")


def write_grouped_index_files(to_upload_dicts: list[dict[str, Any]], index_name: str, base_directory: str = ""):
    """
    Write grouped JSON files to the .index directory, one per distinct filepath value in to_upload_dicts

    Args:
        to_upload_dicts: List of dictionaries to upload
        index_name: Index name used in each output file name
        base_directory: Base directory path
    """
    if not to_upload_dicts:
        print("No data to write.")
        return

    # Group by the filepath field
    grouped_data = {}
    for item in to_upload_dicts:
        filepath = item.get("filepath", "unknown")
        if filepath not in grouped_data:
            grouped_data[filepath] = []
        grouped_data[filepath].append(item)

    # Create the .index directory
    index_dir = os.path.join(base_directory, ".index")
    os.makedirs(index_dir, exist_ok=True)

    # Create a corresponding JSON file for each filepath
    for filepath, items in grouped_data.items():
        # Convert the filepath to a safe filename
        safe_filename = filepath.replace("/", "_").replace("\\", "_").replace(":", "_")
        if safe_filename.endswith(".pdf"):
            safe_filename = safe_filename[:-4]  # Remove the .pdf extension

        json_filename = f"{safe_filename}.{index_name}.json"
        json_filepath = os.path.join(index_dir, json_filename)

        # Write the JSON file
        with open(json_filepath, "w", encoding="utf-8") as file:
            json.dump(items, file, indent=4, ensure_ascii=False, default=custom_serializer)

        print(f"Grouped index file saved: {json_filepath} (contains {len(items)} items)")

    print(f"Total {len(grouped_data)} files written to .index directory")
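

# Illustrative sketch of write_grouped_index_files; the chunk dictionaries and
# the "contoso-index" name below are made up for demonstration.
def _demo_write_grouped_index_files() -> None:
    chunks = [
        {"filepath": "reports/2024/q1.pdf", "content": "chunk 1"},
        {"filepath": "reports/2024/q1.pdf", "content": "chunk 2"},
        {"filepath": "manuals/setup.pdf", "content": "chunk 3"},
    ]
    # Writes ./.index/reports_2024_q1.contoso-index.json (2 items)
    # and ./.index/manuals_setup.contoso-index.json (1 item).
    write_grouped_index_files(chunks, index_name="contoso-index", base_directory=".")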


def replace_urls_in_content(content: str, replacements: List[FigureFlat]) -> str:
    """
    Insert URLs from the replacement list into the specified positions in the content

    :param content: Original text content
    :param replacements: Replacement list; each element provides:
        - url: Image URL
        - offset: Offset in the original content
        - length: Length of the text to be replaced
    :return: New content with the replacements applied
    """
    if not replacements:
        return content

    # Sort by offset in descending order (process in reverse order)
    sorted_replacements = sorted(replacements, key=lambda x: x.offset, reverse=True)

    # List to store text fragments
    fragments = []
    current_index = len(content)  # Current position (start from the end)

    for item in sorted_replacements:
        url = f"{item.url}"
        offset = item.offset
        length = item.length

        # Check offset validity
        if offset >= current_index:
            continue  # Skip invalid offset

        # Calculate the actual end position of the replacement
        end_pos = min(offset + length, current_index)

        # 1. Add the text between the end of the replacement and the current position
        fragments.append(content[end_pos:current_index])

        # 2. Add the URL (replacing the original span)
        fragments.append(url)

        # Update the current position to the start of the replacement
        current_index = offset

    # Add the remaining head content
    fragments.append(content[:current_index])

    # Concatenate the fragments in reverse order (since processing went backwards)
    return ''.join(fragments[::-1])
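

# Illustrative sketch of replace_urls_in_content. Real callers pass FigureFlat
# objects from entity_models; SimpleNamespace is used here only as a stand-in
# exposing the url/offset/length attributes the function reads.
def _demo_replace_urls_in_content() -> None:
    from types import SimpleNamespace

    content = "Intro <figure_0> outro"
    figure = SimpleNamespace(url="https://example.com/fig0.png", offset=6, length=10)
    print(replace_urls_in_content(content, [figure]))
    # → "Intro https://example.com/fig0.png outro"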


def resize_image(input_path: str, output_path: Optional[str] = None, max_size: int = 10000) -> str:
    """Proportionally scale a PNG image so that neither width nor height exceeds max_size pixels."""
    with Image.open(input_path) as img:
        # Calculate the scaling ratio
        ratio = min(max_size / max(img.size), 1.0)

        if ratio >= 1:  # No scaling required
            return input_path

        # Calculate the new dimensions (maintain the aspect ratio)
        new_size = tuple(round(dim * ratio) for dim in img.size)

        # Use a high-quality scaling algorithm
        resized_img = img.resize(new_size, Resampling.LANCZOS)

        # Work out the output path
        if not output_path:
            filename, ext = os.path.splitext(input_path)
            output_path = f"{filename}_resized{ext}"

        # Save the scaled image (preserving PNG features)
        resized_img.save(output_path, format="PNG", optimize=True)

        print(f"Image scaled: {img.size} → {new_size} | Saved to: {output_path}")
        return output_path
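

# Illustrative sketch of resize_image; the path below is hypothetical and the
# 2048-pixel cap is an arbitrary example value.
def _demo_resize_image() -> None:
    resized_path = resize_image("figures/page_001.png", max_size=2048)
    print(f"Use this file for downstream processing: {resized_path}")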


def file_rename(input_path: str) -> str:
    """Copy a .doc file to a .docx path (extension rename only, no format conversion) and return the new path."""
    filename, ext = os.path.splitext(input_path)
    if ext.lower() == ".doc":
        new_path = f"{filename}.docx"
        shutil.copy2(input_path, new_path)
        print("file renamed to ", new_path)
        return new_path
    return input_path