"""
|
||
Azure AI index search service
|
||
Provides operations for Azure AI Search Index, including creating indexes, uploading documents, checking if an index exists, etc.
|
||
"""
import base64
import json
import logging
import os
import time
import uuid
from dataclasses import fields
from typing import List, Dict, Any, Optional

import uuid6
from tqdm import tqdm
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.search.documents import SearchClient, IndexDocumentsBatch
from azure.search.documents.models import IndexingResult
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchIndex, SimpleField  # type: ignore

from resilient_http_pool import get_cloud_api_client
from entity_models import Document
from utils import asdict_with_dynamic, write_log, write_grouped_index_files
from di_extractor import retry_get_embedding

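# Typical usage (sketch, not executed here): the ingestion pipeline is expected to call
# index_init() once to create or update the index schemas, then upload_merge_index() per
# batch of chunked documents, and finally validate_index() to confirm the document count.
# The config keys match the lookups in index_init(); the concrete values are illustrative:
#
#   data_config = {
#       "language": "en",
#       "index_schemas": [{
#           "index_name": "docs-index",
#           "semantic_config_name": "default",
#           "vector_config_name": "vectorSearchProfile",
#           "fields": ["x_Standard_Range"],
#       }],
#   }
#   index_init(data_config,
#              search_admin_key="<admin-key>",
#              search_service_name="https://<service>.search.windows.net")
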
SUPPORTED_LANGUAGE_CODES = {
    "ar": "Arabic",
    "hy": "Armenian",
    "eu": "Basque",
    "bg": "Bulgarian",
    "ca": "Catalan",
    "zh-Hans": "Chinese Simplified",
    "zh-Hant": "Chinese Traditional",
    "cs": "Czech",
    "da": "Danish",
    "nl": "Dutch",
    "en": "English",
    "fi": "Finnish",
    "fr": "French",
    "gl": "Galician",
    "de": "German",
    "el": "Greek",
    "hi": "Hindi",
    "hu": "Hungarian",
    "id": "Indonesian (Bahasa)",
    "ga": "Irish",
    "it": "Italian",
    "ja": "Japanese",
    "ko": "Korean",
    "lv": "Latvian",
    "no": "Norwegian",
    "fa": "Persian",
    "pl": "Polish",
    "pt-Br": "Portuguese (Brazil)",
    "pt-Pt": "Portuguese (Portugal)",
    "ro": "Romanian",
    "ru": "Russian",
    "es": "Spanish",
    "sv": "Swedish",
    "th": "Thai",
    "tr": "Turkish"
}

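# Note: when a language code is configured, it is mapped to the corresponding Lucene
# language analyzer on the "content" and "title" fields, e.g. "en" -> "en.lucene".
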
def index_init(data_config: dict[str, Any], search_admin_key: str, search_service_name: str) -> None:
    index_schemas: list[dict[str, Any]] = (data_config or {}).get("index_schemas") or []

    admin_key = search_admin_key if search_admin_key else None
    service_name = search_service_name
    for schema in index_schemas:
        language = data_config.get("language", None)

        if language and language not in SUPPORTED_LANGUAGE_CODES:
            raise Exception(f"ERROR: Ingestion does not support {language} documents. "
                            f"Please use one of {SUPPORTED_LANGUAGE_CODES}. "
                            f"The language is set as a two-letter code, e.g. 'en' for English. "
                            f"If you do not want to set a language, remove this config entry or set it to None.")

        # Basic index structure initialization
        create_or_update_search_index(service_name=service_name,
                                      index_name=schema["index_name"],
                                      semantic_config_name=schema["semantic_config_name"],
                                      vector_config_name=schema["vector_config_name"],
                                      language=language,
                                      admin_key=admin_key,
                                      meta_fields=schema["fields"])


def create_or_update_search_index(service_name: str | None, index_name: str | None, semantic_config_name: str = "default",
                                  vector_config_name: str = "", language: str = "", admin_key: str = "",
                                  meta_fields: list[str] | None = None):
    url = f"{service_name}/indexes/{index_name}?api-version=2024-11-01-Preview"
    headers: dict[str, str] = {"Content-Type": "application/json", "api-key": admin_key}

    body: dict[str, Any] = {
        "fields": [
            {"name": "session_id", "type": "Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": True},
            {"name": "id", "type": "Edm.String", "searchable": True, "key": True},
            {"name": "content", "type": "Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": False, "analyzer": f"{language}.lucene" if language else None},
            {"name": "title", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": False, "analyzer": f"{language}.lucene" if language else None},
            {"name": "filepath", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "url", "type": "Edm.String", "searchable": True, "sortable": True, "filterable": True},
            {"name": "metadata", "type": "Edm.String", "searchable": True, "filterable": True},
            {"name": "image_mapping", "type": "Edm.String", "searchable": False, "sortable": False, "facetable": False, "filterable": True},
            {"name": "doc_metadata", "type": "Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": False},
            {"name": "document_schema", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "main_title", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "sub_title", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "publisher", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "document_code", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "document_category", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "main_title_sec_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "sub_title_sec_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "primary_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "secondary_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "full_headers", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "h1", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "h2", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "h3", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "h4", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "h5", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "h6", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "timestamp", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": True, "filterable": True},
            {"name": "publish_date", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True},
            {"name": "description", "type": "Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": True},
        ],
        "suggesters": [],
        "scoringProfiles": [],
        "semantic": {
            "configurations": [
                {
                    "name": semantic_config_name,
                    "prioritizedFields": {
                        "titleField": {"fieldName": "title"},
                        "prioritizedContentFields": [{"fieldName": "content"}],
                        "prioritizedKeywordsFields": [{"fieldName": "full_headers"}, {"fieldName": "doc_metadata"}],
                    },
                }
            ]
        },
    }

    if vector_config_name:
        body["fields"].append({
            "name": "contentVector",
            "type": "Collection(Edm.Single)",
            "searchable": True,
            "retrievable": True,
            "stored": True,
            "dimensions": int(os.getenv("VECTOR_DIMENSION", "1536")),
            "vectorSearchProfile": vector_config_name
        })

        body["fields"].append({
            "name": "full_metadata_vector",
            "type": "Collection(Edm.Single)",
            "searchable": True,
            "retrievable": True,
            "stored": True,
            "dimensions": int(os.getenv("VECTOR_DIMENSION", "1536")),
            "vectorSearchProfile": vector_config_name
        })

        body["vectorSearch"] = {
            "algorithms": [
                {
                    "name": "my-hnsw-config-1",
                    "kind": "hnsw",
                    "hnswParameters": {
                        "m": 4,
                        "efConstruction": 400,
                        "efSearch": 500,
                        "metric": "cosine"
                    }
                }
            ],
            "profiles": [
                {
                    "name": "vectorSearchProfile",
                    "algorithm": "my-hnsw-config-1",
                    # "vectorizer": "azure_vectorizer"
                }
            ],
        }

        if os.getenv("AOAI_EMBEDDING_ENDPOINT"):
            body["vectorSearch"]["profiles"][0]["vectorizer"] = "azure_vectorizer"
            body["vectorSearch"]["vectorizers"] = [
                {
                    "name": "azure_vectorizer",
                    "kind": "azureOpenAI",
                    "azureOpenAIParameters": {
                        "resourceUri": os.getenv("AOAI_EMBEDDING_ENDPOINT"),
                        "deploymentId": os.getenv("AOAI_EMBEDDING_DEPLOYMENT"),
                        "apiKey": os.getenv("AOAI_EMBEDDING_KEY"),
                        "modelName": os.getenv("AOAI_EMBEDDING_MODEL")
                    }
                }
            ]

    for field in (meta_fields if meta_fields is not None else []):
        if not any(str(item["name"]) == field for item in body['fields']):
            sortable: bool = True
            facetable: bool = True
            filterable: bool = True
            if field in ["x_Standard_Range"]:
                sortable = False
                facetable = False
                filterable = False
            body["fields"].append({
                "name": field,
                "type": "Edm.String",
                "searchable": True,
                "sortable": sortable,
                "facetable": facetable,
                "filterable": filterable
            })

    client = get_cloud_api_client()
    response = client.put(url, json=body, headers=headers)
    if response.status_code == 201:
        print(f"Created search index {index_name}")
    elif response.status_code == 204:
        print(f"Updated existing search index {index_name}")
    else:
        raise Exception(f"Failed to create search index. Status Code: {response.status_code}, Error: {response.text}")

    return True


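# Example (sketch): creating or updating an index directly. The service name is the full
# endpoint URL because it is interpolated into the REST URL above; all values are illustrative.
#
#   create_or_update_search_index(
#       service_name="https://<service>.search.windows.net",
#       index_name="docs-index",
#       semantic_config_name="default",
#       vector_config_name="vectorSearchProfile",
#       language="en",
#       admin_key="<admin-key>",
#       meta_fields=["x_Standard_Range", "x_Product_Family"],
#   )

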
def upload_documents_to_index(service_name: str, index_name: str, docs, upload_batch_size: int = 50, admin_key: str | None = None):
    if admin_key is None:
        raise ValueError("admin_key cannot be None")

    to_upload_dicts = []

    for d in docs:
        # Get dynamically added attributes
        if type(d) is not dict:
            d = asdict_with_dynamic(d)

        # add id to documents
        d.update({"@search.action": "upload", "id": d["id"]})
        if "contentVector" in d and d["contentVector"] is None:
            del d["contentVector"]
        if "full_metadata_vector" in d and d["full_metadata_vector"] is None:
            del d["full_metadata_vector"]
        to_upload_dicts.append(d)

    # endpoint = "https://{}.search.windows.net/".format(service_name)
    endpoint: str = service_name

    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=AzureKeyCredential(admin_key))

    # Upload the documents in batches of upload_batch_size
    for i in tqdm(range(0, len(to_upload_dicts), upload_batch_size), desc="Indexing Chunks..."):
        batch = to_upload_dicts[i: i + upload_batch_size]
        results = search_client.upload_documents(documents=batch)
        num_failures = 0
        errors = set()
        for result in results:
            if not result.succeeded:
                print(f"Indexing Failed for {result.key} with ERROR: {result.error_message}")
                num_failures += 1
                errors.add(result.error_message)
        if num_failures > 0:
            raise Exception(f"INDEXING FAILED for {num_failures} documents. Please recreate the index. "
                            f"To debug: please check chunk_size and upload_batch_size.\nError Messages: {list(errors)}")


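# Example (sketch): documents must already carry an "id"; vector fields may be omitted or
# set to None. Values below are illustrative.
#
#   docs = [{"id": "doc-1-chunk-0", "title": "Manual", "content": "...", "filepath": "manual.pdf", "url": ""}]
#   upload_documents_to_index(
#       service_name="https://<service>.search.windows.net",
#       index_name="docs-index",
#       docs=docs,
#       upload_batch_size=50,
#       admin_key="<admin-key>",
#   )

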
def upload_merge_index(index_config: Any, docs: list[dict[str, Any]], merge_fields: list[dict[str, Any]] | None = None, current_tmp_directory: str = '') -> bool:
    """
    Merge chunk information and upload it to the AI Search index.
    """
    index_name: str = index_config["index_name"]
    embedding_endpoint: str = os.environ.get("embedding_model_endpoint", '')
    embedding_model_key: str = os.environ.get("embedding_model_key", '')  # config.embedding_model_key

    fields_meta: Any = index_config["fields"] or []
    merge_content_fields: Any = index_config["merge_content_fields"] if "merge_content_fields" in index_config.keys() else []
    key_fields: Any = index_config["key_fields"] if "key_fields" in index_config.keys() else []

    all_fields = list(dict.fromkeys(["id"] + fields_meta + merge_content_fields + key_fields + [f.name for f in fields(Document)]))
    upload_batch_size = index_config["upload_batch_size"] if "upload_batch_size" in index_config.keys() else 1

    original_to_upload_dicts: list[Any] = []

    for d in docs:
        # Get dynamically added attributes
        if type(d) is not dict:
            d = asdict_with_dynamic(d)

        for key in list(d.keys()):
            if key not in all_fields:
                del d[key]

        if ("contentVector" in d) and (d["contentVector"] is None or "contentVector" not in all_fields):
            del d["contentVector"]
        if ("full_metadata_vector" in d) and (
                d["full_metadata_vector"] is None or "full_metadata_vector" not in all_fields):
            del d["full_metadata_vector"]

        # Default id primary key assignment: merge the key_fields content and base64-encode it
        id_value = d["id"] if "id" in d else ""
        if "key_fields" in index_config.keys():
            id_value = '_'.join(str(d[k]) for k in key_fields if k in d)

        if id_value is None or id_value == "":
            continue

        # Select certain fields and concatenate them into another field
        for merge_field in (merge_fields or []):
            d[merge_field["key"]] = json.dumps(
                {field: d[field] for field in merge_field["fields"]
                 if field in d and (value := d[field]) is not None and value != ""},
                ensure_ascii=False)

        d["id"] = base64.urlsafe_b64encode(id_value.encode('utf-8')).decode('utf-8')
        # add id to documents
        d.update({"@search.action": "upload", "id": d["id"]})
        d.update({"session_id": str(uuid6.uuid7())})
        original_to_upload_dicts.append(d)

    to_upload_dicts = original_to_upload_dicts
    current_object_key = to_upload_dicts[0]["filepath"] if len(to_upload_dicts) > 0 and "filepath" in to_upload_dicts[0] else ''

    # Calculate vector data based on the configured fields
    for vector_config in (index_config["vector_fields"] if "vector_fields" in index_config.keys() else []):
        for i in tqdm(range(0, len(to_upload_dicts), 1), desc=f"{current_object_key} vector {vector_config['field']} embedding..."):
            d = to_upload_dicts[i: i + 1][0]
            vector_dict = {}
            for field in vector_config["append_fields"]:
                if isinstance(d[field], dict):
                    vector_dict |= d[field]
                elif isinstance(d[field], str):
                    vector_dict[field] = d[field]
            vector_str = str(vector_dict) if vector_dict else ""
            embedding = retry_get_embedding(text=vector_str, embedding_model_key=embedding_model_key, embedding_endpoint=embedding_endpoint)
            if embedding:
                d[vector_config["field"]] = embedding

    # Group to_upload_dicts by their filepath field and write each group to the corresponding JSON file under the .index directory
    write_grouped_index_files(to_upload_dicts, index_name=index_name, base_directory=current_tmp_directory)

    results: list[bool] = []
    # Upload the documents in batches of upload_batch_size
    for i in tqdm(range(0, len(to_upload_dicts), upload_batch_size), desc=f"Indexing {index_name} Chunks..."):
        batch = to_upload_dicts[i: i + upload_batch_size]
        results.append(upload_and_ensure(index_name=index_name, docs=batch, key_field="session_id"))
    return all(results)


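# Example (sketch): an index_config and merge_fields shape this function accepts, with docs
# being the list of chunk dicts (or Document dataclasses). Keys follow the lookups above;
# the concrete field names and values are illustrative assumptions.
#
#   index_config = {
#       "index_name": "docs-index",
#       "fields": ["main_title", "publisher"],
#       "key_fields": ["filepath", "title"],   # joined with "_" and base64-encoded into "id"
#       "merge_content_fields": ["content"],
#       "upload_batch_size": 10,
#       "vector_fields": [{"field": "contentVector", "append_fields": ["content"]}],
#   }
#   merge_fields = [{"key": "doc_metadata", "fields": ["main_title", "publisher"]}]
#   upload_merge_index(index_config, docs, merge_fields=merge_fields, current_tmp_directory="/tmp/run-1")

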
def merge_dicts(data_list, key_fields, merge_fields, separator='\n'):
    """
    Merge a dictionary list based on specified fields.

    Arguments:
    data_list -- Original dictionary list
    key_fields -- Fields used for deduplication (e.g., ['title', 'filepath'])
    merge_fields -- Fields to be merged (e.g., ['content'])
    separator -- Separator used for merging fields (default is newline)

    Returns:
    New dictionary list after merging
    """
    merged_dict = {}

    for item in data_list:
        # Create a unique key - a tuple of all key fields
        key = tuple(item.get(field) for field in key_fields)

        if key in merged_dict:
            # Merge fields
            existing = merged_dict[key]
            for field in merge_fields:
                # Merge new value with old value
                existing[field] = separator.join([
                    existing.get(field, ''),
                    item.get(field, '')
                ]).strip(separator)
        else:
            # Create new record
            merged_dict[key] = {
                **item,  # Copy original fields
                # Pre-initialize merged fields
                **{field: item.get(field, '') for field in merge_fields}
            }

    return list(merged_dict.values())


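# Example (sketch): merging two chunks of the same file into one record.
#
#   rows = [
#       {"title": "Guide", "filepath": "a.pdf", "content": "part 1"},
#       {"title": "Guide", "filepath": "a.pdf", "content": "part 2"},
#   ]
#   merge_dicts(rows, key_fields=["title", "filepath"], merge_fields=["content"])
#   # -> [{"title": "Guide", "filepath": "a.pdf", "content": "part 1\npart 2"}]

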
def validate_index(service_name: str, index_name: str, admin_key: str | None = None):
    api_version = "2024-11-01-Preview"
    headers = {"Content-Type": "application/json", "api-key": admin_key}
    params = {"api-version": api_version}
    url = f"{service_name}/indexes/{index_name}/stats"
    client = get_cloud_api_client()
    for retry_count in range(11):
        response = client.get(url, headers=headers, params=params)
        if response.status_code == 200:
            response_data = response.json()
            num_chunks = response_data['documentCount']
            if num_chunks == 0 and retry_count < 10:
                print("Index is empty. Waiting 20 seconds to check again...")
                time.sleep(20)
            elif num_chunks == 0 and retry_count == 10:
                print("Index is empty. Please investigate and re-index.")
            else:
                print(f"The index contains {num_chunks} chunks.")
                average_chunk_size = response_data['storageSize'] / num_chunks
                print(f"The average chunk size of the index is {average_chunk_size} bytes.")
                break
        else:
            if response.status_code == 404:
                print("The index does not seem to exist. Please make sure the index was created correctly, and that you are using the correct service and index names.")
            elif response.status_code == 403:
                print("Authentication failure: make sure you are using the correct key.")
            else:
                print(f"Request failed. Please investigate. Status code: {response.status_code}")
            break


def index_exists(index_name: str) -> bool:
    try:
        search_service_name = os.getenv("search_service_name", "")
        search_admin_key = os.getenv("search_admin_key", "")

        endpoint = search_service_name
        credential = AzureKeyCredential(search_admin_key)
        index_client = SearchIndexClient(endpoint=endpoint, credential=credential)

        index_client.get_index(index_name)
        return True
    except Exception as e:
        write_log(f"Index '{index_name}' does not exist: {e}")
        return False


def create_index(index_name: str, index_fields: list[dict[str, Any]], suggesters: Optional[list[dict[str, Any]]] = None) -> None:
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")

    endpoint = search_service_name
    credential = AzureKeyCredential(search_admin_key)
    index_client = SearchIndexClient(endpoint=endpoint, credential=credential)

    if index_exists(index_name=index_name):
        write_log(f"Index '{index_name}' already exists.")
        return
    search_fields = [SimpleField(**field) for field in index_fields]
    index = SearchIndex(name=index_name, fields=search_fields, suggesters=suggesters or [])
    index_client.create_index(index)
    write_log(f"Index '{index_name}' created.")


def upload_documents(index_name: str, documents: List[Dict[str, Any]]) -> None:
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")

    endpoint = search_service_name
    credential = AzureKeyCredential(search_admin_key)
    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
    batch = IndexDocumentsBatch()
    batch.add_merge_or_upload_actions(documents)  # type: ignore
    results = search_client.index_documents(batch)

    write_log(f"Uploaded {len(documents)} documents to index '{index_name}'. Result: {results}")


def delete_index(index_name: str) -> None:
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")

    endpoint = search_service_name
    credential = AzureKeyCredential(search_admin_key)
    index_client = SearchIndexClient(endpoint=endpoint, credential=credential)

    if index_exists(index_name=index_name):
        index_client.delete_index(index_name)
        write_log(f"Index '{index_name}' deleted.")
    else:
        write_log(f"Index '{index_name}' does not exist.")


def search(index_name, search_text: str, **kwargs) -> Any:
    endpoint = os.getenv("search_service_name", "")
    credential = AzureKeyCredential(os.getenv("search_admin_key", ""))
    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
    return search_client.search(search_text, **kwargs)


def documents_with_field_value_exist(index_name: str, field_name: str, value: Any) -> bool:
    """
    Check whether any document in the index has the specified field equal to the given value.
    """
    endpoint = os.getenv("search_service_name", "")
    credential = AzureKeyCredential(os.getenv("search_admin_key", ""))
    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)

    filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}"
    results: Any = search_client.search("*", filter=filter_query, top=1)
    for _ in results:
        return True
    return False


def delete_documents_by_field(index_name: str, field_name: str, value: Any) -> bool:
    """
    Delete all documents where the specified field equals the given value.
    """
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")

    search_client = SearchClient(endpoint=search_service_name, index_name=index_name, credential=AzureKeyCredential(search_admin_key))
    # Step 1: Retrieve the documents that match the filter
    query = f"{field_name} eq '{value}'"
    results: Any = search_client.search(select=["id"], filter=query)
    if not results:
        return True

    # Step 2: Extract the primary keys (id) of the documents to be deleted
    keys_to_delete = [doc['id'] for doc in results]

    # Step 3: Delete the documents that meet the criteria
    if keys_to_delete:
        # Use the batch delete API to remove documents
        delete_results: list[IndexingResult] = search_client.delete_documents(documents=[{'id': key} for key in keys_to_delete])  # type: ignore

        logging.getLogger().info(f"Deleted documents with keys: {keys_to_delete}")
        return all(result.succeeded for result in delete_results)
    else:
        return False


def query_by_field(index_name: str, field_name: str, value: Any, top: int = 99999) -> list[dict[Any, Any]]:
    """
    Query documents in the index where a specific field equals the given value.
    :param field_name: The field to filter on.
    :param value: The value to match.
    :param top: Maximum number of results to return.
    :return: List of matching documents.
    """
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")

    search_client = SearchClient(endpoint=search_service_name, index_name=index_name, credential=AzureKeyCredential(search_admin_key))
    filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}"
    results: Any = search_client.search("*", filter=filter_query, top=top)
    return [doc for doc in results]


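# Example (sketch): these helpers build simple OData equality filters. Index and field
# names below are illustrative.
#
#   if documents_with_field_value_exist("docs-index", "filepath", "manual.pdf"):
#       delete_documents_by_field("docs-index", "filepath", "manual.pdf")  # filter: "filepath eq 'manual.pdf'"
#   chunks = query_by_field("docs-index", "filepath", "manual.pdf")

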
def upload_and_ensure(index_name: str, docs: list[dict[Any, Any]], key_field="session_id", delay_seconds: int = 5, max_retries: int = 5) -> bool:
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")

    endpoint = search_service_name
    api_key = search_admin_key

    client = SearchClient(endpoint=endpoint, index_name=index_name, credential=AzureKeyCredential(api_key))

    # Step 1: Batch submit MergeOrUpload actions
    batch = IndexDocumentsBatch()
    batch.add_merge_or_upload_actions(docs)  # type: ignore
    results = client.index_documents(batch)

    # Step 2: Check the status of each document
    failed = [r.key for r in results if not r.succeeded]
    if failed:
        raise Exception(f"Initial submission failed for documents: {failed}")

    return True
    # # Step 3: Delay to wait for background indexing
    # time.sleep(delay_seconds)

    # # Step 4: Verify and retry
    # keys: list[str] = [doc[key_field] for doc in docs]
    # return verify_and_retry(client, keys, docs, key_field, delay_seconds, max_retries)


def verify_and_retry(client: SearchClient, keys: list[str], docs, key_field, delay_seconds, max_retries) -> bool:
    attempt = 0
    session_id = str(uuid.uuid4())

    while attempt <= max_retries:
        missing = find_missing(client, keys, session_id)
        if not missing:
            return True

        attempt += 1
        print(f"Retry {attempt}, missing: {missing}")

        to_retry = [doc for doc in docs if doc[key_field] in missing]

        batch = IndexDocumentsBatch()
        batch.add_merge_or_upload_actions(to_retry)  # type: ignore

        client.index_documents(batch)

        time.sleep(delay_seconds)

    # Final check
    missing = find_missing(client, keys, session_id)
    if missing:
        raise Exception(f"Index verification failed, the following documents were not indexed: {missing}")
    return True


def find_missing(client: SearchClient, keys: list[str], session_id: str) -> list[str]:
    missing: list[str] = []
    for key in keys:
        try:
            results = client.search(filter=f"session_id eq '{key}'", top=1)
            if not any(results):
                missing.append(key)
        except HttpResponseError:
            missing.append(key)
    return missing
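

if __name__ == "__main__":
    # Minimal smoke test (sketch): requires the search_service_name / search_admin_key
    # environment variables used throughout this module. The index and field values are illustrative.
    _demo_index = "docs-index"
    if index_exists(_demo_index):
        for doc in query_by_field(_demo_index, "filepath", "manual.pdf", top=3):
            print(doc.get("id"), doc.get("title"))
    else:
        write_log(f"Demo index '{_demo_index}' not found; nothing to query.")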