""" Azure AI index search service Provides operations for Azure AI Search Index, including creating indexes, uploading documents, checking if an index exists, etc. """ import base64 import json import logging import os import time import uuid from dataclasses import fields from typing import List, Dict, Any, Optional from tqdm import tqdm import uuid6 from azure.core.credentials import AzureKeyCredential from azure.core.exceptions import HttpResponseError from azure.search.documents import SearchClient, IndexDocumentsBatch from azure.search.documents._generated.models import IndexingResult from azure.search.documents.indexes.models import SearchIndex, SimpleField # type: ignore from azure.search.documents.indexes import SearchIndexClient from resilient_http_pool import get_cloud_api_client from entity_models import Document from utils import asdict_with_dynamic, write_log, write_grouped_index_files from di_extractor import retry_get_embedding SUPPORTED_LANGUAGE_CODES = { "ar": "Arabic", "hy": "Armenian", "eu": "Basque", "bg": "Bulgarian", "ca": "Catalan", "zh-Hans": "Chinese Simplified", "zh-Hant": "Chinese Traditional", "cs": "Czech", "da": "Danish", "nl": "Dutch", "en": "English", "fi": "Finnish", "fr": "French", "gl": "Galician", "de": "German", "el": "Greek", "hi": "Hindi", "hu": "Hungarian", "id": "Indonesian (Bahasa)", "ga": "Irish", "it": "Italian", "ja": "Japanese", "ko": "Korean", "lv": "Latvian", "no": "Norwegian", "fa": "Persian", "pl": "Polish", "pt-Br": "Portuguese (Brazil)", "pt-Pt": "Portuguese (Portugal)", "ro": "Romanian", "ru": "Russian", "es": "Spanish", "sv": "Swedish", "th": "Thai", "tr": "Turkish" } def index_init(data_config: dict[str, Any] , search_admin_key:str, search_service_name:str) -> None: index_schemas: dict[str, Any] = data_config.get("index_schemas") if data_config else None # type: ignore admin_key = search_admin_key if search_admin_key else None service_name = search_service_name for schema_name in index_schemas: language = data_config.get("language", None) if language and language not in SUPPORTED_LANGUAGE_CODES: raise Exception(f"ERROR: Ingestion does not support {language} documents. " f"Please use one of {SUPPORTED_LANGUAGE_CODES}." f"Language is set as two letter code for e.g. 'en' for English." 
f"If you donot want to set a language just remove this prompt config or set as None") # Basic index structure initialization create_or_update_search_index(service_name=service_name, index_name=schema_name["index_name"], semantic_config_name=schema_name["semantic_config_name"], vector_config_name=schema_name["vector_config_name"], language=language,admin_key=admin_key, meta_fields = schema_name["fields"]) def create_or_update_search_index(service_name: str|None, index_name: str|None, semantic_config_name: str = "default", vector_config_name: str = "", language:str="", admin_key: str = "", meta_fields: list[str]|None = None): url = f"{service_name}/indexes/{index_name}?api-version=2024-11-01-Preview" headers: dict[str, str] = {"Content-Type": "application/json", "api-key": admin_key} body: dict[str, Any] = { "fields": [ {"name":"session_id","type":"Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": True}, {"name": "id","type": "Edm.String","searchable": True,"key": True,}, {"name": "content","type": "Edm.String","searchable": True,"sortable": False,"facetable": False,"filterable": False,"analyzer": f"{language}.lucene" if language else None,}, {"name": "title","type": "Edm.String","searchable": True,"sortable": True,"facetable": False,"filterable": False,"analyzer": f"{language}.lucene" if language else None,}, {"name": "filepath","type": "Edm.String", "searchable": True,"sortable": True,"facetable": False,"filterable": True}, {"name": "url","type": "Edm.String","searchable": True,"sortable": True,"filterable": True}, { "name": "metadata", "type": "Edm.String", "searchable": True, "filterable": True }, { "name": "image_mapping", "type": "Edm.String", "searchable": False, "sortable": False, "facetable": False, "filterable": True }, { "name": "doc_metadata", "type": "Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": False }, { "name": "document_schema", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "main_title", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "sub_title", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "publisher", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "document_code", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "document_category", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "main_title_sec_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "sub_title_sec_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "primary_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "secondary_language", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "full_headers", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "h1", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "h2", "type": "Edm.String", "searchable": True, "sortable": 
True, "facetable": False, "filterable": True }, { "name": "h3", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "h4", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "h5", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "h6", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "timestamp", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": True, "filterable": True }, { "name": "publish_date", "type": "Edm.String", "searchable": True, "sortable": True, "facetable": False, "filterable": True }, { "name": "description", "type": "Edm.String", "searchable": True, "sortable": False, "facetable": False, "filterable": True } ], "suggesters": [], "scoringProfiles": [], "semantic": { "configurations": [ { "name": semantic_config_name, "prioritizedFields": { "titleField": {"fieldName": "title"}, "prioritizedContentFields": [{"fieldName": "content"}], "prioritizedKeywordsFields": [{"fieldName": "full_headers"}, {"fieldName": "doc_metadata"}], }, } ] }, } if vector_config_name: body["fields"].append({ "name": "contentVector", "type": "Collection(Edm.Single)", "searchable": True, "retrievable": True, "stored": True, "dimensions": int(os.getenv("VECTOR_DIMENSION", "1536")), "vectorSearchProfile": vector_config_name }) body["fields"].append({ "name": "full_metadata_vector", "type": "Collection(Edm.Single)", "searchable": True, "retrievable": True, "stored": True, "dimensions": int(os.getenv("VECTOR_DIMENSION", "1536")), "vectorSearchProfile": vector_config_name }) body["vectorSearch"] = { "algorithms": [ { "name": "my-hnsw-config-1", "kind": "hnsw", "hnswParameters": { "m": 4, "efConstruction": 400, "efSearch": 500, "metric": "cosine" } } ], "profiles": [ { "name": "vectorSearchProfile", "algorithm": "my-hnsw-config-1", # "vectorizer": "azure_vectorizer" } ], } if os.getenv("AOAI_EMBEDDING_ENDPOINT"): body["vectorSearch"]["profiles"][0]["vectorizer"] = "azure_vectorizer" body["vectorSearch"]["vectorizers"] = [ { "name": "azure_vectorizer", "kind": "azureOpenAI", "azureOpenAIParameters": { "resourceUri": os.getenv("AOAI_EMBEDDING_ENDPOINT"), "deploymentId": os.getenv("AOAI_EMBEDDING_DEPLOYMENT"), "apiKey": os.getenv("AOAI_EMBEDDING_KEY"), "modelName": os.getenv("AOAI_EMBEDDING_MODEL") } } ] for field in meta_fields if meta_fields is not None else []: if not any(str(item["name"]) == field for item in body['fields']): sortable:bool = True facetable:bool = True filterable:bool = True if field in ["x_Standard_Range"]: sortable = False facetable = False filterable = False body["fields"].append({ "name": field, "type": "Edm.String", "searchable": True, "sortable": sortable, "facetable": facetable, "filterable": filterable }) client = get_cloud_api_client() response = client.put(url, json=body, headers=headers) if response.status_code == 201: print(f"Created search index {index_name}") elif response.status_code == 204: print(f"Updated existing search index {index_name}") else: raise Exception(f"Failed to create search index. 

def upload_documents_to_index(service_name: str, index_name: str, docs, upload_batch_size: int = 50, admin_key: str | None = None):
    if admin_key is None:
        raise ValueError("admin_key cannot be None")
    to_upload_dicts = []
    for d in docs:
        # Get dynamically added attributes
        if not isinstance(d, dict):
            d = asdict_with_dynamic(d)
        # Add the id and the upload action to each document
        d.update({"@search.action": "upload", "id": d["id"]})
        if "contentVector" in d and d["contentVector"] is None:
            del d["contentVector"]
        if "full_metadata_vector" in d and d["full_metadata_vector"] is None:
            del d["full_metadata_vector"]
        to_upload_dicts.append(d)

    # endpoint = "https://{}.search.windows.net/".format(service_name)
    endpoint: str = service_name
    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=AzureKeyCredential(admin_key))
    # Upload the documents in batches of upload_batch_size
    for i in tqdm(range(0, len(to_upload_dicts), upload_batch_size), desc="Indexing Chunks..."):
        batch = to_upload_dicts[i: i + upload_batch_size]
        results = search_client.upload_documents(documents=batch)
        num_failures = 0
        errors = set()
        for result in results:
            if not result.succeeded:
                print(f"Indexing Failed for {result.key} with ERROR: {result.error_message}")
                num_failures += 1
                errors.add(result.error_message)
        if num_failures > 0:
            raise Exception(f"INDEXING FAILED for {num_failures} documents. Please recreate the index. "
                            f"To debug, please check chunk_size and upload_batch_size.\nError Messages: {list(errors)}")
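
# Usage sketch (hypothetical values): uploading pre-chunked documents. Each doc
# must already carry an "id"; vector fields set to None are stripped before upload.
def _example_upload_documents_to_index() -> None:
    docs = [
        {"id": "chunk-0", "title": "Guide", "content": "First chunk of text.", "filepath": "manuals/guide.pdf"},
        {"id": "chunk-1", "title": "Guide", "content": "Second chunk of text.", "filepath": "manuals/guide.pdf"},
    ]
    upload_documents_to_index(
        service_name=os.getenv("search_service_name", ""),
        index_name="docs-index",
        docs=docs,
        upload_batch_size=50,
        admin_key=os.getenv("search_admin_key", ""),
    )
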

def upload_merge_index(index_config: Any, docs: list[dict[str, Any]], merge_fields: list[dict[str, Any]] | None = None, current_tmp_directory: str = '') -> bool:
    """
    Merge chunk information and upload it to the AI Search index.
    """
    index_name: str = index_config["index_name"]
    embedding_endpoint: str = os.environ.get("embedding_model_endpoint", '')
    embedding_model_key: str = os.environ.get("embedding_model_key", '')
    fields_meta: Any = index_config["fields"] or []
    merge_content_fields: Any = index_config.get("merge_content_fields", [])
    key_fields: Any = index_config.get("key_fields", [])
    all_fields = list(dict.fromkeys(["id"] + fields_meta + merge_content_fields + key_fields
                                    + [f.name for f in fields(Document)]))
    upload_batch_size = index_config.get("upload_batch_size", 1)

    original_to_upload_dicts: list[Any] = []
    for d in docs:
        # Get dynamically added attributes
        if not isinstance(d, dict):
            d = asdict_with_dynamic(d)
        for key in list(d.keys()):
            if key not in all_fields:
                del d[key]
        if ("contentVector" in d) and (d["contentVector"] is None or "contentVector" not in all_fields):
            del d["contentVector"]
        if ("full_metadata_vector" in d) and (d["full_metadata_vector"] is None or "full_metadata_vector" not in all_fields):
            del d["full_metadata_vector"]
        # Default primary key assignment: merge the key_fields content and base64-encode it
        id_value = d["id"] if "id" in d else ""
        if "key_fields" in index_config:
            id_value = '_'.join(str(d[k]) for k in key_fields if k in d)
        if id_value is None or id_value == "":
            continue
        # Select certain fields and concatenate them into another field
        for merge_field in (merge_fields or []):
            d[merge_field["key"]] = json.dumps(
                {field: d[field] for field in merge_field["fields"]
                 if field in d and (value := d[field]) is not None and value != ""},
                ensure_ascii=False)
        d["id"] = base64.urlsafe_b64encode(id_value.encode('utf-8')).decode('utf-8')
        # Add the id and the upload action to each document
        d.update({"@search.action": "upload", "id": d["id"]})
        d.update({"session_id": str(uuid6.uuid7())})
        original_to_upload_dicts.append(d)

    to_upload_dicts = original_to_upload_dicts
    current_object_key = to_upload_dicts[0]["filepath"] if len(to_upload_dicts) > 0 and "filepath" in to_upload_dicts[0] else ''

    # Calculate vector data for the configured vector fields
    for vector_config in index_config.get("vector_fields", []):
        for d in tqdm(to_upload_dicts, desc=f"{current_object_key} vector {vector_config['field']} embedding..."):
            vector_dict = {}
            for field in vector_config["append_fields"]:
                if isinstance(d[field], dict):
                    vector_dict |= d[field]
                elif isinstance(d[field], str):
                    vector_dict[field] = d[field]
            vector_str = str(vector_dict) if vector_dict else ""
            embedding = retry_get_embedding(text=vector_str,
                                            embedding_model_key=embedding_model_key,
                                            embedding_endpoint=embedding_endpoint)
            if embedding:
                d[vector_config["field"]] = embedding

    # Group the documents by their filepath field and write each group to a JSON file under the .index directory
    write_grouped_index_files(to_upload_dicts, index_name=index_name, base_directory=current_tmp_directory)

    results: list[bool] = []
    # Upload the documents in batches of upload_batch_size
    for i in tqdm(range(0, len(to_upload_dicts), upload_batch_size), desc=f"Indexing {index_name} Chunks..."):
        batch = to_upload_dicts[i: i + upload_batch_size]
        results.append(upload_and_ensure(index_name=index_name, docs=batch, key_field="session_id"))
    return all(results)


def merge_dicts(data_list, key_fields, merge_fields, separator='\n'):
    """
    Merge a dictionary list based on the specified fields.

    Arguments:
    data_list -- Original dictionary list
    key_fields -- Fields used for deduplication (e.g., ['title', 'filepath'])
    merge_fields -- Fields to be merged (e.g., ['content'])
    separator -- Separator used when merging fields (default is newline)

    Returns:
    New dictionary list after merging
    """
    merged_dict = {}
    for item in data_list:
        # Create a unique key: a tuple of all key fields
        key = tuple(item.get(field) for field in key_fields)
        if key in merged_dict:
            # Merge fields: append the new value to the existing one
            existing = merged_dict[key]
            for field in merge_fields:
                existing[field] = separator.join([
                    existing.get(field, ''),
                    item.get(field, '')
                ]).strip(separator)
        else:
            # Create a new record
            merged_dict[key] = {
                **item,  # Copy original fields
                # Pre-initialize merged fields
                **{field: item.get(field, '') for field in merge_fields}
            }
    return list(merged_dict.values())
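
# Worked sketch of merge_dicts (hypothetical data): two chunks of the same file
# are deduplicated on ['title', 'filepath'] and their 'content' is concatenated.
def _example_merge_dicts() -> None:
    chunks = [
        {"title": "Guide", "filepath": "manuals/guide.pdf", "content": "Part one."},
        {"title": "Guide", "filepath": "manuals/guide.pdf", "content": "Part two."},
    ]
    merged = merge_dicts(chunks, key_fields=["title", "filepath"], merge_fields=["content"])
    # merged == [{"title": "Guide", "filepath": "manuals/guide.pdf", "content": "Part one.\nPart two."}]
    assert len(merged) == 1
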

def validate_index(service_name: str, index_name: str, admin_key: str | None = None):
    api_version = "2024-11-01-Preview"
    headers = {"Content-Type": "application/json", "api-key": admin_key}
    params = {"api-version": api_version}
    url = f"{service_name}/indexes/{index_name}/stats"
    client = get_cloud_api_client()
    max_retries = 10
    for retry_count in range(max_retries + 1):
        response = client.get(url, headers=headers, params=params)
        if response.status_code == 200:
            response_data = response.json()
            num_chunks = response_data['documentCount']
            if num_chunks == 0 and retry_count < max_retries:
                print("Index is empty. Waiting 20 seconds to check again...")
                time.sleep(20)
            elif num_chunks == 0 and retry_count == max_retries:
                print("Index is empty. Please investigate and re-index.")
            else:
                print(f"The index contains {num_chunks} chunks.")
                average_chunk_size = response_data['storageSize'] / num_chunks
                print(f"The average chunk size of the index is {average_chunk_size} bytes.")
                break
        else:
            if response.status_code == 404:
                print("The index does not seem to exist. Please make sure the index was created correctly, "
                      "and that you are using the correct service and index names.")
            elif response.status_code == 403:
                print("Authentication failure: make sure you are using the correct key.")
            else:
                print(f"Request failed. Please investigate. Status code: {response.status_code}")
            break


def index_exists(index_name: str) -> bool:
    try:
        search_service_name = os.getenv("search_service_name", "")
        search_admin_key = os.getenv("search_admin_key", "")
        endpoint = search_service_name
        credential = AzureKeyCredential(search_admin_key)
        index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
        index_client.get_index(index_name)
        return True
    except Exception as e:
        write_log(f"Index '{index_name}' does not exist: {e}")
        return False


def create_index(index_name: str, index_fields: list[dict[str, Any]], suggesters: Optional[list[dict[str, Any]]] = None) -> None:
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")
    endpoint = search_service_name
    credential = AzureKeyCredential(search_admin_key)
    index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
    if index_exists(index_name=index_name):
        write_log(f"Index '{index_name}' already exists.")
        return
    search_fields = [SimpleField(**field) for field in index_fields]
    index = SearchIndex(name=index_name, fields=search_fields, suggesters=suggesters or [])
    index_client.create_index(index)
    write_log(f"Index '{index_name}' created.")


def upload_documents(index_name: str, documents: List[Dict[str, Any]]) -> None:
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")
    endpoint = search_service_name
    credential = AzureKeyCredential(search_admin_key)
    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
    batch = IndexDocumentsBatch()
    batch.add_merge_or_upload_actions(documents)  # type: ignore
    results = search_client.index_documents(batch)
    write_log(f"Uploaded {len(documents)} documents to index '{index_name}'. Result: {results}")


def delete_index(index_name: str) -> None:
    search_service_name = os.getenv("search_service_name", "")
    search_admin_key = os.getenv("search_admin_key", "")
    endpoint = search_service_name
    credential = AzureKeyCredential(search_admin_key)
    index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
    if index_exists(index_name=index_name):
        index_client.delete_index(index_name)
        write_log(f"Index '{index_name}' deleted.")
    else:
        write_log(f"Index '{index_name}' does not exist.")


def search(index_name, search_text: str, **kwargs) -> Any:
    endpoint = os.getenv("search_service_name", "")
    credential = AzureKeyCredential(os.getenv("search_admin_key", ""))
    index_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
    return index_client.search(search_text, **kwargs)


def documents_with_field_value_exist(index_name: str, field_name: str, value: Any) -> bool:
    """
    Check if there are documents in the index where a specific field equals the given value.
    """
    endpoint = os.getenv("search_service_name", "")
    credential = AzureKeyCredential(os.getenv("search_admin_key", ""))
    index_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
    filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}"
    results: Any = index_client.search("*", filter=filter_query, top=1)
    for _ in results:
        return True
    return False
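
# Usage sketch (hypothetical index and field names): skip re-ingesting a file if
# its chunks are already present, using the OData equality filter built above.
def _example_skip_if_already_indexed() -> None:
    already_indexed = documents_with_field_value_exist("docs-index", "filepath", "manuals/guide.pdf")
    if already_indexed:
        write_log("Skipping ingestion: documents for this filepath already exist.")
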
""" endpoint = os.getenv("search_service_name", "") credential = AzureKeyCredential(os.getenv("search_admin_key", "")) index_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential) filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}" results: Any = index_client.search("*", filter=filter_query, top=1) for _ in results: return True return False def delete_documents_by_field(index_name:str,field_name: str, value: Any) -> bool: """ Delete all documents where the specified field equals the given value. """ search_service_name = os.getenv("search_service_name", "") search_admin_key = os.getenv("search_admin_key", "") search_client = SearchClient(endpoint=search_service_name, index_name=index_name, credential=AzureKeyCredential(search_admin_key)) # Step 1: Retrieve documents that meet the criteria (here looking for documents with status field as "inactive") query = f"{field_name} eq '{value}'" results: Any = search_client.search(select=["id"], filter=query) if not results: return True # Step 2: Extract the primary keys (id) of the documents to be deleted keys_to_delete = [doc['id'] for doc in results] # Step 3: Delete the documents that meet the criteria if keys_to_delete: # Use batch delete API to remove documents delete_results:list[IndexingResult] = search_client.delete_documents(documents=[{'id': key} for key in keys_to_delete])#type: ignore logging.getLogger().info(f"Deleted documents with keys: {keys_to_delete}") return all(result.succeeded for result in delete_results) else: return False def query_by_field( index_name: str, field_name: str, value: Any, top: int = 99999) -> list[dict[Any,Any]]: """ Query documents in the index where a specific field equals the given value. :param field_name: The field to filter on. :param value: The value to match. :param top: Maximum number of results to return. :return: List of matching documents. 
""" search_service_name = os.getenv("search_service_name", "") search_admin_key = os.getenv("search_admin_key", "") search_client = SearchClient(endpoint = search_service_name, index_name=index_name,credential=AzureKeyCredential(search_admin_key)) filter_query = f"{field_name} eq '{value}'" if isinstance(value, str) else f"{field_name} eq {value}" results:Any = search_client.search("*", filter=filter_query, top=top) return [doc for doc in results] def upload_and_ensure(index_name:str, docs: list[dict[Any, Any]], key_field="session_id", delay_seconds:int=5, max_retries:int=5) -> bool: search_service_name = os.getenv("search_service_name", "") search_admin_key = os.getenv("search_admin_key", "") endpoint = search_service_name api_key = search_admin_key client = SearchClient(endpoint=endpoint, index_name=index_name, credential=AzureKeyCredential(api_key)) # Step 1: Batch submit MergeOrUpload batch = IndexDocumentsBatch() batch.add_merge_or_upload_actions(docs) # type: ignore results = client.index_documents(batch) # Step 2: Check status of each document failed = [r.key for r in results if not r.succeeded] if failed: raise Exception(f"Initial submission failed for documents: {failed}") return True # # Step 3: Delay waiting for background index # time.sleep(delay_seconds) # # Step 4: Verify and retry # keys: list[str] = [doc[key_field] for doc in docs] # return verify_and_retry(client, keys, docs, key_field, delay_seconds, max_retries) def verify_and_retry(client: SearchClient, keys: list[str], docs, key_field, delay_seconds, max_retries) -> bool: attempt = 0 session_id = str(uuid.uuid4()) while attempt <= max_retries: missing = find_missing(client, keys, session_id) if not missing: return True attempt += 1 print(f"Retry {attempt}, missing: {missing}") to_retry = [doc for doc in docs if doc[key_field] in missing] batch = IndexDocumentsBatch() actions = [batch.add_merge_or_upload_actions([doc]) for doc in to_retry] client.index_documents(batch) time.sleep(delay_seconds) # Final check missing = find_missing(client, keys, session_id) if missing: raise Exception(f"Index verification failed, the following documents were not indexed: {missing}") return True def find_missing(client: SearchClient, keys: list[str], session_id: str) -> list[str]: missing: list[str] = [] for key in keys: try: results = client.search(filter=f"session_id eq '{key}'", top=1) if not any(results): missing.append(key) except HttpResponseError: missing.append(key) return missing