import json
import os
import time
from datetime import datetime
from typing import Any
from azure.storage.blob import ContainerClient, BlobProperties
from utils import custom_serializer, keep_latest


def check_files(blob_url: str, doc_time: datetime | None) -> list[dict[str, Any]]:
    """List blobs under blob_url and return those modified after doc_time."""
    container_client = ContainerClient.from_container_url(blob_url)
    updated_files: list[dict[str, Any]] = []
    blobs: list[BlobProperties] = list(container_client.list_blobs())
    # Sort by last-modified time, ascending
    blobs_by_last_modified = sorted(blobs, key=lambda b: b.last_modified)
    for blob in blobs_by_last_modified:
        # Skip the metadata file itself; it is handled separately by check_meta
        if blob.name.endswith('.doc_metadata.json'):
            continue
        # blob.last_modified is timezone-aware; drop tzinfo so it compares with the naive doc_time
        last_modified: datetime = blob.last_modified.replace(tzinfo=None)
        if doc_time is None or last_modified > doc_time:
            updated_files.append({"name": blob.name, "doc_upper_time": last_modified})
    return updated_files


def load_metadata(blob_url: str, directory_path: str, data_directory: str) -> list[Any]:
    """Download the .doc_metadata.json file from blob_url and return the parsed metadata list."""
    downloadToLocalFolder(blob_url, data_directory, directory_path, ".doc_metadata.json")
    if not os.path.exists(f"{directory_path}/.doc_metadata.json"):
        return []
    with open(f"{directory_path}/.doc_metadata.json", "rb") as doc_metadata_file:
        doc_metadata = json.load(doc_metadata_file)
    sorted_list = sorted(doc_metadata["doc_metadata"], key=lambda x: x["timestamp"], reverse=True)
    # Replace '-' with '_' in keys (for testing)
    for dic in sorted_list:
        for k in list(dic.keys()):
            if "-" in k:
                dic[k.replace("-", "_")] = dic.pop(k)
    return sorted_list
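

# The exact schema of .doc_metadata.json is not documented here. Based on the keys this
# module reads ("doc_metadata", "timestamp", "filepath"), a minimal assumed shape would be:
#
# {
#     "doc_metadata": [
#         {"filepath": "reports/a.docx", "timestamp": "2025-01-01T00:00:00+00:00", "doc-type": "report"},
#         {"filepath": "reports/b.docx", "timestamp": "2025-01-02T00:00:00+00:00"}
#     ]
# }
#
# Extra fields such as "doc-type" above are illustrative only; keys containing '-' are
# renamed with '_' by load_metadata before the list is returned.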


def check_meta(blob_url: str, meta_upper_time: Any, current_tmp_directory: str, data_dir: str) -> list[dict[Any, Any]]:
    """Check .doc_metadata.json records under blob_url against the processed meta_upper_time and return the updated metadata list."""
    sorted_list = load_metadata(blob_url, current_tmp_directory, data_directory=data_dir)
    filter_list = filter(
        lambda x: meta_upper_time is None
        or datetime.fromisoformat(x["timestamp"]).replace(tzinfo=None) > meta_upper_time,
        sorted_list,
    )
    updated_metas: list[dict[str, Any]] = []
    for item in filter_list:
        # Parse the ISO timestamp string into a naive datetime
        dt = datetime.fromisoformat(item["timestamp"]).replace(tzinfo=None)
        updated_metas.append({"name": item["filepath"], "meta_upper_time": dt})
    # Keep only the latest meta_upper_time entry per file name
    return keep_latest(updated_metas, "name", "meta_upper_time")


def downloadToLocalFolder(blob_url: str, data_dir: str, local_folder: str, name_starts_with: str) -> list[str]:
    """Download blobs whose names start with name_starts_with into local_folder, skipping the download if the target file already exists locally."""
    if os.path.exists(f"{local_folder}/{name_starts_with}"):
        return []
    # Strip the data_dir prefix from blob names so destination paths are relative to local_folder
    path = data_dir
    if path and not path.endswith('/'):
        path = path + '/'
    container_client = ContainerClient.from_container_url(blob_url)
    last_destination_folder = None
    destination_paths: list[str] = []
    for blob in container_client.list_blobs(name_starts_with=name_starts_with):
        relative_path = blob.name[len(path):]
        destination_path = os.path.join(local_folder, relative_path)
        destination_folder = os.path.dirname(destination_path)
        # Only create the directory when it changes between consecutive blobs
        if destination_folder != last_destination_folder:
            os.makedirs(destination_folder, exist_ok=True)
            last_destination_folder = destination_folder
        blob_client = container_client.get_blob_client(blob.name)
        with open(file=destination_path, mode='wb') as local_file:
            stream = blob_client.download_blob()
            local_file.write(stream.readall())
        destination_paths.append(destination_path)
    return destination_paths


def blob_upload_content(blob_sas_url: str, file_name: str, content: str, retry_count: int = 3) -> str:
    """Upload string content to the Azure blob container, retrying on failure."""
    for i in range(retry_count):
        try:
            container_client: ContainerClient = ContainerClient.from_container_url(blob_sas_url)
            container_client.upload_blob(name=file_name, data=content, overwrite=True)  # type: ignore
            return f"{blob_sas_url}/{file_name}"
        except Exception as e:
            print(f"Error uploading content for {file_name} with error={e}; attempt {i + 1} of {retry_count}, {retry_count - (i + 1)} retries left")
            if i < retry_count - 1:
                time.sleep(5)
    raise Exception(f"Error uploading content for: {file_name}")


def blob_upload_object(blob_sas_url: str, file_name: str, obj: Any, retry_count: int = 3) -> str:
    """Serialize obj to JSON and upload it to the Azure blob container, retrying on failure."""
    if not blob_sas_url:
        return ''
    content = json.dumps(obj, default=custom_serializer, ensure_ascii=False, indent=4)
    # Delegate the upload and retry handling to blob_upload_content
    return blob_upload_content(blob_sas_url, file_name, content, retry_count)


def blob_exists(blob_sas_url: str, file_name: str) -> bool:
    """Check if a blob exists in the container."""
    try:
        container_client = ContainerClient.from_container_url(blob_sas_url)
        blob_client = container_client.get_blob_client(file_name)
        return blob_client.exists()
    except Exception as e:
        print(f"Error checking existence of blob {file_name}: {e}")
        return False


def load_content(blob_sas_url: str, file_name: str, retry_count: int = 3) -> str:
    """Download the file from blob storage and decode it to a string."""
    for i in range(retry_count):
        try:
            container_client = ContainerClient.from_container_url(blob_sas_url)
            blob_client = container_client.get_blob_client(file_name)
            # Download blob content as bytes and decode to string
            blob_data = blob_client.download_blob().readall()  # type: ignore
            # Try UTF-8 first, then fall back to other common encodings
            try:
                return blob_data.decode('utf-8')
            except UnicodeDecodeError:
                for encoding in ['gbk', 'latin-1', 'cp1252']:
                    try:
                        return blob_data.decode(encoding)
                    except UnicodeDecodeError:
                        continue
                # If all encodings fail, decode with replacement characters
                return blob_data.decode('utf-8', errors='replace')
        except Exception as e:
            print(f"Error loading content from {file_name} with error={e}; attempt {i + 1} of {retry_count}, {retry_count - (i + 1)} retries left")
            if i < retry_count - 1:
                time.sleep(5)
    # If all retries fail, raise an exception
    raise Exception(f"Error loading content from blob: {file_name} after {retry_count} retries")