# catonline_ai/vw-document-ai-indexer/azure_index_service.py
"""
Azure AI index search service
Provides operations for Azure AI Search Index, including creating indexes, uploading documents, checking if an index exists, etc.
"""
import base64
import json
import logging
import os
import time
import uuid
from dataclasses import fields
from typing import List, Dict, Any, Optional
from tqdm import tqdm
import uuid6
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.search.documents import SearchClient, IndexDocumentsBatch
from azure.search.documents._generated.models import IndexingResult
from azure.search.documents.indexes.models import SearchIndex, SimpleField # type: ignore
from azure.search.documents.indexes import SearchIndexClient
from resilient_http_pool import get_cloud_api_client
from entity_models import Document
from utils import asdict_with_dynamic, write_log, write_grouped_index_files
from di_extractor import retry_get_embedding
SUPPORTED_LANGUAGE_CODES = {
"ar": "Arabic",
"hy": "Armenian",
"eu": "Basque",
"bg": "Bulgarian",
"ca": "Catalan",
"zh-Hans": "Chinese Simplified",
"zh-Hant": "Chinese Traditional",
"cs": "Czech",
"da": "Danish",
"nl": "Dutch",
"en": "English",
"fi": "Finnish",
"fr": "French",
"gl": "Galician",
"de": "German",
"el": "Greek",
"hi": "Hindi",
"hu": "Hungarian",
"id": "Indonesian (Bahasa)",
"ga": "Irish",
"it": "Italian",
"ja": "Japanese",
"ko": "Korean",
"lv": "Latvian",
"no": "Norwegian",
"fa": "Persian",
"pl": "Polish",
"pt-Br": "Portuguese (Brazil)",
"pt-Pt": "Portuguese (Portugal)",
"ro": "Romanian",
"ru": "Russian",
"es": "Spanish",
"sv": "Swedish",
"th": "Thai",
"tr": "Turkish"
}
def index_init(data_config: dict[str, Any], search_admin_key: str, search_service_name: str) -> None:
    index_schemas: list[dict[str, Any]] = data_config.get("index_schemas") if data_config else None  # type: ignore
    admin_key = search_admin_key if search_admin_key else None
    service_name = search_service_name
    for schema in index_schemas:
        language = data_config.get("language", None)
        if language and language not in SUPPORTED_LANGUAGE_CODES:
            raise Exception(f"ERROR: Ingestion does not support {language} documents. "
                            f"Please use one of {SUPPORTED_LANGUAGE_CODES}. "
                            f"The language is set as a two-letter code, e.g. 'en' for English. "
                            f"If you do not want to set a language, remove this config or set it to None.")
        # Basic index structure initialization
        create_or_update_search_index(service_name=service_name, index_name=schema["index_name"],
                                      semantic_config_name=schema["semantic_config_name"],
                                      vector_config_name=schema["vector_config_name"],
                                      language=language, admin_key=admin_key,
                                      meta_fields=schema["fields"])
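# Hedged usage sketch for index_init. The concrete values below are illustrative assumptions,
# not taken from a real deployment; only the key names this module actually reads
# ("index_schemas", "language", "index_name", "semantic_config_name", "vector_config_name",
# "fields") are grounded in the code above.
def _example_index_init_usage() -> None:
    """Illustrative only: shows the shape of the data_config that index_init expects."""
    example_config = {
        "language": "en",
        "index_schemas": [
            {
                "index_name": "example-docs-index",           # assumed index name
                "semantic_config_name": "default",
                "vector_config_name": "vectorSearchProfile",  # must match the hardcoded profile name
                "fields": ["publisher", "document_code"],     # extra metadata fields to add
            }
        ],
    }
    index_init(example_config,
               search_admin_key=os.getenv("search_admin_key", ""),
               search_service_name=os.getenv("search_service_name", ""))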
def create_or_update_search_index(service_name: str | None, index_name: str | None, semantic_config_name: str = "default", vector_config_name: str = "", language: str = "", admin_key: str = "", meta_fields: list[str] | None = None):
url = f"{service_name}/indexes/{index_name}?api-version=2024-11-01-Preview"
headers: dict[str, str] = {"Content-Type": "application/json", "api-key": admin_key}
body: dict[str, Any] = {
"fields": [
{"name":"session_id","type":"Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": True},
{"name": "id","type": "Edm.String","searchable": True,"key": True,},
{"name": "content","type": "Edm.String","searchable": True,"sortable": False,"facetable": False,"filterable": False,"analyzer": f"{language}.lucene" if language else None,},
{"name": "title","type": "Edm.String","searchable": True,"sortable": True,"facetable": False,"filterable": False,"analyzer": f"{language}.lucene" if language else None,},
{"name": "filepath","type": "Edm.String", "searchable": True,"sortable": True,"facetable": False,"filterable": True},
{"name": "url","type": "Edm.String","searchable": True,"sortable": True,"filterable": True},
{ "name": "metadata", "type": "Edm.String", "searchable": True, "filterable": True },
{ "name": "image_mapping", "type": "Edm.String", "searchable": False, "sortable": False, "facetable": False, "filterable": True },
{ "name": "doc_metadata", "type": "Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": False },
{ "name": "document_schema", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True },
{ "name": "main_title", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True },
{
"name": "sub_title",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "publisher",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "document_code",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "document_category",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "main_title_sec_language",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "sub_title_sec_language",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "primary_language",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "secondary_language",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "full_headers",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "h1",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "h2",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "h3",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "h4",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "h5",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "h6",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "timestamp",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": True,
"filterable": True
},
{
"name": "publish_date",
"type": "Edm.String",
"searchable": True,
"sortable": True,
"facetable": False,
"filterable": True
},
{
"name": "description",
"type": "Edm.String",
"searchable": True,
"sortable": False,
"facetable": False,
"filterable": True
}
],
"suggesters": [],
"scoringProfiles": [],
"semantic": {
"configurations": [
{
"name": semantic_config_name,
"prioritizedFields": {
"titleField": {"fieldName": "title"},
"prioritizedContentFields": [{"fieldName": "content"}],
"prioritizedKeywordsFields": [{"fieldName": "full_headers"}, {"fieldName": "doc_metadata"}],
},
}
]
},
}
if vector_config_name:
body["fields"].append({
"name": "contentVector",
"type": "Collection(Edm.Single)",
"searchable": True,
"retrievable": True,
"stored": True,
"dimensions": int(os.getenv("VECTOR_DIMENSION", "1536")),
"vectorSearchProfile": vector_config_name
})
body["fields"].append({
"name": "full_metadata_vector",
"type": "Collection(Edm.Single)",
"searchable": True,
"retrievable": True,
"stored": True,
"dimensions": int(os.getenv("VECTOR_DIMENSION", "1536")),
"vectorSearchProfile": vector_config_name
})
body["vectorSearch"] = {
"algorithms": [
{
"name": "my-hnsw-config-1",
"kind": "hnsw",
"hnswParameters": {
"m": 4,
"efConstruction": 400,
"efSearch": 500,
"metric": "cosine"
}
}
],
"profiles": [
{
"name": "vectorSearchProfile",
"algorithm": "my-hnsw-config-1",
# "vectorizer": "azure_vectorizer"
}
],
}
if os.getenv("AOAI_EMBEDDING_ENDPOINT"):
body["vectorSearch"]["profiles"][0]["vectorizer"] = "azure_vectorizer"
body["vectorSearch"]["vectorizers"] = [
{
"name": "azure_vectorizer",
"kind": "azureOpenAI",
"azureOpenAIParameters": {
"resourceUri": os.getenv("AOAI_EMBEDDING_ENDPOINT"),
"deploymentId": os.getenv("AOAI_EMBEDDING_DEPLOYMENT"),
"apiKey": os.getenv("AOAI_EMBEDDING_KEY"),
"modelName": os.getenv("AOAI_EMBEDDING_MODEL")
}
}
]
    for field in (meta_fields or []):
        if not any(str(item["name"]) == field for item in body["fields"]):
            sortable: bool = True
            facetable: bool = True
            filterable: bool = True
            if field in ["x_Standard_Range"]:
                sortable = False
                facetable = False
                filterable = False
            body["fields"].append({
                "name": field,
                "type": "Edm.String",
                "searchable": True,
                "sortable": sortable,
                "facetable": facetable,
                "filterable": filterable
            })
client = get_cloud_api_client()
response = client.put(url, json=body, headers=headers)
if response.status_code == 201:
print(f"Created search index {index_name}")
elif response.status_code == 204:
print(f"Updated existing search index {index_name}")
else:
raise Exception(f"Failed to create search index. Status Code:{response.status_code}, Error: {response.text}")
return True
def upload_documents_to_index(service_name:str, index_name:str, docs, upload_batch_size:int=50, admin_key:str|None=None):
    if admin_key is None:
        raise ValueError("admin_key cannot be None")
to_upload_dicts = []
for d in docs:
# Get dynamically added attributes
if type(d) is not dict:
d = asdict_with_dynamic(d)
# add id to documents
d.update({"@search.action": "upload", "id": d["id"]})
if "contentVector" in d and d["contentVector"] is None:
del d["contentVector"]
if "full_metadata_vector" in d and d["full_metadata_vector"] is None:
del d["full_metadata_vector"]
to_upload_dicts.append(d)
# endpoint = "https://{}.search.windows.net/".format(service_name)
endpoint: str = service_name
search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=AzureKeyCredential(admin_key))
# Upload the documents in batches of upload_batch_size
for i in tqdm(range(0, len(to_upload_dicts), upload_batch_size), desc="Indexing Chunks..."):
batch = to_upload_dicts[i: i + upload_batch_size]
results = search_client.upload_documents(documents=batch)
num_failures = 0
errors = set()
for result in results:
if not result.succeeded:
print(f"Indexing Failed for {result.key} with ERROR: {result.error_message}")
num_failures += 1
errors.add(result.error_message)
        if num_failures > 0:
            raise Exception(f"INDEXING FAILED for {num_failures} documents. Please recreate the index. "
                            f"To debug, please check chunk_size and upload_batch_size.\nError messages: {list(errors)}")
def upload_merge_index(index_config: Any, docs:list[dict[str,Any]],merge_fields:list[dict[str,Any]]|None=None,current_tmp_directory:str='') -> bool:
"""
Merge chunk information and upload to AI search index
"""
index_name: str = index_config["index_name"]
embedding_endpoint: str = os.environ.get("embedding_model_endpoint", '')
embedding_model_key: str = os.environ.get("embedding_model_key", '') #config.embedding_model_key
fields_meta: Any = index_config["fields"] or []
    merge_content_fields: Any = index_config["merge_content_fields"] if "merge_content_fields" in index_config.keys() else []
key_fields: Any = index_config["key_fields"] if "key_fields" in index_config.keys() else []
all_fields = list(dict.fromkeys(["id"] + fields_meta + merge_content_fields + key_fields + [f.name for f in fields(Document)] ))
upload_batch_size = index_config["upload_batch_size"] if "upload_batch_size" in index_config.keys() else 1
original_to_upload_dicts: list[Any] = []
for d in docs:
# Get dynamically added attributes
if type(d) is not dict:
d = asdict_with_dynamic(d)
for key in list(d.keys()):
if key not in all_fields:
del d[key]
if ("contentVector" in d) and (d["contentVector"] is None or "contentVector" not in all_fields):
del d["contentVector"]
if ("full_metadata_vector" in d) and (
d["full_metadata_vector"] is None or "full_metadata_vector" not in all_fields):
del d["full_metadata_vector"]
        # Default id primary key assignment: merge the key_fields content and base64-encode it
        id_value = d["id"] if "id" in d else ""
        if "key_fields" in index_config.keys():
            id_value = '_'.join(str(d[k]) for k in key_fields if k in d)
        if id_value is None or id_value == "":
            continue
        # Select certain fields and concatenate them into another field as a JSON string
        for merge_field in (merge_fields or []):
            d[merge_field["key"]] = json.dumps({field: d[field] for field in merge_field["fields"] if field in d and (value := d[field]) is not None and value != ""}, ensure_ascii=False)
        d["id"] = base64.urlsafe_b64encode(id_value.encode('utf-8')).decode('utf-8')
        # add id and upload action to documents
        d.update({"@search.action": "upload", "id": d["id"]})
        d.update({"session_id": str(uuid6.uuid7())})
        original_to_upload_dicts.append(d)
to_upload_dicts = original_to_upload_dicts
current_object_key = to_upload_dicts[0]["filepath"] if len(to_upload_dicts) > 0 and "filepath" in to_upload_dicts[0] else ''
# Calculate vector data based on configuration fields
    for vector_config in index_config["vector_fields"] if "vector_fields" in index_config.keys() else []:
        for d in tqdm(to_upload_dicts, desc=f"{current_object_key} vector {vector_config['field']} embedding..."):
            vector_dict = {}
            for field in vector_config["append_fields"]:
                value = d.get(field)
                if isinstance(value, dict):
                    vector_dict |= value
                elif isinstance(value, str):
                    vector_dict[field] = value
            vector_str = str(vector_dict) if vector_dict else ""
            embedding = retry_get_embedding(text=vector_str, embedding_model_key=embedding_model_key, embedding_endpoint=embedding_endpoint)
            if embedding:
                d[vector_config["field"]] = embedding
    # Group to_upload_dicts by their filepath field and write each group to the corresponding JSON file under the .index directory
    write_grouped_index_files(to_upload_dicts, index_name=index_name, base_directory=current_tmp_directory)
results: list[bool] = []
# Upload the documents in batches of upload_batch_size
for i in tqdm(range(0, len(to_upload_dicts), upload_batch_size), desc=f"Indexing {index_name} Chunks..."):
batch = to_upload_dicts[i: i + upload_batch_size]
results.append(upload_and_ensure(index_name=index_name, docs=batch, key_field="session_id"))
return all(results)
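# Hedged sketch of the index_config shape consumed by upload_merge_index. Only the keys the
# function actually reads are shown ("index_name", "fields", "merge_content_fields",
# "key_fields", "upload_batch_size", "vector_fields"); the concrete values are assumptions.
# Note that the upload and embedding steps also rely on the search_service_name,
# search_admin_key, embedding_model_endpoint and embedding_model_key environment variables.
def _example_upload_merge_index_call(docs: list[dict[str, Any]], tmp_dir: str) -> bool:
    """Illustrative only: merges metadata into doc_metadata, embeds it, and uploads the chunks."""
    index_config = {
        "index_name": "example-docs-index",                  # assumed index name
        "fields": ["publisher", "document_code"],
        "merge_content_fields": ["content"],
        "key_fields": ["filepath", "title"],                 # concatenated and base64-encoded into id
        "upload_batch_size": 10,
        "vector_fields": [
            {"field": "full_metadata_vector", "append_fields": ["doc_metadata"]}
        ],
    }
    merge_fields = [{"key": "doc_metadata", "fields": ["publisher", "document_code"]}]
    return upload_merge_index(index_config, docs, merge_fields=merge_fields, current_tmp_directory=tmp_dir)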
def merge_dicts(data_list, key_fields, merge_fields, separator='\n'):
"""
Merge dictionary list based on specified fields
Arguments:
data_list -- Original dictionary list
key_fields -- Fields used for deduplication (e.g., ['title', 'filepath'])
merge_fields -- Fields to be merged (e.g., ['content'])
separator -- Separator used for merging fields (default is newline)
Returns:
New dictionary list after merging
"""
merged_dict = {}
for item in data_list:
# Create a unique key - a tuple of all key fields
key = tuple(item.get(field) for field in key_fields)
if key in merged_dict:
# Merge fields
existing = merged_dict[key]
for field in merge_fields:
# Merge new value with old value
existing[field] = separator.join([
existing.get(field, ''),
item.get(field, '')
]).strip(separator)
else:
# Create new record
merged_dict[key] = {
**item, # Copy original fields
# Pre-initialize merged fields
**{field: item.get(field, '') for field in merge_fields}
}
return list(merged_dict.values())
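# Hedged worked example for merge_dicts: two chunks that share the same (title, filepath) key
# are collapsed into one record whose "content" values are joined with the default separator.
# The data is purely illustrative.
def _example_merge_dicts() -> list[dict[str, Any]]:
    chunks = [
        {"title": "User Guide", "filepath": "guide.pdf", "content": "Chapter 1"},
        {"title": "User Guide", "filepath": "guide.pdf", "content": "Chapter 2"},
        {"title": "Release Notes", "filepath": "notes.pdf", "content": "v1.0"},
    ]
    merged = merge_dicts(chunks, key_fields=["title", "filepath"], merge_fields=["content"])
    # merged contains 2 records; merged[0]["content"] == "Chapter 1\nChapter 2"
    return merged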
def validate_index(service_name: str, index_name: str, admin_key: str | None = None):
api_version = "2024-11-01-Preview"
headers = {"Content-Type": "application/json", "api-key": admin_key}
params = {"api-version": api_version}
url = f"{service_name}/indexes/{index_name}/stats"
client = get_cloud_api_client()
for retry_count in range(5):
response = client.get(url, headers=headers, params=params)
if response.status_code == 200:
response_data = response.json()
num_chunks = response_data['documentCount']
            if num_chunks == 0 and retry_count < 4:
                print("Index is empty. Waiting 20 seconds to check again...")
                time.sleep(20)
            elif num_chunks == 0:
                print("Index is empty. Please investigate and re-index.")
                break
else:
print(f"The index contains {num_chunks} chunks.")
average_chunk_size = response_data['storageSize'] / num_chunks
print(f"The average chunk size of the index is {average_chunk_size} bytes.")
break
else:
if response.status_code == 404:
print("The index does not seem to exist. Please make sure the index was created correctly, and that you are using the correct service and index names")
elif response.status_code == 403:
print("Authentication Failure: Make sure you are using the correct key")
else:
print(f"Request failed. Please investigate. Status code: {response.status_code}")
break
def index_exists(index_name: str) -> bool:
try:
search_service_name = os.getenv("search_service_name", "")
search_admin_key = os.getenv("search_admin_key", "")
endpoint = search_service_name
credential = AzureKeyCredential(search_admin_key)
index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
index_client.get_index(index_name)
return True
except Exception as e:
write_log(f"Index '{index_name}' does not exist: {e}")
return False
def create_index(index_name:str, index_fields: list[dict[str, Any]], suggesters: Optional[list[dict[str, Any]]] = None) -> None:
search_service_name = os.getenv("search_service_name", "")
search_admin_key = os.getenv("search_admin_key", "")
endpoint = search_service_name
credential = AzureKeyCredential(search_admin_key)
index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
if index_exists(index_name=index_name):
write_log(f"Index '{index_name}' already exists.")
return
search_fields = [SimpleField(**field) for field in index_fields]
index = SearchIndex(name=index_name, fields=search_fields, suggesters=suggesters or [])
index_client.create_index(index)
write_log(f"Index '{index_name}' created.")
def upload_documents(index_name:str, documents: List[Dict[str, Any]]) -> None:
search_service_name = os.getenv("search_service_name", "")
search_admin_key = os.getenv("search_admin_key", "")
endpoint = search_service_name
credential = AzureKeyCredential(search_admin_key)
search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
batch = IndexDocumentsBatch()
batch.add_merge_or_upload_actions(documents) #type: ignore
results = search_client.index_documents(batch)
write_log(f"Uploaded {len(documents)} documents to index '{index_name}'. Result: {results}")
def delete_index(index_name:str) -> None:
search_service_name = os.getenv("search_service_name", "")
search_admin_key = os.getenv("search_admin_key", "")
endpoint = search_service_name
credential = AzureKeyCredential(search_admin_key)
index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
if index_exists(index_name=index_name):
index_client.delete_index(index_name)
write_log(f"Index '{index_name}' deleted.")
else:
write_log(f"Index '{index_name}' does not exist.")
def search(index_name, search_text: str, **kwargs) -> Any:
endpoint = os.getenv("search_service_name","")
credential = AzureKeyCredential(os.getenv("search_admin_key",""))
index_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
return index_client.search(search_text, **kwargs)
def documents_with_field_value_exist(index_name:str, field_name: str, value: Any) -> bool:
"""
Check if there are documents in the index where a specific field equals the given value.
"""
endpoint = os.getenv("search_service_name", "")
credential = AzureKeyCredential(os.getenv("search_admin_key", ""))
index_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}"
results: Any = index_client.search("*", filter=filter_query, top=1)
for _ in results:
return True
return False
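# Hedged sketch: checking for existing chunks of a file before re-indexing it. The index name
# and field are assumptions; any filterable field in the schema works the same way.
def _example_skip_if_already_indexed(filepath: str) -> bool:
    """Illustrative only: True when at least one chunk of `filepath` is already in the index."""
    return documents_with_field_value_exist("example-docs-index", "filepath", filepath)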
def delete_documents_by_field(index_name:str,field_name: str, value: Any) -> bool:
"""
Delete all documents where the specified field equals the given value.
"""
search_service_name = os.getenv("search_service_name", "")
search_admin_key = os.getenv("search_admin_key", "")
search_client = SearchClient(endpoint=search_service_name, index_name=index_name, credential=AzureKeyCredential(search_admin_key))
    # Step 1: Retrieve the documents that match the filter
    filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}"
    results: Any = search_client.search(select=["id"], filter=filter_query)
    # Step 2: Extract the primary keys (id) of the documents to be deleted
    keys_to_delete = [doc['id'] for doc in results]
    if not keys_to_delete:
        # Nothing matched the filter, so there is nothing to delete
        return True
    # Step 3: Delete the matching documents with the batch delete API
    delete_results: list[IndexingResult] = search_client.delete_documents(documents=[{'id': key} for key in keys_to_delete])  # type: ignore
    logging.getLogger().info(f"Deleted documents with keys: {keys_to_delete}")
    return all(result.succeeded for result in delete_results)
def query_by_field( index_name: str, field_name: str, value: Any, top: int = 99999) -> list[dict[Any,Any]]:
"""
Query documents in the index where a specific field equals the given value.
:param field_name: The field to filter on.
:param value: The value to match.
:param top: Maximum number of results to return.
:return: List of matching documents.
"""
search_service_name = os.getenv("search_service_name", "")
search_admin_key = os.getenv("search_admin_key", "")
search_client = SearchClient(endpoint = search_service_name, index_name=index_name,credential=AzureKeyCredential(search_admin_key))
filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}"
results:Any = search_client.search("*", filter=filter_query, top=top)
return [doc for doc in results]
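# Hedged sketch combining query_by_field and delete_documents_by_field: fetch the chunks of one
# file, then remove them (e.g. before re-uploading a changed document). The index and field
# names are assumptions consistent with the schema built in create_or_update_search_index.
def _example_reindex_cleanup(filepath: str) -> None:
    """Illustrative only: deletes all previously indexed chunks of `filepath`."""
    existing_chunks = query_by_field("example-docs-index", "filepath", filepath)
    if existing_chunks:
        delete_documents_by_field("example-docs-index", "filepath", filepath)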
def upload_and_ensure(index_name:str, docs: list[dict[Any, Any]], key_field="session_id", delay_seconds:int=5, max_retries:int=5) -> bool:
search_service_name = os.getenv("search_service_name", "")
search_admin_key = os.getenv("search_admin_key", "")
endpoint = search_service_name
api_key = search_admin_key
client = SearchClient(endpoint=endpoint, index_name=index_name, credential=AzureKeyCredential(api_key))
# Step 1: Batch submit MergeOrUpload
batch = IndexDocumentsBatch()
batch.add_merge_or_upload_actions(docs) # type: ignore
results = client.index_documents(batch)
# Step 2: Check status of each document
failed = [r.key for r in results if not r.succeeded]
if failed:
raise Exception(f"Initial submission failed for documents: {failed}")
return True
# # Step 3: Delay waiting for background index
# time.sleep(delay_seconds)
# # Step 4: Verify and retry
# keys: list[str] = [doc[key_field] for doc in docs]
# return verify_and_retry(client, keys, docs, key_field, delay_seconds, max_retries)
def verify_and_retry(client: SearchClient, keys: list[str], docs, key_field, delay_seconds, max_retries) -> bool:
attempt = 0
session_id = str(uuid.uuid4())
while attempt <= max_retries:
missing = find_missing(client, keys, session_id)
if not missing:
return True
attempt += 1
print(f"Retry {attempt}, missing: {missing}")
to_retry = [doc for doc in docs if doc[key_field] in missing]
        batch = IndexDocumentsBatch()
        batch.add_merge_or_upload_actions(to_retry)
client.index_documents(batch)
time.sleep(delay_seconds)
# Final check
missing = find_missing(client, keys, session_id)
if missing:
raise Exception(f"Index verification failed, the following documents were not indexed: {missing}")
return True
def find_missing(client: SearchClient, keys: list[str], session_id: str) -> list[str]:
missing: list[str] = []
for key in keys:
try:
results = client.search(filter=f"session_id eq '{key}'", top=1)
if not any(results):
missing.append(key)
except HttpResponseError:
missing.append(key)
return missing
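# Hedged, guarded smoke check: runs only when the module is executed directly and reads the same
# environment variables the functions above already rely on. The fallback index name and the
# smoke_test_index_name variable are assumptions for illustration.
if __name__ == "__main__":
    _smoke_index = os.getenv("smoke_test_index_name", "example-docs-index")
    print(f"Index '{_smoke_index}' exists: {index_exists(_smoke_index)}")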