import json
import os
import time
from datetime import datetime
from typing import Any

from azure.storage.blob import BlobProperties, ContainerClient

from utils import custom_serializer, keep_latest


def check_files(blob_url: str, doc_time: datetime | None) -> list[dict[str, Any]]:
    """List blobs under blob_url and return those modified after doc_time."""
    container_client = ContainerClient.from_container_url(blob_url)
    updated_files: list[dict[str, Any]] = []
    blobs: list[BlobProperties] = list(container_client.list_blobs())
    # Sort by modification time ascending
    blobs_by_last_modified = sorted(blobs, key=lambda b: b.last_modified)
    for blob in blobs_by_last_modified:
        # Skip the metadata file itself; it is handled by check_meta
        if blob.name.endswith('.doc_metadata.json'):
            continue
        last_modified: datetime = blob.last_modified.replace(tzinfo=None)
        if doc_time is None or last_modified > doc_time:
            updated_files.append({"name": blob.name, "doc_upper_time": last_modified})
    return updated_files


def load_metadata(blob_url: str, directory_path: str, data_directory: str) -> list[Any]:
    """Download the .doc_metadata.json file from blob_url and return the parsed metadata list."""
    downloadToLocalFolder(blob_url, data_directory, directory_path, ".doc_metadata.json")
    if not os.path.exists(f"{directory_path}/.doc_metadata.json"):
        return []
    with open(f"{directory_path}/.doc_metadata.json", "rb") as doc_metadata_file:
        doc_metadata = json.load(doc_metadata_file)
    # Newest records first
    sorted_list = sorted(doc_metadata["doc_metadata"], key=lambda x: x["timestamp"], reverse=True)
    # Normalize keys: replace '-' with '_' so downstream code can use them uniformly
    for dic in sorted_list:
        for key in list(dic.keys()):
            if "-" in key:
                dic[key.replace("-", "_")] = dic.pop(key)
    return sorted_list


def check_meta(blob_url: str, meta_upper_time: Any, current_tmp_directory: str, data_dir: str) -> list[dict[Any, Any]]:
    """Compare .doc_metadata.json records under blob_url with the processed
    meta_upper_time and return the metadata records updated since then."""
    sorted_list = load_metadata(blob_url, current_tmp_directory, data_directory=data_dir)
    filter_list = filter(
        lambda x: meta_upper_time is None
        or datetime.fromisoformat(x["timestamp"]).replace(tzinfo=None) > meta_upper_time,
        sorted_list,
    )
    updated_metas: list[dict[str, Any]] = []
    for item in filter_list:
        # Parse the timestamp string into a naive datetime for comparison
        dt = datetime.fromisoformat(item["timestamp"]).replace(tzinfo=None)
        updated_metas.append({"name": item["filepath"], "meta_upper_time": dt})
    # Keep only the latest meta_upper_time per file name
    return keep_latest(updated_metas, "name", "meta_upper_time")


def downloadToLocalFolder(blob_url: str, data_dir: str, local_folder: str, name_starts_with: str) -> list[str]:
    """Download blobs whose names start with name_starts_with into local_folder,
    skipping the download if the target file already exists locally."""
    if os.path.exists(f"{local_folder}/{name_starts_with}"):
        return []
    # Strip the data_dir prefix (with trailing slash) from blob names
    path = data_dir
    if path and not path.endswith('/'):
        path = path + '/'
    container_client = ContainerClient.from_container_url(blob_url)
    last_destination_folder = None
    destination_paths: list[str] = []
    for blob in container_client.list_blobs(name_starts_with=name_starts_with):
        relative_path = blob.name[len(path):]
        destination_path = os.path.join(local_folder, relative_path)
        destination_folder = os.path.dirname(destination_path)
        # Only create the directory when it changes, to avoid redundant calls
        if destination_folder != last_destination_folder:
            os.makedirs(destination_folder, exist_ok=True)
            last_destination_folder = destination_folder
        blob_client = container_client.get_blob_client(blob.name)
        with open(file=destination_path, mode='wb') as local_file:
            stream = blob_client.download_blob()
            local_file.write(stream.readall())
        destination_paths.append(destination_path)
    return destination_paths
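
# --- Illustrative sketch, not part of the original API ---------------------
# Shows how the helpers above could drive an incremental scan: check_files
# finds blobs modified after a checkpoint, while check_meta surfaces metadata
# records newer than the checkpoint. The SAS URL, checkpoint times, and local
# paths below are hypothetical placeholders.
def _example_incremental_scan() -> None:
    blob_url = "https://<account>.blob.core.windows.net/<container>?<sas>"  # hypothetical
    last_doc_time: datetime | None = None    # None -> treat every blob as new
    last_meta_time: datetime | None = None   # None -> treat every record as new
    changed_docs = check_files(blob_url, last_doc_time)
    changed_metas = check_meta(blob_url, last_meta_time, "/tmp/doc_meta", "data")
    for doc in changed_docs:
        print(f"changed blob: {doc['name']} at {doc['doc_upper_time']}")
    for meta in changed_metas:
        print(f"changed metadata: {meta['name']} at {meta['meta_upper_time']}")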

def blob_upload_content(blob_sas_url: str, file_name: str, content: str, retry_count: int = 3) -> str:
    """Upload content to the container behind blob_sas_url, retrying on failure."""
    for i in range(retry_count):
        try:
            container_client: ContainerClient = ContainerClient.from_container_url(blob_sas_url)
            container_client.upload_blob(name=file_name, data=content, overwrite=True)  # type: ignore
            return f"{blob_sas_url}/{file_name}"
        except Exception as e:
            print(f"Error uploading content for {file_name} with error={e}, "
                  f"retrying, currently at retry {i + 1}, {retry_count - (i + 1)} retries left")
            if i < retry_count - 1:
                time.sleep(5)
    raise Exception(f"Error uploading content for: {file_name}")


def blob_upload_object(blob_sas_url: str, file_name: str, obj: Any, retry_count: int = 3) -> str:
    """Serialize obj to JSON and upload it, delegating to blob_upload_content."""
    if not blob_sas_url:
        return ''
    content = json.dumps(obj, default=custom_serializer, ensure_ascii=False, indent=4)
    return blob_upload_content(blob_sas_url, file_name, content, retry_count)


def blob_exists(blob_sas_url: str, file_name: str) -> bool:
    """Check if a blob exists in the container."""
    try:
        container_client = ContainerClient.from_container_url(blob_sas_url)
        blob_client = container_client.get_blob_client(file_name)
        return blob_client.exists()
    except Exception as e:
        print(f"Error checking existence of blob {file_name}: {e}")
        return False


def load_content(blob_sas_url: str, file_name: str, retry_count: int = 3) -> str:
    """Download the file from blob storage and decode it to a string."""
    for i in range(retry_count):
        try:
            container_client = ContainerClient.from_container_url(blob_sas_url)
            blob_client = container_client.get_blob_client(file_name)
            # Download blob content as bytes and decode to string
            blob_data = blob_client.download_blob().readall()  # type: ignore
            # Try UTF-8 first, then fall back to other common encodings
            try:
                return blob_data.decode('utf-8')
            except UnicodeDecodeError:
                # Note: latin-1 decodes any byte sequence, so it acts as a
                # catch-all before cp1252 is ever reached
                for encoding in ['gbk', 'latin-1', 'cp1252']:
                    try:
                        return blob_data.decode(encoding)
                    except UnicodeDecodeError:
                        continue
                # If all encodings fail, substitute replacement characters
                return blob_data.decode('utf-8', errors='replace')
        except Exception as e:
            print(f"Error loading content from {file_name} with error={e}, "
                  f"retrying, currently at retry {i + 1}, {retry_count - (i + 1)} retries left")
            if i < retry_count - 1:
                time.sleep(5)
    # If all retries fail, raise exception
    raise Exception(f"Error loading content from blob: {file_name} after {retry_count} retries")
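
# --- Illustrative usage, assuming a valid container SAS URL ----------------
# A minimal round trip with the upload/exists/load helpers above. The SAS URL
# and blob name are hypothetical placeholders; run against a real container
# to exercise the retry and encoding-fallback paths.
if __name__ == "__main__":
    sas_url = "https://<account>.blob.core.windows.net/<container>?<sas>"  # hypothetical
    blob_upload_object(sas_url, "example/run_info.json", {"run": 1, "status": "ok"})
    if blob_exists(sas_url, "example/run_info.json"):
        print(load_content(sas_url, "example/run_info.json"))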