import json
import os
import time
from datetime import datetime
from typing import Any

from azure.storage.blob import ContainerClient, BlobProperties

from utils import custom_serializer, keep_latest

def check_files(blob_url: str, doc_time: datetime | None) -> list[dict[str, Any]]:
    """List blobs under blob_url modified after doc_time (all blobs when doc_time is None)."""
    container_client = ContainerClient.from_container_url(blob_url)
    updated_files: list[dict[str, Any]] = []
    blobs: list[BlobProperties] = list(container_client.list_blobs())
    # Sort by modification time, ascending.
    blobs_by_last_modified = sorted(blobs, key=lambda b: b.last_modified)

    for blob in blobs_by_last_modified:
        # Skip the metadata file itself; it is handled separately by check_meta().
        if blob.name.endswith('.doc_metadata.json'):
            continue
        # Drop the timezone so the comparison matches the stored (naive) doc_time.
        last_modified: datetime = blob.last_modified.replace(tzinfo=None)
        if doc_time is None or last_modified > doc_time:
            updated_files.append({"name": blob.name, "doc_upper_time": last_modified})

    return updated_files

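# Usage sketch (hypothetical values; requires a container SAS URL with list
# permission). Pass doc_time=None on a first run to pick up every file:
#
#     changed = check_files(
#         "https://<account>.blob.core.windows.net/<container>?<sas-token>",
#         datetime(2024, 1, 1),
#     )
#     # e.g. [{"name": "docs/report.pdf", "doc_upper_time": datetime(2024, 3, 5, 12, 0)}]
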
def load_metadata(blob_url: str, directory_path: str, data_directory: str) -> list[Any]:
    """Download the .doc_metadata.json file from blob_url and return the parsed metadata list, newest first."""
    downloadToLocalFolder(blob_url, data_directory, directory_path, ".doc_metadata.json")
    if not os.path.exists(f"{directory_path}/.doc_metadata.json"):
        # No metadata file in this container yet; treat as an empty list.
        return []

    with open(f"{directory_path}/.doc_metadata.json", "rb") as doc_metadata_file:
        doc_metadata = json.load(doc_metadata_file)
    sorted_list = sorted(doc_metadata["doc_metadata"], key=lambda x: x["timestamp"], reverse=True)
    # For testing: replace '-' with '_' in keys so records can be used as identifiers.
    for dic in sorted_list:
        for k in list(dic.keys()):
            if "-" in k:
                dic[k.replace("-", "_")] = dic.pop(k)
    return sorted_list

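# Shape of .doc_metadata.json as read above (a sketch inferred from the code;
# real records may carry additional keys):
#
#     {
#         "doc_metadata": [
#             {"filepath": "docs/report.pdf", "timestamp": "2024-03-05T12:00:00+00:00"},
#             ...
#         ]
#     }
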
def check_meta(blob_url: str, meta_upper_time: Any, current_tmp_directory: str, data_dir: str) -> list[dict[Any, Any]]:
    """Compare the .doc_metadata.json records under blob_url with the already-processed
    meta_upper_time and return the records updated since then."""
    sorted_list = load_metadata(blob_url, current_tmp_directory, data_directory=data_dir)
    filter_list = filter(
        lambda x: meta_upper_time is None
        or datetime.fromisoformat(x["timestamp"]).replace(tzinfo=None) > meta_upper_time,
        sorted_list,
    )
    updated_metas: list[dict[str, Any]] = []
    for item in filter_list:
        # Parse the ISO timestamp string into a naive datetime object.
        dt = datetime.fromisoformat(item["timestamp"]).replace(tzinfo=None)
        updated_metas.append({"name": item["filepath"], "meta_upper_time": dt})
    # Keep only the latest meta_upper_time per file name.
    return keep_latest(updated_metas, "name", "meta_upper_time")

def downloadToLocalFolder(blob_url: str, data_dir: str, local_folder: str, name_starts_with: str) -> list[str]:
    """Download blobs whose names start with name_starts_with into local_folder.

    Skips the download entirely if the target already exists locally.
    Returns the list of local paths written."""
    if os.path.exists(f"{local_folder}/{name_starts_with}"):
        return []

    # Normalize the data_dir prefix that gets stripped from blob names.
    path = data_dir
    if path and not path.endswith('/'):
        path = path + '/'

    container_client = ContainerClient.from_container_url(blob_url)
    last_destination_folder = None
    destination_paths: list[str] = []
    for blob in container_client.list_blobs(name_starts_with=name_starts_with):
        relative_path = blob.name[len(path):]
        destination_path = os.path.join(local_folder, relative_path)
        destination_folder = os.path.dirname(destination_path)
        # Only call makedirs when the folder changes between consecutive blobs.
        if destination_folder != last_destination_folder:
            os.makedirs(destination_folder, exist_ok=True)
            last_destination_folder = destination_folder
        blob_client = container_client.get_blob_client(blob.name)
        with open(file=destination_path, mode='wb') as local_file:
            stream = blob_client.download_blob()
            local_file.write(stream.readall())
        destination_paths.append(destination_path)
    return destination_paths

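# Usage sketch (hypothetical values): mirror every blob under "docs/" into a
# local working directory; with data_dir="" the full blob name is kept:
#
#     paths = downloadToLocalFolder(
#         "https://<account>.blob.core.windows.net/<container>?<sas-token>",
#         "", "/tmp/workdir", "docs/",
#     )
#     # -> ["/tmp/workdir/docs/report.pdf", ...]
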
def blob_upload_content(blob_sas_url: str, file_name: str, content: str, retry_count: int = 3) -> str:
    """Upload string content to the container at blob_sas_url, retrying on failure; return the blob URL."""
    for i in range(retry_count):
        try:
            # Upload file to Azure blob.
            container_client: ContainerClient = ContainerClient.from_container_url(blob_sas_url)
            container_client.upload_blob(name=file_name, data=content, overwrite=True)  # type: ignore
            return f"{blob_sas_url}/{file_name}"
        except Exception as e:
            print(f"Error uploading content for {file_name} with error={e}, attempt {i + 1} of {retry_count}, {retry_count - (i + 1)} retries left")
            # Back off before retrying, but not after the final attempt.
            if i < retry_count - 1:
                time.sleep(5)
    raise Exception(f"Error uploading content for: {file_name}")

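# Usage sketch (hypothetical values; the SAS token needs write permission):
#
#     url = blob_upload_content(
#         "https://<account>.blob.core.windows.net/<container>?<sas-token>",
#         "results/summary.txt",
#         "hello world",
#     )
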
def blob_upload_object(blob_sas_url: str, file_name: str, obj: Any, retry_count: int = 3) -> str:
    """Serialize obj to JSON and upload it; return the blob URL, or '' when no URL is configured."""
    if not blob_sas_url:
        return ''

    content = json.dumps(obj, default=custom_serializer, ensure_ascii=False, indent=4)
    # Reuse blob_upload_content so the retry logic lives in one place.
    return blob_upload_content(blob_sas_url, file_name, content, retry_count)

def blob_exists(blob_sas_url: str, file_name: str) -> bool:
    """Check if a blob exists in the container."""
    try:
        container_client = ContainerClient.from_container_url(blob_sas_url)
        blob_client = container_client.get_blob_client(file_name)
        return blob_client.exists()
    except Exception as e:
        print(f"Error checking existence of blob {file_name}: {e}")
        return False

def load_content(blob_sas_url: str, file_name: str, retry_count: int = 3) -> str:
    """Download the file from blob storage and decode it to text."""
    for i in range(retry_count):
        try:
            container_client = ContainerClient.from_container_url(blob_sas_url)
            blob_client = container_client.get_blob_client(file_name)
            # Download blob content as bytes and decode to string.
            blob_data = blob_client.download_blob().readall()  # type: ignore
            # Try UTF-8 first, then fall back to other common encodings.
            try:
                return blob_data.decode('utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so it goes last as the catch-all;
                # otherwise cp1252 would never be reached.
                for encoding in ['gbk', 'cp1252', 'latin-1']:
                    try:
                        return blob_data.decode(encoding)
                    except UnicodeDecodeError:
                        continue
                # Defensive: replace undecodable bytes if every codec fails.
                return blob_data.decode('utf-8', errors='replace')
        except Exception as e:
            print(f"Error loading content from {file_name} with error={e}, attempt {i + 1} of {retry_count}")
            # Back off before retrying, but not after the final attempt.
            if i < retry_count - 1:
                time.sleep(5)

    # All retries failed.
    raise Exception(f"Error loading content from blob: {file_name} after {retry_count} retries")
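
if __name__ == "__main__":
    # Minimal smoke test, a sketch that is not part of the original module:
    # it assumes CONTAINER_SAS_URL holds a container SAS URL with list/read
    # permission (hypothetical environment variable).
    sas_url = os.environ.get("CONTAINER_SAS_URL", "")
    if sas_url:
        for record in check_files(sas_url, None):
            print(record["name"], record["doc_upper_time"])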