diff --git a/.gitignore b/.gitignore
index 42a5b09..032e7dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -164,3 +164,4 @@ cython_debug/
# VSCode
.vscode
.azure
+test_output/
\ No newline at end of file
diff --git a/data/face/enrollment_data/Jordan/Family1-Daughter3.jpg b/data/face/enrollment_data/Mary/Family1-Daughter3.jpg
similarity index 100%
rename from data/face/enrollment_data/Jordan/Family1-Daughter3.jpg
rename to data/face/enrollment_data/Mary/Family1-Daughter3.jpg
diff --git a/data/face/enrollment_data/Bill/Family1-Dad3.jpg b/data/face/new_face_image.jpg
similarity index 100%
rename from data/face/enrollment_data/Bill/Family1-Dad3.jpg
rename to data/face/new_face_image.jpg
diff --git a/docs/set_env_for_training_data_and_reference_doc.md b/docs/set_env_for_training_data_and_reference_doc.md
index cb0d584..706c613 100644
--- a/docs/set_env_for_training_data_and_reference_doc.md
+++ b/docs/set_env_for_training_data_and_reference_doc.md
@@ -22,14 +22,14 @@ The folders [document_training](../data/document_training/) and [field_extractio
- Note: **Write** permission is required for uploading, modifying, or appending blobs.
- Click the **Create** button.
- - **Copy the SAS URL:** After creating the SAS, click **Copy** to get the URL with the token. This URL will be used as the value for either **TRAINING_DATA_SAS_URL** or **REFERENCE_DOC_SAS_URL** when running the sample code.
+ - **Copy the SAS URL:** After creating the SAS, click **Copy** to get the URL with the token. This URL will be used as the value for either **training_data_sas_url** or **REFERENCE_DOC_SAS_URL** when running the sample code.
- Set the following variables in the [.env](../notebooks/.env) file:
- > **Note:** The value for **REFERENCE_DOC_SAS_URL** can be the same as **TRAINING_DATA_SAS_URL** to reuse the same blob container.
- - For [analyzer_training](../notebooks/analyzer_training.ipynb): Add the SAS URL as the value of **TRAINING_DATA_SAS_URL**.
+ > **Note:** The value for **REFERENCE_DOC_SAS_URL** can be the same as **training_data_sas_url** to reuse the same blob container.
+ - For [analyzer_training](../notebooks/analyzer_training.ipynb): Add the SAS URL as the value of **training_data_sas_url**.
```env
- TRAINING_DATA_SAS_URL=
+ training_data_sas_url=
```
- For [field_extraction_pro_mode](../notebooks/field_extraction_pro_mode.ipynb): Add the SAS URL as the value of **REFERENCE_DOC_SAS_URL**.
```env
@@ -53,9 +53,9 @@ The folders [document_training](../data/document_training/) and [field_extractio
5. **Set Folder Prefixes in the `.env` File:**
Depending on the sample you will run, set the required environment variables in the [.env](../notebooks/.env) file.
- - For [analyzer_training](../notebooks/analyzer_training.ipynb): Add a prefix for **TRAINING_DATA_PATH**. You can choose any folder name within the blob container. For example, use `training_files`.
+ - For [analyzer_training](../notebooks/analyzer_training.ipynb): Add a prefix for **training_data_path**. You can choose any folder name within the blob container. For example, use `training_files`.
```env
- TRAINING_DATA_PATH=
+ training_data_path=
```
- For [field_extraction_pro_mode](../notebooks/field_extraction_pro_mode.ipynb): Add a prefix for **REFERENCE_DOC_PATH**. You can choose any folder name within the blob container. For example, use `reference_docs`.
```env
diff --git a/notebooks/.env.sample b/notebooks/.env.sample
index 05c1000..6e23e31 100644
--- a/notebooks/.env.sample
+++ b/notebooks/.env.sample
@@ -1 +1,37 @@
-AZURE_AI_ENDPOINT=
\ No newline at end of file
+# Azure Content Understanding Service Configuration
+# Copy this file to /.env and update with your actual values
+
+# Your Azure Content Understanding service endpoint
+# Example: https://your-resource-name.services.ai.azure.com/
+# If you need help to create one, please see the Prerequisites section in:
+# https://learn.microsoft.com/en-us/azure/ai-services/content-understanding/quickstart/use-rest-api?tabs=document#prerequisites
+# As of 2025/05, 2025-05-01-preview is only available in the regions documented in
+# Content Understanding region and language support (https://learn.microsoft.com/en-us/azure/ai-services/content-understanding/language-region-support).
+
+# Azure Content Understanding Test Configuration
+
+# Required for Content Understanding SDK and testing
+AZURE_CONTENT_UNDERSTANDING_ENDPOINT=https://your-resource-name.services.ai.azure.com/
+
+# Authentication Options:
+# Option 1: Use Azure Key (FOR TESTING ONLY - Less secure)
+# Set this value if you want to use key-based authentication
+# WARNING: Keys are less secure and should only be used for testing/development
+# Leave empty to use DefaultAzureCredential (recommended)
+AZURE_CONTENT_UNDERSTANDING_KEY=
+
+# Option 2: Use DefaultAzureCredential (RECOMMENDED for production and development)
+# If AZURE_CONTENT_UNDERSTANDING_KEY is empty, the script will use DefaultAzureCredential
+#
+# Most common development scenario:
+# 1. Install Azure CLI: https://docs.microsoft.com/en-us/cli/azure/install-azure-cli
+# 2. Login: az login
+# 3. Run the script (no additional configuration needed)
+#
+# This also supports:
+# - Environment variables (AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
+# - Managed Identity (for Azure-hosted applications)
+# - Visual Studio Code authentication
+# - Azure PowerShell authentication
+# For more info: https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme#defaultazurecredential
+
diff --git a/notebooks/analyzer_training.ipynb b/notebooks/analyzer_training.ipynb
index 64cbc45..a3ad31e 100644
--- a/notebooks/analyzer_training.ipynb
+++ b/notebooks/analyzer_training.ipynb
@@ -23,9 +23,9 @@
"## Prerequisites\n",
"1. Ensure your Azure AI service is configured by following the [configuration steps](../README.md#configure-azure-ai-service-resource).\n",
"2. Set environment variables related to training data by following the steps in [Set env for training data](../docs/set_env_for_training_data_and_reference_doc.md) and adding them to the [.env](./.env) file.\n",
- " - You can either set `TRAINING_DATA_SAS_URL` directly with the SAS URL for your Azure Blob container,\n",
+ " - You can either set `training_data_sas_url` directly with the SAS URL for your Azure Blob container,\n",
" - Or set both `TRAINING_DATA_STORAGE_ACCOUNT_NAME` and `TRAINING_DATA_CONTAINER_NAME` to generate the SAS URL automatically during later steps.\n",
- " - Also set `TRAINING_DATA_PATH` to specify the folder path within the container where the training data will be uploaded.\n",
+ " - Also set `training_data_path` to specify the folder path within the container where the training data will be uploaded.\n",
"3. Install the packages required to run the sample:\n"
]
},
@@ -57,7 +57,6 @@
"metadata": {},
"outputs": [],
"source": [
- "analyzer_template = \"../analyzer_templates/receipt.json\"\n",
"training_docs_folder = \"../data/document_training\""
]
},
@@ -88,30 +87,45 @@
"import json\n",
"import os\n",
"import sys\n",
- "from pathlib import Path\n",
- "from dotenv import find_dotenv, load_dotenv\n",
- "from azure.identity import DefaultAzureCredential, get_bearer_token_provider\n",
+ "from datetime import datetime\n",
+ "import uuid\n",
+ "from dotenv import load_dotenv\n",
+ "from azure.storage.blob import ContainerSasPermissions\n",
+ "from azure.core.credentials import AzureKeyCredential\n",
+ "from azure.identity import DefaultAzureCredential\n",
+ "from azure.ai.contentunderstanding.aio import ContentUnderstandingClient\n",
+ "from azure.ai.contentunderstanding.models import (\n",
+ " ContentAnalyzer,\n",
+ " FieldSchema,\n",
+ " FieldDefinition,\n",
+ " FieldType,\n",
+ " GenerationMethod,\n",
+ " AnalysisMode,\n",
+ " ProcessingLocation,\n",
+ ")\n",
"\n",
- "# Import utility package from the Python samples root directory\n",
- "parent_dir = Path(Path.cwd()).parent\n",
- "sys.path.append(str(parent_dir))\n",
- "from python.content_understanding_client import AzureContentUnderstandingClient\n",
+ "# Add the parent directory to the Python path to import the sample_helper module\n",
+ "sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'python'))\n",
+ "from extension.document_processor import DocumentProcessor\n",
+ "from extension.sample_helper import extract_operation_id_from_poller, PollerType, save_json_to_file\n",
"\n",
- "load_dotenv(find_dotenv())\n",
+ "load_dotenv()\n",
"logging.basicConfig(level=logging.INFO)\n",
"\n",
- "credential = DefaultAzureCredential()\n",
- "token_provider = get_bearer_token_provider(credential, \"https://cognitiveservices.azure.com/.default\")\n",
- "\n",
- "client = AzureContentUnderstandingClient(\n",
- " endpoint=os.getenv(\"AZURE_AI_ENDPOINT\"),\n",
- " api_version=os.getenv(\"AZURE_AI_API_VERSION\", \"2025-05-01-preview\"),\n",
- " # IMPORTANT: Comment out token_provider if using subscription key\n",
- " token_provider=token_provider,\n",
- " # IMPORTANT: Uncomment this if using subscription key\n",
- " # subscription_key=os.getenv(\"AZURE_AI_API_KEY\"),\n",
- " x_ms_useragent=\"azure-ai-content-understanding-python/analyzer_training\", # This header is used for sample usage telemetry; please comment out this line if you want to opt out.\n",
- ")"
+ "endpoint = os.environ.get(\"AZURE_CONTENT_UNDERSTANDING_ENDPOINT\")\n",
+ "# Return AzureKeyCredential if AZURE_CONTENT_UNDERSTANDING_KEY is set, otherwise DefaultAzureCredential\n",
+ "key = os.getenv(\"AZURE_CONTENT_UNDERSTANDING_KEY\")\n",
+ "credential = AzureKeyCredential(key) if key else DefaultAzureCredential()\n",
+ "# Create the ContentUnderstandingClient\n",
+ "client = ContentUnderstandingClient(endpoint=endpoint, credential=credential)\n",
+ "print(\"β
ContentUnderstandingClient created successfully\")\n",
+ "\n",
+ "try:\n",
+ " processor = DocumentProcessor(client)\n",
+ " print(\"β
DocumentProcessor created successfully\")\n",
+ "except Exception as e:\n",
+ " print(f\"β Failed to create DocumentProcessor: {e}\")\n",
+ " raise"
]
},
{
@@ -120,9 +134,9 @@
"source": [
"## Prepare Labeled Data\n",
"In this step, we will:\n",
- "- Use the environment variables `TRAINING_DATA_PATH` and SAS URL related variables set in the Prerequisites step.\n",
- "- Attempt to get the SAS URL from the environment variable `TRAINING_DATA_SAS_URL`.\n",
- "- If `TRAINING_DATA_SAS_URL` is not set, try generating it automatically using `TRAINING_DATA_STORAGE_ACCOUNT_NAME` and `TRAINING_DATA_CONTAINER_NAME` environment variables.\n",
+ "- Use the environment variables `training_data_path` and SAS URL related variables set in the Prerequisites step.\n",
+ "- Attempt to get the SAS URL from the environment variable `training_data_sas_url`.\n",
+ "- If `training_data_sas_url` is not set, try generating it automatically using `TRAINING_DATA_STORAGE_ACCOUNT_NAME` and `TRAINING_DATA_CONTAINER_NAME` environment variables.\n",
"- Verify that each document file in the local folder has corresponding `.labels.json` and `.result.json` files.\n",
"- Upload these files to the Azure Blob storage container specified by the environment variables."
]
@@ -133,26 +147,27 @@
"metadata": {},
"outputs": [],
"source": [
- "training_data_sas_url = os.getenv(\"TRAINING_DATA_SAS_URL\")\n",
+ "# Load reference storage configuration from environment\n",
+ "training_data_path = os.getenv(\"training_data_path\") or f\"training_data_{uuid.uuid4().hex[:8]}\"\n",
+ "training_data_sas_url = os.getenv(\"training_data_sas_url\")\n",
+ "\n",
+ "if not training_data_path.endswith(\"/\"):\n",
+ " training_data_path += \"/\"\n",
+ "\n",
"if not training_data_sas_url:\n",
" TRAINING_DATA_STORAGE_ACCOUNT_NAME = os.getenv(\"TRAINING_DATA_STORAGE_ACCOUNT_NAME\")\n",
" TRAINING_DATA_CONTAINER_NAME = os.getenv(\"TRAINING_DATA_CONTAINER_NAME\")\n",
- " if not TRAINING_DATA_STORAGE_ACCOUNT_NAME and not training_data_sas_url:\n",
- " raise ValueError(\n",
- " \"Please set either TRAINING_DATA_SAS_URL or both TRAINING_DATA_STORAGE_ACCOUNT_NAME and TRAINING_DATA_CONTAINER_NAME environment variables.\"\n",
+ "\n",
+ " if TRAINING_DATA_STORAGE_ACCOUNT_NAME and TRAINING_DATA_CONTAINER_NAME:\n",
+ " # We require \"Write\" permission to upload, modify, or append blobs\n",
+ " training_data_sas_url = processor.generate_container_sas_url(\n",
+ " account_name=TRAINING_DATA_STORAGE_ACCOUNT_NAME,\n",
+ " container_name=TRAINING_DATA_CONTAINER_NAME,\n",
+ " permissions=ContainerSasPermissions(read=True, write=True, list=True),\n",
+ " expiry_hours=1,\n",
" )\n",
- " from azure.storage.blob import ContainerSasPermissions\n",
- " # Requires \"Write\" (critical for upload/modify/append) along with \"Read\" and \"List\" for viewing/listing blobs.\n",
- " training_data_sas_url = AzureContentUnderstandingClient.generate_temp_container_sas_url(\n",
- " account_name=TRAINING_DATA_STORAGE_ACCOUNT_NAME,\n",
- " container_name=TRAINING_DATA_CONTAINER_NAME,\n",
- " permissions=ContainerSasPermissions(read=True, write=True, list=True),\n",
- " expiry_hours=1,\n",
- " )\n",
- "\n",
- "training_data_path = os.getenv(\"TRAINING_DATA_PATH\")\n",
- "\n",
- "await client.generate_training_data_on_blob(training_docs_folder, training_data_sas_url, training_data_path)"
+ "\n",
+ "await processor.generate_training_data_on_blob(training_docs_folder, training_data_sas_url, training_data_path)"
]
},
{
@@ -162,7 +177,7 @@
"## Create Analyzer with Defined Schema\n",
"Before creating the analyzer, fill in the constant `ANALYZER_ID` with a relevant name for your task. In this example, we generate a unique suffix so that this cell can be run multiple times to create different analyzers.\n",
"\n",
- "We use **training_data_sas_url** and **training_data_path** as set in the [.env](./.env) file and used in the previous step."
+ "We use **TRAINING_DATA_SAS_URL** and **TRAINING_DATA_PATH** as set in the [.env](./.env) file and used in the previous step."
]
},
{
@@ -171,24 +186,78 @@
"metadata": {},
"outputs": [],
"source": [
- "import uuid\n",
- "CUSTOM_ANALYZER_ID = \"train-sample-\" + str(uuid.uuid4())\n",
+ "analyzer_id = f\"analyzer-training-sample-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}\"\n",
+ "\n",
+ "content_analyzer = ContentAnalyzer(\n",
+ " base_analyzer_id=\"prebuilt-documentAnalyzer\",\n",
+ " description=\"Extract useful information from receipt\",\n",
+ " field_schema=FieldSchema(\n",
+ " name=\"receipt schema\",\n",
+ " description=\"Schema for receipt\",\n",
+ " fields={\n",
+ " \"MerchantName\": FieldDefinition(\n",
+ " type=FieldType.STRING,\n",
+ " method=GenerationMethod.EXTRACT,\n",
+ " description=\"\"\n",
+ " ),\n",
+ " \"Items\": FieldDefinition(\n",
+ " type=FieldType.ARRAY,\n",
+ " method=GenerationMethod.GENERATE,\n",
+ " description=\"\",\n",
+ " items_property={\n",
+ " \"type\": \"object\",\n",
+ " \"method\": \"extract\",\n",
+ " \"properties\": {\n",
+ " \"Quantity\": {\n",
+ " \"type\": \"string\",\n",
+ " \"method\": \"extract\",\n",
+ " \"description\": \"\"\n",
+ " },\n",
+ " \"Name\": {\n",
+ " \"type\": \"string\",\n",
+ " \"method\": \"extract\",\n",
+ " \"description\": \"\"\n",
+ " },\n",
+ " \"Price\": {\n",
+ " \"type\": \"string\",\n",
+ " \"method\": \"extract\",\n",
+ " \"description\": \"\"\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ),\n",
+ " \"TotalPrice\": FieldDefinition(\n",
+ " type=FieldType.STRING,\n",
+ " method=GenerationMethod.EXTRACT,\n",
+ " description=\"\"\n",
+ " )\n",
+ " }\n",
+ " ),\n",
+ " mode=AnalysisMode.STANDARD,\n",
+ " processing_location=ProcessingLocation.GEOGRAPHY,\n",
+ " tags={\"demo_type\": \"get_result\"},\n",
+ " training_data={\n",
+ " \"kind\": \"blob\",\n",
+ " \"containerUrl\": training_data_sas_url,\n",
+ " \"prefix\": training_data_path\n",
+ " },\n",
+ ")\n",
+ "print(f\"π§ Creating custom analyzer '{analyzer_id}'...\")\n",
+ "poller = await client.content_analyzers.begin_create_or_replace(\n",
+ " analyzer_id=analyzer_id,\n",
+ " resource=content_analyzer,\n",
+ ")\n",
"\n",
- "response = client.begin_create_analyzer(\n",
- " CUSTOM_ANALYZER_ID,\n",
- " analyzer_template_path=analyzer_template,\n",
- " training_storage_container_sas_url=training_data_sas_url,\n",
- " training_storage_container_path_prefix=training_data_path,\n",
+ "# Extract operation ID from the poller\n",
+ "operation_id = extract_operation_id_from_poller(\n",
+ " poller, PollerType.ANALYZER_CREATION\n",
")\n",
- "result = client.poll_result(response)\n",
- "if result is not None and \"status\" in result and result[\"status\"] == \"Succeeded\":\n",
- " logging.info(f\"Analyzer details for {result['result']['analyzerId']}\")\n",
- " logging.info(json.dumps(result, indent=2))\n",
- "else:\n",
- " logging.warning(\n",
- " \"An issue was encountered when trying to create the analyzer. \"\n",
- " \"Please double-check your deployment and configurations for potential problems.\"\n",
- " )"
+ "print(f\"π Extracted creation operation ID: {operation_id}\")\n",
+ "\n",
+ "# Wait for the analyzer to be created\n",
+ "print(f\"β³ Waiting for analyzer creation to complete...\")\n",
+ "await poller.result()\n",
+ "print(f\"β
Analyzer '{analyzer_id}' created successfully!\")"
]
},
{
@@ -205,10 +274,53 @@
"metadata": {},
"outputs": [],
"source": [
- "response = client.begin_analyze(CUSTOM_ANALYZER_ID, file_location='../data/receipt.png')\n",
- "result_json = client.poll_result(response)\n",
+ "file_path = \"../data/receipt.png\"\n",
+ "print(f\"π Reading document file: {file_path}\")\n",
+ "with open(file_path, \"rb\") as f:\n",
+ " data_content = f.read()\n",
+ "\n",
+ "# Begin document analysis operation\n",
+ "print(f\"π Starting document analysis with analyzer '{analyzer_id}'...\")\n",
+ "analysis_poller = await client.content_analyzers.begin_analyze_binary(\n",
+ " analyzer_id=analyzer_id, \n",
+ " input=data_content,\n",
+ " content_type=\"application/octet-stream\")\n",
+ "\n",
+ "# Wait for analysis completion\n",
+ "print(f\"β³ Waiting for document analysis to complete...\")\n",
+ "analysis_result = await analysis_poller.result()\n",
+ "print(f\"β
Document analysis completed successfully!\")\n",
+ "\n",
+ " # Extract operation ID for get_result\n",
+ "analysis_operation_id = extract_operation_id_from_poller(\n",
+ " analysis_poller, PollerType.ANALYZE_CALL\n",
+ ")\n",
+ "print(f\"π Extracted analysis operation ID: {analysis_operation_id}\")\n",
+ "\n",
+ "# Get the analysis result using the operation ID\n",
+ "print(\n",
+ " f\"π Getting analysis result using operation ID '{analysis_operation_id}'...\"\n",
+ ")\n",
+ "operation_status = await client.content_analyzers.get_result(\n",
+ " operation_id=analysis_operation_id,\n",
+ ")\n",
+ "\n",
+ "print(f\"β
Analysis result retrieved successfully!\")\n",
+ "print(f\" Operation ID: {operation_status.id}\")\n",
+ "print(f\" Status: {operation_status.status}\")\n",
"\n",
- "logging.info(json.dumps(result_json, indent=2))"
+ "# The actual analysis result is in operation_status.result\n",
+ "operation_result = operation_status.result\n",
+ "if operation_result is None:\n",
+ " print(\"β οΈ No analysis result available\")\n",
+ "\n",
+ "print(f\"π Analysis Result: {json.dumps(operation_result.as_dict())}\")\n",
+ "\n",
+ "# Save the analysis result to a file\n",
+ "saved_file_path = save_json_to_file(\n",
+ " result=operation_result.as_dict(),\n",
+ " filename_prefix=\"analyzer_training_get_result\",\n",
+ ")"
]
},
{
@@ -225,13 +337,15 @@
"metadata": {},
"outputs": [],
"source": [
- "client.delete_analyzer(CUSTOM_ANALYZER_ID)"
+ "print(f\"ποΈ Deleting analyzer '{analyzer_id}' (demo cleanup)...\")\n",
+ "await client.content_analyzers.delete(analyzer_id=analyzer_id)\n",
+ "print(f\"β
Analyzer '{analyzer_id}' deleted successfully!\")"
]
}
],
"metadata": {
"kernelspec": {
- "display_name": "Python 3",
+ "display_name": "py312",
"language": "python",
"name": "python3"
},
@@ -245,7 +359,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.11.12"
+ "version": "3.12.3"
}
},
"nbformat": 4,
diff --git a/notebooks/build_person_directory.ipynb b/notebooks/build_person_directory.ipynb
index 4840de4..8df83f2 100644
--- a/notebooks/build_person_directory.ipynb
+++ b/notebooks/build_person_directory.ipynb
@@ -47,31 +47,30 @@
"source": [
"import logging\n",
"import os\n",
+ "import uuid\n",
"import sys\n",
- "from pathlib import Path\n",
- "from dotenv import find_dotenv, load_dotenv\n",
- "from azure.identity import DefaultAzureCredential, get_bearer_token_provider\n",
+ "from dotenv import load_dotenv\n",
+ "from azure.core.credentials import AzureKeyCredential\n",
+ "from azure.identity.aio import DefaultAzureCredential\n",
+ "from azure.ai.contentunderstanding.aio import ContentUnderstandingClient\n",
+ "from azure.ai.contentunderstanding.models import PersonDirectory, FaceSource, PersonDirectoryPerson\n",
+ "sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'python'))\n",
+ "from extension.sample_helper import (\n",
+ " read_image_to_base64\n",
+ ")\n",
"\n",
- "# import utility package from python samples root directory\n",
- "parent_dir = Path.cwd().parent\n",
- "sys.path.append(str(parent_dir))\n",
- "from python.content_understanding_face_client import AzureContentUnderstandingFaceClient\n",
+ "# Add the parent directory to the Python path to import the sample_helper module\n",
+ "sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'python'))\n",
"\n",
- "load_dotenv(find_dotenv())\n",
+ "load_dotenv()\n",
"logging.basicConfig(level=logging.INFO)\n",
"\n",
- "credential = DefaultAzureCredential()\n",
- "token_provider = get_bearer_token_provider(credential, \"https://cognitiveservices.azure.com/.default\")\n",
- "\n",
- "client = AzureContentUnderstandingFaceClient(\n",
- " endpoint=os.getenv(\"AZURE_AI_ENDPOINT\"),\n",
- " api_version=os.getenv(\"AZURE_AI_API_VERSION\", \"2025-05-01-preview\"),\n",
- " # IMPORTANT: Comment out token_provider if using subscription key\n",
- " token_provider=token_provider,\n",
- " # IMPORTANT: Uncomment this if using subscription key\n",
- " # subscription_key=os.getenv(\"AZURE_AI_API_KEY\"),\n",
- " x_ms_useragent=\"azure-ai-content-understanding-python/build_person_directory\", # This header is used for sample usage telemetry, please comment out this line if you want to opt out.\n",
- ")"
+ "endpoint = os.environ.get(\"AZURE_CONTENT_UNDERSTANDING_ENDPOINT\")\n",
+ "# Return AzureKeyCredential if AZURE_CONTENT_UNDERSTANDING_KEY is set, otherwise DefaultAzureCredential\n",
+ "key = os.getenv(\"AZURE_CONTENT_UNDERSTANDING_KEY\")\n",
+ "credential = AzureKeyCredential(key) if key else DefaultAzureCredential()\n",
+ "# Create the ContentUnderstandingClient\n",
+ "client = ContentUnderstandingClient(endpoint=endpoint, credential=credential)"
]
},
{
@@ -89,38 +88,68 @@
"metadata": {},
"outputs": [],
"source": [
- "import os\n",
- "import uuid\n",
"folder_path = \"../data/face/enrollment_data\" # Replace with the path to your folder containing subfolders of images\n",
"\n",
"# Create a person directory\n",
"person_directory_id = f\"person_directory_id_{uuid.uuid4().hex[:8]}\"\n",
- "client.create_person_directory(person_directory_id)\n",
+ "\n",
+ "# Create a person directory first\n",
+ "print(f\"π§ Creating person directory '{person_directory_id}'...\")\n",
+ "\n",
+ "person_directory = PersonDirectory(\n",
+ " description=f\"Sample person directory for delete person demo: {person_directory_id}\",\n",
+ " tags={\"demo_type\": \"delete_person\"},\n",
+ " )\n",
+ "person_directory = await client.person_directories.create(person_directory_id, resource=person_directory)\n",
"logging.info(f\"Created person directory with ID: {person_directory_id}\")\n",
"\n",
+ "# Initialize persons list\n",
+ "persons: list = []\n",
+ "\n",
"# Iterate through all subfolders in the folder_path\n",
"for subfolder_name in os.listdir(folder_path):\n",
" subfolder_path = os.path.join(folder_path, subfolder_name)\n",
" if os.path.isdir(subfolder_path):\n",
" person_name = subfolder_name\n",
" # Add a person for each subfolder\n",
- " person = client.add_person(person_directory_id, tags={\"name\": person_name})\n",
+ " person = await client.person_directories.add_person(person_directory_id, tags={\"name\": person_name})\n",
+ " print(f\"π§ Creating person '{person_name}'...\")\n",
" logging.info(f\"Created person {person_name} with person_id: {person['personId']}\")\n",
" if person:\n",
+ " # Initialize person entry in persons list\n",
+ " person_entry = {\n",
+ " 'personId': person['personId'],\n",
+ " 'name': person_name,\n",
+ " 'faceIds': []\n",
+ " }\n",
+ "\n",
" # Iterate through all images in the subfolder\n",
" for filename in os.listdir(subfolder_path):\n",
" if filename.lower().endswith(('.png', '.jpg', '.jpeg')):\n",
" image_path = os.path.join(subfolder_path, filename)\n",
" # Convert image to base64\n",
- " image_data = AzureContentUnderstandingFaceClient.read_file_to_base64(image_path)\n",
+ " image_data_base64 = read_image_to_base64(image_path)\n",
" # Add a face to the Person Directory and associate it to the added person\n",
- " face = client.add_face(person_directory_id, image_data, person['personId'])\n",
+ " print(f\"π§ Adding face from image '{image_path}' to person '{person_name}'...\")\n",
+ " print(f\"Image Data: \", image_data_base64)\n",
+ " face = await client.person_directories.add_face(\n",
+ " person_directory_id=person_directory_id, \n",
+ " face_source=FaceSource(data=image_data_base64),\n",
+ " person_id=person['personId']\n",
+ " )\n",
" if face:\n",
+ " person_entry['faceIds'].append(face['faceId'])\n",
" logging.info(f\"Added face from {filename} with face_id: {face['faceId']} to person_id: {person['personId']}\")\n",
" else:\n",
" logging.warning(f\"Failed to add face from {filename} to person_id: {person['personId']}\")\n",
"\n",
- "logging.info(\"Done\")"
+ " # Add person entry to persons list\n",
+ " persons.append(person_entry)\n",
+ "\n",
+ "logging.info(\"Done\")\n",
+ "logging.info(f\"Created {len(persons)} persons:\")\n",
+ "for person in persons:\n",
+ " logging.info(f\"Person: {person['name']} (ID: {person['personId']}) with {len(person['faceIds'])} faces\")"
]
},
{
@@ -142,12 +171,15 @@
"test_image_path = \"../data/face/family.jpg\" # Path to the test image\n",
"\n",
"# Detect faces in the test image\n",
- "image_data = AzureContentUnderstandingFaceClient.read_file_to_base64(test_image_path)\n",
- "detected_faces = client.detect_faces(data=image_data)\n",
+ "image_data_base64 = read_image_to_base64(test_image_path)\n",
+ "detected_faces = await client.faces.detect(data=image_data_base64)\n",
"for face in detected_faces['detectedFaces']:\n",
- " identified_persons = client.identify_person(person_directory_id, image_data, face['boundingBox'])\n",
- " if identified_persons.get(\"personCandidates\"):\n",
- " person = identified_persons[\"personCandidates\"][0]\n",
+ " identified_persons = await client.person_directories.identify_person(\n",
+ " person_directory_id=person_directory_id, \n",
+ " face_source=FaceSource(data=image_data_base64), \n",
+ " max_person_candidates=5)\n",
+ " if identified_persons.get(\"person_candidates\"):\n",
+ " person = identified_persons[\"person_candidates\"][0]\n",
" name = person.get(\"tags\", {}).get(\"name\", \"Unknown\")\n",
" logging.info(f\"Detected person: {name} with confidence: {person.get('confidence', 0)} at bounding box: {face['boundingBox']}\")\n",
"\n",
@@ -170,15 +202,20 @@
"metadata": {},
"outputs": [],
"source": [
- "new_face_image_path = \"new_face_image_path\" # The path to the face image you want to add.\n",
- "existing_person_id = \"existing_person_id\" # The unique ID of the person to whom the face should be associated.\n",
+ "person_bill = next(person for person in persons if person['name'] == 'Bill')\n",
+ "new_face_image_path = \"../data/face/new_face_image.jpg\" # The path to the face image you want to add.\n",
+ "existing_person_id = person_bill['personId'] # The unique ID of the person to whom the face should be associated.\n",
"\n",
"# Convert the new face image to base64\n",
- "image_data = AzureContentUnderstandingFaceClient.read_file_to_base64(new_face_image_path)\n",
+ "image_data_base64 = read_image_to_base64(new_face_image_path)\n",
"# Add the new face to the person directory and associate it with the existing person\n",
- "face = client.add_face(person_directory_id, image_data, existing_person_id)\n",
+ "face = await client.person_directories.add_face(\n",
+ " person_directory_id=person_directory_id, \n",
+ " face_source=FaceSource(data=image_data_base64), \n",
+ " person_id=existing_person_id)\n",
"if face:\n",
" logging.info(f\"Added face from {new_face_image_path} with face_id: {face['faceId']} to person_id: {existing_person_id}\")\n",
+ " person_bill['faceIds'].append(face['faceId'])\n",
"else:\n",
" logging.warning(f\"Failed to add face from {new_face_image_path} to person_id: {existing_person_id}\")"
]
@@ -200,11 +237,16 @@
"metadata": {},
"outputs": [],
"source": [
- "existing_person_id = \"existing_person_id\" # The unique ID of the person to whom the face should be associated.\n",
- "existing_face_id_list = [\"existing_face_id_1\", \"existing_face_id_2\"] # The list of face IDs to be associated.\n",
+ "existing_person_id = person_bill['personId'] # The unique ID of the person to whom the face should be associated.\n",
+ "existing_face_id_list: list = [person_bill['faceIds'][0], person_bill['faceIds'][1], person_bill['faceIds'][2]] # The list of face IDs to be associated.\n",
"\n",
"# Associate the existing face IDs with the existing person\n",
- "client.update_person(person_directory_id, existing_person_id, face_ids=existing_face_id_list)"
+ "await client.person_directories.update_person(\n",
+ " person_directory_id=person_directory_id, \n",
+ " person_id=existing_person_id, \n",
+ " resource={\"faceIds\": existing_face_id_list},\n",
+ " content_type=\"application/json\"\n",
+ ")"
]
},
{
@@ -223,18 +265,30 @@
"metadata": {},
"outputs": [],
"source": [
- "existing_face_id = \"existing_face_id\" # The unique ID of the face.\n",
+ "person_mary = next(person for person in persons if person['name'] == 'Mary')\n",
+ "existing_face_id = person_mary['faceIds'][0] # The unique ID of the face.\n",
"\n",
"# Remove the association of the existing face ID from the person\n",
- "client.update_face(person_directory_id, existing_face_id, person_id=\"\") # The person_id is set to \"\" to remove the association\n",
+ "await client.person_directories.update_face(\n",
+ " person_directory_id=person_directory_id, \n",
+ " face_id=existing_face_id,\n",
+ " resource={'personId': None},\n",
+ " content_type=\"application/json\"\n",
+ ")\n",
"logging.info(f\"Removed association of face_id: {existing_face_id} from the existing person_id\")\n",
- "logging.info(client.get_face(person_directory_id, existing_face_id)) # This will return the face information without the person association\n",
+ "logging.info(await client.person_directories.get_face(person_directory_id, existing_face_id)) # This will return the face information without the person association\n",
"\n",
"# Associate the existing face ID with a person\n",
- "existing_person_id = \"existing_person_id\" # The unique ID of the person to be associated with the face.\n",
- "client.update_face(person_directory_id, existing_face_id, person_id=existing_person_id)\n",
+ "person_jordan = next(person for person in persons if person['name'] == 'Jordan')\n",
+ "existing_person_id = person_jordan['personId'] # The unique ID of the person to be associated with the face.\n",
+ "await client.person_directories.update_face(\n",
+ " person_directory_id=person_directory_id, \n",
+ " face_id=existing_face_id, \n",
+ " resource={'personId': existing_person_id},\n",
+ " content_type=\"application/json\"\n",
+ ")\n",
"logging.info(f\"Associated face_id: {existing_face_id} with person_id: {existing_person_id}\")\n",
- "logging.info(client.get_face(person_directory_id, existing_face_id)) # This will return the face information with the new person association"
+ "logging.info(await client.person_directories.get_face(person_directory_id, existing_face_id)) # This will return the face information with the new person association"
]
},
{
@@ -257,25 +311,31 @@
"person_directory_description = \"This is a sample person directory for managing faces.\"\n",
"person_directory_tags = {\"project\": \"face_management\", \"version\": \"1.0\"}\n",
"\n",
- "client.update_person_directory(\n",
- " person_directory_id,\n",
- " description=person_directory_description,\n",
- " tags=person_directory_tags\n",
+ "await client.person_directories.update(\n",
+ " person_directory_id=person_directory_id,\n",
+ " resource=PersonDirectory(\n",
+ " description=person_directory_description,\n",
+ " tags=person_directory_tags\n",
+ " ),\n",
+ " content_type=\"application/json\",\n",
")\n",
"logging.info(f\"Updated Person Directory with description: '{person_directory_description}' and tags: {person_directory_tags}\")\n",
- "logging.info(client.get_person_directory(person_directory_id)) # This will return the updated person directory information\n",
+ "logging.info(await client.person_directories.get(person_directory_id)) # This will return the updated person directory information\n",
"\n",
"# Update the tags for an individual person\n",
- "existing_person_id = \"existing_person_id\" # The unique ID of the person to update.\n",
+ "existing_person_id = person_bill['personId'] # The unique ID of the person to update.\n",
"person_tags = {\"role\": \"tester\", \"department\": \"engineering\", \"name\": \"\"} # This will remove the name tag from the person.\n",
"\n",
- "client.update_person(\n",
- " person_directory_id,\n",
- " existing_person_id,\n",
- " tags=person_tags\n",
+ "await client.person_directories.update_person(\n",
+ " person_directory_id=person_directory_id,\n",
+ " person_id=existing_person_id,\n",
+ " resource=PersonDirectoryPerson(\n",
+ " tags=person_tags\n",
+ " ),\n",
+ " content_type=\"application/json\",\n",
")\n",
"logging.info(f\"Updated person with person_id: {existing_person_id} with tags: {person_tags}\")\n",
- "logging.info(client.get_person(person_directory_id, existing_person_id)) # This will return the updated person information"
+ "logging.info(await client.person_directories.get_person(person_directory_id, existing_person_id)) # This will return the updated person information"
]
},
{
@@ -294,9 +354,9 @@
"metadata": {},
"outputs": [],
"source": [
- "existing_face_id = \"existing_face_id\" # The unique ID of the face to delete.\n",
+ "existing_face_id = person_mary['faceIds'][0] # The unique ID of the face to delete.\n",
"\n",
- "client.delete_face(person_directory_id, existing_face_id)\n",
+ "await client.person_directories.delete_face(person_directory_id, existing_face_id)\n",
"logging.info(f\"Deleted face with face_id: {existing_face_id}\")"
]
},
@@ -317,9 +377,9 @@
"metadata": {},
"outputs": [],
"source": [
- "existing_person_id = \"existing_person_id\" # The unique ID of the person to delete.\n",
+ "existing_person_id = person_mary['personId'] # The unique ID of the person to delete.\n",
"\n",
- "client.delete_person(person_directory_id, existing_person_id)\n",
+ "await client.person_directories.delete_person(person_directory_id, existing_person_id)\n",
"logging.info(f\"Deleted person with person_id: {existing_person_id}\")"
]
},
@@ -340,26 +400,26 @@
"metadata": {},
"outputs": [],
"source": [
- "existing_person_id = \"existing_person_id\" # The unique ID of the person to delete.\n",
+ "existing_person_id = person_bill['personId'] # The unique ID of the person to delete.\n",
"\n",
"# Get the list of face IDs associated with the person\n",
- "response = client.get_person(person_directory_id, existing_person_id)\n",
+ "response = await client.person_directories.get_person(person_directory_id, existing_person_id)\n",
"face_ids = response.get('faceIds', [])\n",
"\n",
"# Delete each face associated with the person\n",
"for face_id in face_ids:\n",
" logging.info(f\"Deleting face with face_id: {face_id} from person_id: {existing_person_id}\")\n",
- " client.delete_face(person_directory_id, face_id)\n",
+ " await client.person_directories.delete_face(person_directory_id, face_id)\n",
"\n",
"# Delete the person after deleting all associated faces\n",
- "client.delete_person(person_directory_id, existing_person_id)\n",
+ "await client.person_directories.delete_person(person_directory_id, existing_person_id)\n",
"logging.info(f\"Deleted person with person_id: {existing_person_id} and all associated faces.\")"
]
}
],
"metadata": {
"kernelspec": {
- "display_name": "Python 3",
+ "display_name": "py312",
"language": "python",
"name": "python3"
},
@@ -373,7 +433,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.11.12"
+ "version": "3.12.3"
}
},
"nbformat": 4,
diff --git a/notebooks/classifier.ipynb b/notebooks/classifier.ipynb
index 2a640a2..338dc2f 100644
--- a/notebooks/classifier.ipynb
+++ b/notebooks/classifier.ipynb
@@ -32,7 +32,18 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 1. Import Required Libraries"
+ "## Create Azure AI Content Understanding Client\n",
+ "\n",
+ "> The [AzureContentUnderstandingClient](../python/content_understanding_client.py) is a utility class that provides functions to interact with the Content Understanding API. Prior to the official release of the Content Understanding SDK, it serves as a lightweight SDK.\n",
+ ">\n",
+ "> Fill in the constants **AZURE_AI_ENDPOINT**, **AZURE_AI_API_VERSION**, and **AZURE_AI_API_KEY** with the details from your Azure AI Service.\n",
+ "\n",
+ "> β οΈ Important:\n",
+ "You must update the code below to use your preferred Azure authentication method.\n",
+ "Look for the `# IMPORTANT` comments in the code and modify those sections accordingly.\n",
+ "Skipping this step may cause the sample to not run correctly.\n",
+ "\n",
+ "> β οΈ Note: While using a subscription key is supported, it is strongly recommended to use a token provider with Azure Active Directory (AAD) for enhanced security in production environments."
]
},
{
@@ -41,29 +52,57 @@
"metadata": {},
"outputs": [],
"source": [
- "import json\n",
+ "%pip install python-dotenv azure-ai-contentunderstanding azure-identity\n",
+ "\n",
"import logging\n",
+ "import json\n",
"import os\n",
"import sys\n",
+ "from datetime import datetime\n",
"import uuid\n",
- "from pathlib import Path\n",
+ "from dotenv import load_dotenv\n",
+ "from azure.core.credentials import AzureKeyCredential\n",
+ "from azure.identity.aio import DefaultAzureCredential\n",
+ "from azure.ai.contentunderstanding.aio import ContentUnderstandingClient\n",
+ "from azure.ai.contentunderstanding.models import (\n",
+ " ContentClassifier,\n",
+ " ContentAnalyzer,\n",
+ " ClassifierCategory,\n",
+ " DocumentContent,\n",
+ " FieldSchema,\n",
+ " FieldDefinition,\n",
+ " FieldType,\n",
+ " ContentAnalyzerConfig,\n",
+ ")\n",
"\n",
- "from dotenv import find_dotenv, load_dotenv\n",
- "from azure.identity import DefaultAzureCredential, get_bearer_token_provider\n",
+ "# Add the parent directory to the Python path to import the sample_helper module\n",
+ "sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'python'))\n",
+ "from extension.sample_helper import extract_operation_id_from_poller, save_json_to_file, PollerType\n",
+ "from typing import Dict, Optional\n",
"\n",
- "load_dotenv(find_dotenv())\n",
+ "load_dotenv()\n",
"logging.basicConfig(level=logging.INFO)\n",
"\n",
- "print(\"β
Libraries imported successfully!\")"
+ "endpoint = os.environ.get(\"AZURE_CONTENT_UNDERSTANDING_ENDPOINT\")\n",
+ "# Return AzureKeyCredential if AZURE_CONTENT_UNDERSTANDING_KEY is set, otherwise DefaultAzureCredential\n",
+ "key = os.getenv(\"AZURE_CONTENT_UNDERSTANDING_KEY\")\n",
+ "credential = AzureKeyCredential(key) if key else DefaultAzureCredential()\n",
+ "# Create the ContentUnderstandingClient\n",
+ "client = ContentUnderstandingClient(endpoint=endpoint, credential=credential)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 2. Import Azure Content Understanding Client\n",
+ "## Create a Basic Classifier\n",
+ "Classify document from URL using begin_classify API.\n",
"\n",
- "The `AzureContentUnderstandingClient` class manages all API interactions with the Azure AI service."
+ "High-level steps:\n",
+ "1. Create a custom classifier\n",
+ "2. Classify a document from a remote URL\n",
+ "3. Save the classification result to a file\n",
+ "4. Clean up the created classifier"
]
},
{
@@ -72,74 +111,68 @@
"metadata": {},
"outputs": [],
"source": [
- "# Add the parent directory to the system path to access shared modules\n",
- "parent_dir = Path(Path.cwd()).parent\n",
- "sys.path.append(str(parent_dir))\n",
- "try:\n",
- " from python.content_understanding_client import AzureContentUnderstandingClient\n",
- " print(\"β
Azure Content Understanding Client imported successfully!\")\n",
- "except ImportError:\n",
- " print(\"β Error: Ensure 'AzureContentUnderstandingClient.py' exists in the same directory as this notebook.\")\n",
- " raise"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## 3. Configure Azure AI Service Settings and Prepare the Sample\n",
+ "# Create a simple ContentClassifier object with default configuration.\n",
"\n",
- "Update the following settings to match your Azure environment:\n",
+ "# Args:\n",
+ "# classifier_id: The classifier ID\n",
+ "# description: Optional description for the classifier\n",
+ "# tags: Optional tags for the classifier\n",
"\n",
- "- **AZURE_AI_ENDPOINT**: Your Azure AI service endpoint URL, or configure it in the \".env\" file\n",
- "- **AZURE_AI_API_VERSION**: Azure AI API version to use. Defaults to \"2025-05-01-preview\"\n",
- "- **AZURE_AI_API_KEY**: Your Azure AI API key (optional if using token-based authentication)\n",
- "- **ANALYZER_SAMPLE_FILE**: Path to the PDF document you want to process"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Authentication supports either token-based or subscription key methods; only one is required\n",
- "AZURE_AI_ENDPOINT = os.getenv(\"AZURE_AI_ENDPOINT\")\n",
- "# IMPORTANT: Substitute with your subscription key or configure in \".env\" if not using token auth\n",
- "AZURE_AI_API_KEY = os.getenv(\"AZURE_AI_API_KEY\")\n",
- "AZURE_AI_API_VERSION = os.getenv(\"AZURE_AI_API_VERSION\", \"2025-05-01-preview\")\n",
- "ANALYZER_SAMPLE_FILE = \"../data/mixed_financial_docs.pdf\" # Update this path to your PDF file\n",
- "\n",
- "# Use DefaultAzureCredential for token-based authentication\n",
- "credential = DefaultAzureCredential()\n",
- "token_provider = get_bearer_token_provider(credential, \"https://cognitiveservices.azure.com/.default\")\n",
- "\n",
- "file_location = Path(ANALYZER_SAMPLE_FILE)\n",
- "\n",
- "print(\"π Configuration Summary:\")\n",
- "print(f\" Endpoint: {AZURE_AI_ENDPOINT}\")\n",
- "print(f\" API Version: {AZURE_AI_API_VERSION}\")\n",
- "print(f\" Document: {file_location.name if file_location.exists() else 'β File not found'}\")"
+ "# Returns:\n",
+ "# ContentClassifier: A configured ContentClassifier object\n",
+ "\n",
+ "def create_classifier_schema(description: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> ContentClassifier:\n",
+ " categories = {\n",
+ " \"Loan application\": ClassifierCategory(\n",
+ " description=\"Documents submitted by individuals or businesses to request funding, typically including personal or business details, financial history, loan amount, purpose, and supporting documentation.\"\n",
+ " ),\n",
+ " \"Invoice\": ClassifierCategory(\n",
+ " description=\"Billing documents issued by sellers or service providers to request payment for goods or services, detailing items, prices, taxes, totals, and payment terms.\"\n",
+ " ),\n",
+ " \"Bank_Statement\": ClassifierCategory(\n",
+ " description=\"Official statements issued by banks that summarize account activity over a period, including deposits, withdrawals, fees, and balances.\"\n",
+ " ),\n",
+ " }\n",
+ "\n",
+ " classifier = ContentClassifier(\n",
+ " categories=categories,\n",
+ " split_mode=\"auto\",\n",
+ " description=description,\n",
+ " tags=tags,\n",
+ " )\n",
+ "\n",
+ " return classifier\n",
+ "\n",
+ "# Generate a unique classifier ID\n",
+ "classifier_id = f\"classifier-sample-{datetime.now().strftime('%Y%m%d')}-{datetime.now().strftime('%H%M%S')}-{uuid.uuid4().hex[:8]}\"\n",
+ "\n",
+ "# Create a custom classifier using object model\n",
+ "print(f\"π§ Creating custom classifier '{classifier_id}'...\")\n",
+ "\n",
+ "classifier_schema: ContentClassifier = create_classifier_schema(\n",
+ " description=f\"Custom classifier for URL classification demo: {classifier_id}\",\n",
+ " tags={\"demo_type\": \"url_classification\"},\n",
+ ")\n",
+ "\n",
+ "# Start the classifier creation operation\n",
+ "poller = await client.content_classifiers.begin_create_or_replace(\n",
+ " classifier_id=classifier_id,\n",
+ " resource=classifier_schema,\n",
+ ")\n",
+ "\n",
+ "# Wait for the classifier to be created\n",
+ "print(f\"β³ Waiting for classifier creation to complete...\")\n",
+ "await poller.result()\n",
+ "print(f\"β
Classifier '{classifier_id}' created successfully!\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 4. Define Classifier Schema\n",
- "\n",
- "The classifier schema defines:\n",
- "- **Categories**: Document types to classify (e.g., Legal, Medical)\n",
- " - **description (Optional)**: Provides additional context or hints for categorizing or splitting documents. Useful when the category name alone is not sufficiently descriptive. Omit if the category name is self-explanatory.\n",
- "- **splitMode Options**: Determines how multi-page documents are split before classification or analysis.\n",
- " - `\"auto\"`: Automatically split based on content. \n",
- " For example, given categories βinvoiceβ and βapplication formβ:\n",
- " - A PDF with one invoice will be classified as a single document.\n",
- " - A PDF containing two invoices and one application form will be automatically split into three classified sections.\n",
- " - `\"none\"`: No splitting. \n",
- " The entire multi-page document is treated as one unit for classification and analysis.\n",
- " - `\"perPage\"`: Split by page. \n",
- " Treats each page as a separate document, useful if custom analyzers designed to operate at the page level."
+ "## Classify Your Document\n",
+ "\n",
+ "Now, use the classifier to categorize your document."
]
},
{
@@ -148,73 +181,67 @@
"metadata": {},
"outputs": [],
"source": [
- "# Define document categories and their descriptions\n",
- "classifier_schema = {\n",
- " \"categories\": {\n",
- " \"Loan application\": { # Both spaces and underscores are supported in category names\n",
- " \"description\": \"Documents submitted by individuals or businesses to request funding, typically including personal or business details, financial history, loan amount, purpose, and supporting documentation.\"\n",
- " },\n",
- " \"Invoice\": {\n",
- " \"description\": \"Billing documents issued by sellers or service providers to request payment for goods or services, detailing items, prices, taxes, totals, and payment terms.\"\n",
- " },\n",
- " \"Bank_Statement\": { # Both spaces and underscores are supported\n",
- " \"description\": \"Official statements issued by banks summarizing account activity over a period, including deposits, withdrawals, fees, and balances.\"\n",
- " },\n",
- " },\n",
- " \"splitMode\": \"auto\" # IMPORTANT: Automatically detect document boundaries; adjust as needed.\n",
- "}\n",
+ "# Read the mixed financial docs PDF file\n",
+ "pdf_path = \"../data/mixed_financial_docs.pdf\"\n",
+ "print(f\"π Reading document file: {pdf_path}\")\n",
+ "with open(pdf_path, \"rb\") as pdf_file:\n",
+ " pdf_content = pdf_file.read()\n",
"\n",
- "print(\"π Classifier Categories:\")\n",
- "for category, details in classifier_schema[\"categories\"].items():\n",
- " print(f\" β’ {category}: {details['description'][:60]}...\")"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## 5. Initialize Content Understanding Client\n",
+ "# Begin binary classification operation\n",
+ "print(f\"π Starting binary classification with classifier '{classifier_id}'...\")\n",
+ "classification_poller = await client.content_classifiers.begin_classify_binary(\n",
+ " classifier_id=classifier_id,\n",
+ " input=pdf_content,\n",
+ " content_type=\"application/pdf\",\n",
+ ")\n",
"\n",
- "Create the client to interact with Azure AI services.\n",
+ "# Wait for classification completion\n",
+ "print(f\"β³ Waiting for classification to complete...\")\n",
+ "classification_result = await classification_poller.result()\n",
+ "print(f\"β
Classification completed successfully!\")\n",
"\n",
- "β οΈ Important:\n",
- "Please update the authentication details below to match your Azure setup.\n",
- "Look for the `# IMPORTANT` comments and modify those sections accordingly.\n",
- "Skipping this step may result in runtime errors.\n",
+ "# Extract operation ID for get_result\n",
+ "classification_operation_id = extract_operation_id_from_poller(\n",
+ " classification_poller, PollerType.CLASSIFY_CALL\n",
+ ")\n",
+ "print(\n",
+ " f\"π Extracted classification operation ID: {classification_operation_id}\"\n",
+ ")\n",
"\n",
- "β οΈ Note: While subscription key authentication works, using Azure Active Directory (AAD) token provider is more secure and recommended for production."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Initialize the Azure Content Understanding client\n",
- "try:\n",
- " content_understanding_client = AzureContentUnderstandingClient(\n",
- " endpoint=AZURE_AI_ENDPOINT,\n",
- " api_version=AZURE_AI_API_VERSION,\n",
- " # IMPORTANT: Comment out token_provider if using subscription key\n",
- " token_provider=token_provider,\n",
- " # IMPORTANT: Uncomment this if using subscription key\n",
- " # subscription_key=AZURE_AI_API_KEY,\n",
+ "# Get the classification result using the operation ID\n",
+ "print(\n",
+ " f\"π Getting classification result using operation ID '{classification_operation_id}'...\"\n",
+ ")\n",
+ "operation_status = await client.content_classifiers.get_result(\n",
+ " operation_id=classification_operation_id,\n",
+ ")\n",
+ "\n",
+ "print(f\"β
Classification result retrieved successfully!\")\n",
+ "print(f\" Operation ID: {getattr(operation_status, 'id', 'N/A')}\")\n",
+ "print(f\" Status: {getattr(operation_status, 'status', 'N/A')}\")\n",
+ "\n",
+ "# The actual classification result is in operation_status.result\n",
+ "operation_result = getattr(operation_status, \"result\", None)\n",
+ "if operation_result is not None:\n",
+ " print(\n",
+ " f\" Result contains {len(getattr(operation_result, 'contents', []))} contents\"\n",
" )\n",
- " print(\"β
Content Understanding client initialized successfully!\")\n",
- " print(\" Ready to create classifiers and analyzers.\")\n",
- "except Exception as e:\n",
- " print(f\"β Failed to initialize client: {e}\")\n",
- " raise"
+ "\n",
+ "# Save the classification result to a file\n",
+ "saved_file_path = save_json_to_file(\n",
+ " result=operation_status.as_dict(),\n",
+ " filename_prefix=\"content_classifiers_get_result\",\n",
+ ")\n",
+ "print(f\"πΎ Classification result saved to: {saved_file_path}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 6. Create a Basic Classifier\n",
+ "## View Classification Results\n",
"\n",
- "First, create a simple classifier that categorizes documents without performing additional analysis."
+ "Review the classification results generated for your document."
]
},
{
@@ -223,37 +250,21 @@
"metadata": {},
"outputs": [],
"source": [
- "# Generate a unique classifier ID\n",
- "classifier_id = \"classifier-sample-\" + str(uuid.uuid4())\n",
- "\n",
- "try:\n",
- " # Create the classifier\n",
- " print(f\"π¨ Creating classifier: {classifier_id}\")\n",
- " print(\" This may take a few seconds...\")\n",
- " \n",
- " response = content_understanding_client.begin_create_classifier(classifier_id, classifier_schema)\n",
- " result = content_understanding_client.poll_result(response)\n",
- " \n",
- " print(\"\\nβ
Classifier created successfully!\")\n",
- " print(f\" Status: {result.get('status')}\")\n",
- " print(f\" Resource Location: {result.get('resourceLocation')}\")\n",
- " \n",
- "except Exception as e:\n",
- " print(f\"\\nβ Error creating classifier: {e}\")\n",
- " if \"already exists\" in str(e):\n",
- " print(\"\\nπ‘ Tip: The classifier already exists. You can:\")\n",
- " print(\" 1. Use a different classifier ID\")\n",
- " print(\" 2. Delete the existing classifier first\")\n",
- " print(\" 3. Skip to document classification\")"
+ "# Display classification results\n",
+ "print(f\"π Classification Results:\")\n",
+ "for content in classification_result.contents:\n",
+ " document_content: DocumentContent = content\n",
+ " print(f\" Category: {document_content.category}\")\n",
+ " print(f\" Start Page Number: {document_content.start_page_number}\")\n",
+ " print(f\" End Page Number: {document_content.end_page_number}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 7. Classify Your Document\n",
- "\n",
- "Now, use the classifier to categorize your document."
+ "## Saving Classification Results\n",
+ "The classification result is saved to a JSON file for later analysis."
]
},
{
@@ -262,34 +273,21 @@
"metadata": {},
"outputs": [],
"source": [
- "try:\n",
- " # Verify that the document exists\n",
- " if not file_location.exists():\n",
- " raise FileNotFoundError(f\"Document not found at {file_location}\")\n",
- " \n",
- " # Classify the document\n",
- " print(f\"π Classifying document: {file_location.name}\")\n",
- " print(\"\\nβ³ Processing... This may take several minutes for large documents.\")\n",
- " \n",
- " response = content_understanding_client.begin_classify(classifier_id, file_location=str(file_location))\n",
- " result = content_understanding_client.poll_result(response, timeout_seconds=360)\n",
- " \n",
- " print(\"\\nβ
Classification completed successfully!\")\n",
- " \n",
- "except FileNotFoundError:\n",
- " print(f\"\\nβ Document not found: {file_location}\")\n",
- " print(\" Please update 'file_location' to point to your PDF file.\")\n",
- "except Exception as e:\n",
- " print(f\"\\nβ Error classifying document: {e}\")"
+ "# Save the classification result to a file\n",
+ "\n",
+ "saved_file_path = save_json_to_file(\n",
+ " result=classification_result.as_dict(),\n",
+ " filename_prefix=\"content_classifiers_classify\",\n",
+ ")\n",
+ "print(f\"πΎ Classification result saved to: {saved_file_path}\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 8. View Classification Results\n",
- "\n",
- "Review the classification results generated for your document."
+ "## Clean up the created analyzer \n",
+ "After the demo completes, the classifier is automatically deleted to prevent resource accumulation."
]
},
{
@@ -298,33 +296,17 @@
"metadata": {},
"outputs": [],
"source": [
- "# Display classification results\n",
- "if 'result' in locals() and result:\n",
- " result_data = result.get(\"result\", {})\n",
- " contents = result_data.get(\"contents\", [])\n",
- " \n",
- " print(\"π CLASSIFICATION RESULTS\")\n",
- " print(\"=\" * 50)\n",
- " print(f\"\\nTotal sections found: {len(contents)}\")\n",
- " \n",
- " # Summarize each classified section\n",
- " print(\"\\nπ Document Sections:\")\n",
- " for i, content in enumerate(contents, 1):\n",
- " print(f\"\\n Section {i}:\")\n",
- " print(f\" β’ Category: {content.get('category', 'Unknown')}\")\n",
- " print(f\" β’ Pages: {content.get('startPageNumber', '?')} - {content.get('endPageNumber', '?')}\")\n",
- " \n",
- " print(\"\\nFull result output:\")\n",
- " print(json.dumps(result, indent=2))\n",
- "else:\n",
- " print(\"β No results available. Please run the classification step first.\")"
+ "# Clean up the created classifier (demo cleanup)\n",
+ "print(f\"ποΈ Deleting classifier '{classifier_id}' (demo cleanup)...\")\n",
+ "await client.content_classifiers.delete(classifier_id=classifier_id)\n",
+ "print(f\"β
Classifier '{classifier_id}' deleted successfully!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 9. Create a Custom Analyzer (Advanced)\n",
+ "## Create a Custom Analyzer (Advanced)\n",
"\n",
"Create a custom analyzer to extract specific fields from documents.\n",
"This example extracts common fields from loan application documents and generates document excerpts."
@@ -336,80 +318,74 @@
"metadata": {},
"outputs": [],
"source": [
- "# Define the analyzer schema with custom fields\n",
- "analyzer_schema = {\n",
- " \"description\": \"Loan application analyzer - extracts key information from loan applications\",\n",
- " \"baseAnalyzerId\": \"prebuilt-documentAnalyzer\", # Built on top of the general document analyzer\n",
- " \"config\": {\n",
- " \"returnDetails\": True,\n",
- " \"enableLayout\": True, # Extract layout details\n",
- " \"enableBarcode\": False, # Disable barcode detection\n",
- " \"enableFormula\": False, # Disable formula detection\n",
- " \"estimateFieldSourceAndConfidence\": True, # Enable estimation of field location and confidence\n",
- " \"disableContentFiltering\": False\n",
- " },\n",
- " \"fieldSchema\": {\n",
- " \"fields\": {\n",
- " \"ApplicationDate\": {\n",
- " \"type\": \"date\",\n",
- " \"method\": \"generate\",\n",
- " \"description\": \"The date when the loan application was submitted.\"\n",
- " },\n",
- " \"ApplicantName\": {\n",
- " \"type\": \"string\",\n",
- " \"method\": \"generate\",\n",
- " \"description\": \"Full name of the loan applicant or company.\"\n",
- " },\n",
- " \"LoanAmountRequested\": {\n",
- " \"type\": \"number\",\n",
- " \"method\": \"generate\",\n",
- " \"description\": \"The total loan amount requested by the applicant.\"\n",
- " },\n",
- " \"LoanPurpose\": {\n",
- " \"type\": \"string\",\n",
- " \"method\": \"generate\",\n",
- " \"description\": \"The stated purpose or reason for the loan.\"\n",
- " },\n",
- " \"CreditScore\": {\n",
- " \"type\": \"number\",\n",
- " \"method\": \"generate\",\n",
- " \"description\": \"Credit score of the applicant, if available.\"\n",
- " },\n",
- " \"Summary\": {\n",
- " \"type\": \"string\",\n",
- " \"method\": \"generate\",\n",
- " \"description\": \"A brief summary overview of the loan application details.\"\n",
- " }\n",
+ "import asyncio\n",
+ "\n",
+ "# Define fields schema\n",
+ "custom_analyzer = ContentAnalyzer(\n",
+ " base_analyzer_id=\"prebuilt-documentAnalyzer\", # Built on top of the general document analyzer\n",
+ " description=\"Loan application analyzer - extracts key information from loan applications\",\n",
+ " config=ContentAnalyzerConfig(\n",
+ " return_details=True,\n",
+ " enable_layout=True, # Extract layout details\n",
+ " enable_formula=False, # Disable formula detection\n",
+ " estimate_field_source_and_confidence=True, # Enable estimation of field location and confidence\n",
+ " disable_content_filtering=False\n",
+ " ),\n",
+ " field_schema=FieldSchema(\n",
+ " fields={\n",
+ " \"ApplicationDate\": FieldDefinition(\n",
+ " type=FieldType.DATE,\n",
+ " method=\"generate\",\n",
+ " description=\"The date when the loan application was submitted.\"\n",
+ " ),\n",
+ " \"ApplicantName\": FieldDefinition(\n",
+ " type=FieldType.STRING,\n",
+ " method=\"generate\",\n",
+ " description=\"Full name of the loan applicant or company.\"\n",
+ " ),\n",
+ " \"LoanAmountRequested\": FieldDefinition(\n",
+ " type=FieldType.NUMBER,\n",
+ " method=\"generate\",\n",
+ " description=\"The total loan amount requested by the applicant.\"\n",
+ " ),\n",
+ " \"LoanPurpose\": FieldDefinition(\n",
+ " type=FieldType.STRING,\n",
+ " method=\"generate\",\n",
+ " description=\"The stated purpose or reason for the loan.\"\n",
+ " ),\n",
+ " \"CreditScore\": FieldDefinition(\n",
+ " type=FieldType.NUMBER,\n",
+ " method=\"generate\",\n",
+ " description=\"Credit score of the applicant, if available.\"\n",
+ " ),\n",
+ " \"Summary\": FieldDefinition(\n",
+ " type=FieldType.STRING,\n",
+ " method=\"generate\",\n",
+ " description=\"A brief summary overview of the loan application details.\"\n",
+ " )\n",
" }\n",
- " }\n",
- "}\n",
+ " ),\n",
+ " tags={\"demo\": \"loan-application\"}\n",
+ ")\n",
"\n",
"# Generate a unique analyzer ID\n",
- "analyzer_id = \"analyzer-loan-application-\" + str(uuid.uuid4())\n",
+ "analyzer_id = f\"classifier-sample-{datetime.now().strftime('%Y%m%d')}-{datetime.now().strftime('%H%M%S')}-{uuid.uuid4().hex[:8]}\"\n",
"\n",
"# Create the custom analyzer\n",
- "try:\n",
- " print(f\"π¨ Creating custom analyzer: {analyzer_id}\")\n",
- " print(\"\\nπ The analyzer will extract the following fields:\")\n",
- " for field_name, field_info in analyzer_schema[\"fieldSchema\"][\"fields\"].items():\n",
- " print(f\" β’ {field_name}: {field_info['description']}\")\n",
- " \n",
- " response = content_understanding_client.begin_create_analyzer(analyzer_id, analyzer_schema)\n",
- " result = content_understanding_client.poll_result(response)\n",
- " \n",
- " print(\"\\nβ
Analyzer created successfully!\")\n",
- " print(f\" Analyzer ID: {analyzer_id}\")\n",
- " \n",
- "except Exception as e:\n",
- " print(f\"\\nβ Error creating analyzer: {e}\")\n",
- " analyzer_id = None # Set to None if creation failed"
+ "print(f\"π§ Creating custom analyzer '{analyzer_id}'...\")\n",
+ "poller = await client.content_analyzers.begin_create_or_replace(\n",
+ " analyzer_id=analyzer_id,\n",
+ " resource=custom_analyzer,\n",
+ ")\n",
+ "result = await poller.result()\n",
+ "print(f\"β
Analyzer '{analyzer_id}' created successfully!\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 10. Create an Enhanced Classifier with Custom Analyzer\n",
+ "## Create an Enhanced Classifier with Custom Analyzer\n",
"\n",
"Now create a new classifier that uses the prebuilt invoice analyzer for invoices and the custom analyzer for loan application documents.\n",
"This combines document classification with field extraction in one operation."
@@ -421,12 +397,8 @@
"metadata": {},
"outputs": [],
"source": [
- "# Generate a unique enhanced classifier ID\n",
- "enhanced_classifier_id = \"classifier-enhanced-\" + str(uuid.uuid4())\n",
- "\n",
- "# Define the enhanced classifier schema\n",
- "enhanced_classifier_schema = {\n",
- " \"categories\": {\n",
+ "def create_enhanced_classifier_schema(analyzer_id: str, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> ContentClassifier:\n",
+ " categories = {\n",
" \"Loan application\": { # Both spaces and underscores allowed\n",
" \"description\": \"Documents submitted by individuals or businesses requesting funding, including personal/business details, financial history, and supporting documents.\",\n",
" \"analyzerId\": analyzer_id # IMPORTANT: Use the custom analyzer created previously for loan applications\n",
@@ -439,35 +411,45 @@
" \"description\": \"Official bank statements summarizing account activity over a period, including deposits, withdrawals, fees, and balances.\"\n",
" # No analyzer specified - uses default processing\n",
" }\n",
- " },\n",
- " \"splitMode\": \"auto\"\n",
- "}\n",
+ " }\n",
+ "\n",
+ " classifier = ContentClassifier(\n",
+ " categories=categories,\n",
+ " split_mode=\"auto\",\n",
+ " description=description,\n",
+ " tags=tags,\n",
+ " )\n",
+ "\n",
+ " return classifier\n",
+ "\n",
+ "# Generate a unique enhanced classifier ID\n",
+ "classifier_id = f\"enhanced-classifier-sample-{datetime.now().strftime('%Y%m%d')}-{datetime.now().strftime('%H%M%S')}-{uuid.uuid4().hex[:8]}\"\n",
+ "\n",
+ "# Create the enhanced classifier schema\n",
+ "enhanced_classifier_schema = create_enhanced_classifier_schema(\n",
+ " analyzer_id=analyzer_id,\n",
+ " description=f\"Custom classifier for URL classification demo: {classifier_id}\",\n",
+ " tags={\"demo_type\": \"url_classification\"}\n",
+ ")\n",
"\n",
"# Create the enhanced classifier only if the custom analyzer was created successfully\n",
"if analyzer_id:\n",
- " try:\n",
- " print(f\"π¨ Creating enhanced classifier: {enhanced_classifier_id}\")\n",
- " print(\"\\nπ Configuration:\")\n",
- " print(\" β’ Loan application documents β Custom analyzer with field extraction\")\n",
- " print(\" β’ Invoice documents β Prebuilt invoice analyzer\")\n",
- " print(\" β’ Bank_Statement documents β Standard processing\")\n",
- " \n",
- " response = content_understanding_client.begin_create_classifier(enhanced_classifier_id, enhanced_classifier_schema)\n",
- " result = content_understanding_client.poll_result(response)\n",
- " \n",
- " print(\"\\nβ
Enhanced classifier created successfully!\")\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"\\nβ Error creating enhanced classifier: {e}\")\n",
- "else:\n",
- " print(\"β οΈ Skipping enhanced classifier creation - custom analyzer was not created successfully.\")"
+ " poller = await client.content_classifiers.begin_create_or_replace(\n",
+ " classifier_id=classifier_id,\n",
+ " resource=enhanced_classifier_schema\n",
+ " )\n",
+ "\n",
+ " # Wait for the classifier to be created\n",
+ " print(f\"β³ Waiting for classifier creation to complete...\")\n",
+ " await poller.result()\n",
+ " print(f\"β
Classifier '{classifier_id}' created successfully!\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 11. Process Document with Enhanced Classifier\n",
+ "## Process Document with Enhanced Classifier\n",
"\n",
"Process the document again using the enhanced classifier.\n",
"Invoices and loan applications will now have additional fields extracted."
@@ -479,24 +461,24 @@
"metadata": {},
"outputs": [],
"source": [
- "if 'enhanced_classifier_id' in locals() and analyzer_id:\n",
- " try:\n",
- " # Verify the document exists\n",
- " if not file_location.exists():\n",
- " raise FileNotFoundError(f\"Document not found at {file_location}\")\n",
- " \n",
- " # Process document with enhanced classifier\n",
- " print(\"π Processing document with enhanced classifier\")\n",
- " print(f\" Document: {file_location.name}\")\n",
- " print(\"\\nβ³ Processing with classification and field extraction...\")\n",
- " \n",
- " response = content_understanding_client.begin_classify(enhanced_classifier_id, file_location=str(file_location))\n",
- " enhanced_result = content_understanding_client.poll_result(response, timeout_seconds=360)\n",
- " \n",
- " print(\"\\nβ
Enhanced processing completed!\")\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"\\nβ Error processing document: {e}\")\n",
+ "if classifier_id and analyzer_id:\n",
+ " pdf_path = \"../data/mixed_financial_docs.pdf\"\n",
+ " print(f\"π Reading document file: {pdf_path}\")\n",
+ " with open(pdf_path, \"rb\") as pdf_file:\n",
+ " pdf_content = pdf_file.read()\n",
+ "\n",
+ " # Begin binary classification operation\n",
+ " print(f\"π Starting binary classification with classifier '{classifier_id}'...\")\n",
+ " classification_poller = await client.content_classifiers.begin_classify_binary(\n",
+ " classifier_id=classifier_id,\n",
+ " input=pdf_content,\n",
+ " content_type=\"application/pdf\",\n",
+ " )\n",
+ "\n",
+ " # Wait for classification completion\n",
+ " print(f\"β³ Waiting for classification to complete...\")\n",
+ " classification_result = await classification_poller.result()\n",
+ " print(f\"β
Classification completed successfully!\")\n",
"else:\n",
" print(\"β οΈ Skipping enhanced classification - enhanced classifier was not created.\")"
]
@@ -505,7 +487,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "## 12. View Enhanced Results with Extracted Fields\n",
+ "## View Enhanced Results with Extracted Fields\n",
"\n",
"Review the classification results alongside extracted fields from loan application documents."
]
@@ -516,41 +498,22 @@
"metadata": {},
"outputs": [],
"source": [
- "# Display enhanced classification results\n",
- "if 'enhanced_result' in locals() and enhanced_result:\n",
- " result_data = enhanced_result.get(\"result\", {})\n",
- " contents = result_data.get(\"contents\", [])\n",
- " \n",
- " print(\"π ENHANCED CLASSIFICATION RESULTS\")\n",
- " print(\"=\" * 70)\n",
- " print(f\"\\nTotal sections found: {len(contents)}\")\n",
- " \n",
- " # Iterate through each document section\n",
- " for i, content in enumerate(contents, 1):\n",
- " print(f\"\\n{'='*70}\")\n",
- " print(f\"SECTION {i}\")\n",
- " print(f\"{'='*70}\")\n",
- " \n",
- " category = content.get('category', 'Unknown')\n",
- " print(f\"\\nπ Category: {category}\")\n",
- " print(f\"π Pages: {content.get('startPageNumber', '?')} - {content.get('endPageNumber', '?')}\")\n",
- " \n",
- " # Display extracted fields if available\n",
- " fields = content.get('fields', {})\n",
- " if fields:\n",
- " print(\"\\nπ Extracted Information:\")\n",
- " for field_name, field_data in fields.items():\n",
- " print(f\"\\n {field_name}:\")\n",
- " print(f\" β’ Value: {field_data}\")\n",
- "else:\n",
- " print(\"β No enhanced results available. Please run the enhanced classification step first.\")"
+ "# Display classification results\n",
+ "print(f\"π Classification Results: {json.dumps(classification_result.as_dict(), indent=2)}\")\n",
+ "for content in classification_result.contents:\n",
+ " if hasattr(content, \"classifications\") and content.classifications:\n",
+ " for classification in content.classifications:\n",
+ " print(f\" Category: {classification.category}\")\n",
+ " print(f\" Confidence: {classification.confidence}\")\n",
+ " print(f\" Score: {classification.score}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "You can also view the full JSON result below."
+ "## Saving Classification Results\n",
+ "The classification result is saved to a JSON file for later analysis."
]
},
{
@@ -559,25 +522,58 @@
"metadata": {},
"outputs": [],
"source": [
- "print(json.dumps(enhanced_result, indent=2))"
+ "# Save the classification result to a file\n",
+ "saved_file_path = save_json_to_file(\n",
+ " result=classification_result.as_dict(),\n",
+ " filename_prefix=\"content_classifiers_classify_binary\",\n",
+ ")\n",
+ "print(f\"πΎ Classification result saved to: {saved_file_path}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## Summary and Next Steps\n",
- "\n",
- "Congratulations! You have successfully:\n",
- "1. β
Created a basic classifier to categorize documents\n",
- "2. β
Created a custom analyzer to extract specific fields\n",
- "3. β
Combined them into an enhanced classifier for intelligent document processing"
+ "## Clean up the created analyzer\n",
+ "After the demo completes, the analyzer is automatically deleted to prevent resource accumulation."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Clean up the created analyzer (demo cleanup)\n",
+ "print(f\"ποΈ Deleting analyzer '{analyzer_id}' (demo cleanup)...\")\n",
+ "await client.content_analyzers.delete(analyzer_id=analyzer_id)\n",
+ "print(f\"β
Analyzer '{analyzer_id}' deleted successfully!\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Clean up the created classifier\n",
+ "After the demo completes, the classifier is automatically deleted to prevent resource accumulation."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Clean up the created classifier (demo cleanup)\n",
+ "print(f\"ποΈ Deleting classifier '{classifier_id}' (demo cleanup)...\")\n",
+ "await client.content_classifiers.delete(classifier_id=classifier_id)\n",
+ "print(f\"β
Classifier '{classifier_id}' deleted successfully!\")"
]
}
],
"metadata": {
"kernelspec": {
- "display_name": "Python 3",
+ "display_name": "py312",
"language": "python",
"name": "python3"
},
@@ -591,7 +587,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.11.12"
+ "version": "3.12.3"
}
},
"nbformat": 4,
diff --git a/notebooks/content_extraction.ipynb b/notebooks/content_extraction.ipynb
index a06b81c..e2b89e2 100644
--- a/notebooks/content_extraction.ipynb
+++ b/notebooks/content_extraction.ipynb
@@ -59,53 +59,42 @@
"import logging\n",
"import json\n",
"import os\n",
+ "from pathlib import Path\n",
"import sys\n",
+ "from dotenv import load_dotenv\n",
+ "from azure.core.credentials import AzureKeyCredential\n",
+ "from azure.identity.aio import DefaultAzureCredential\n",
+ "from azure.ai.contentunderstanding.aio import ContentUnderstandingClient\n",
+ "from azure.ai.contentunderstanding.models import (\n",
+ " AnalyzeResult,\n",
+ " ContentAnalyzer,\n",
+ " ContentAnalyzerConfig,\n",
+ " AnalysisMode,\n",
+ " ProcessingLocation,\n",
+ " AudioVisualContent,\n",
+ ")\n",
+ "from datetime import datetime\n",
+ "from typing import Any\n",
"import uuid\n",
- "from pathlib import Path\n",
- "from dotenv import find_dotenv, load_dotenv\n",
- "from azure.identity import DefaultAzureCredential, get_bearer_token_provider\n",
"\n",
- "load_dotenv(find_dotenv())\n",
- "logging.basicConfig(level=logging.INFO)\n",
- "\n",
- "# For authentication, you can use either token-based auth or subscription key; only one is required\n",
- "AZURE_AI_ENDPOINT = os.getenv(\"AZURE_AI_ENDPOINT\")\n",
- "# IMPORTANT: Replace with your actual subscription key or set it in your \".env\" file if not using token authentication\n",
- "AZURE_AI_API_KEY = os.getenv(\"AZURE_AI_API_KEY\")\n",
- "AZURE_AI_API_VERSION = os.getenv(\"AZURE_AI_API_VERSION\", \"2025-05-01-preview\")\n",
- "\n",
- "# Add the parent directory to the path to use shared modules\n",
- "parent_dir = Path(Path.cwd()).parent\n",
- "sys.path.append(str(parent_dir))\n",
- "from python.content_understanding_client import AzureContentUnderstandingClient\n",
- "\n",
- "credential = DefaultAzureCredential()\n",
- "token_provider = get_bearer_token_provider(credential, \"https://cognitiveservices.azure.com/.default\")\n",
- "\n",
- "client = AzureContentUnderstandingClient(\n",
- " endpoint=AZURE_AI_ENDPOINT,\n",
- " api_version=AZURE_AI_API_VERSION,\n",
- " # IMPORTANT: Comment out token_provider if using subscription key\n",
- " token_provider=token_provider,\n",
- " # IMPORTANT: Uncomment the following line if using subscription key\n",
- " # subscription_key=AZURE_AI_API_KEY,\n",
- " x_ms_useragent=\"azure-ai-content-understanding-python/content_extraction\", # This header is used for sample usage telemetry; please comment out this line if you want to opt out.\n",
+ "# Add the parent directory to the Python path to import the sample_helper module\n",
+ "sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'python'))\n",
+ "from extension.sample_helper import (\n",
+ " extract_operation_id_from_poller,\n",
+ " PollerType,\n",
+ " save_json_to_file,\n",
+ " save_keyframe_image_to_file,\n",
")\n",
"\n",
- "# Utility function to save images\n",
- "from PIL import Image\n",
- "from io import BytesIO\n",
- "import re\n",
+ "load_dotenv()\n",
+ "logging.basicConfig(level=logging.INFO)\n",
"\n",
- "def save_image(image_id: str, response):\n",
- " raw_image = client.get_image_from_analyze_operation(analyze_response=response,\n",
- " image_id=image_id\n",
- " )\n",
- " image = Image.open(BytesIO(raw_image))\n",
- " # To display the image, uncomment the following line:\n",
- " # image.show()\n",
- " Path(\".cache\").mkdir(exist_ok=True)\n",
- " image.save(f\".cache/{image_id}.jpg\", \"JPEG\")\n"
+ "endpoint = os.environ.get(\"AZURE_CONTENT_UNDERSTANDING_ENDPOINT\")\n",
+ "# Return AzureKeyCredential if AZURE_CONTENT_UNDERSTANDING_KEY is set, otherwise DefaultAzureCredential\n",
+ "key = os.getenv(\"AZURE_CONTENT_UNDERSTANDING_KEY\")\n",
+ "credential = AzureKeyCredential(key) if key else DefaultAzureCredential()\n",
+ "# Create the ContentUnderstandingClient\n",
+ "client = ContentUnderstandingClient(endpoint=endpoint, credential=credential)"
]
},
{
@@ -123,14 +112,19 @@
"metadata": {},
"outputs": [],
"source": [
- "ANALYZER_SAMPLE_FILE = '../data/invoice.pdf'\n",
- "ANALYZER_ID = 'prebuilt-documentAnalyzer'\n",
+ "analyzer_sample_file = '../data/invoice.pdf'\n",
+ "analyzer_id = 'prebuilt-documentAnalyzer'\n",
"\n",
- "# Analyze document file\n",
- "response = client.begin_analyze(ANALYZER_ID, file_location=ANALYZER_SAMPLE_FILE)\n",
- "result_json = client.poll_result(response)\n",
+ "with open(analyzer_sample_file, \"rb\") as f:\n",
+ " pdf_bytes = f.read()\n",
"\n",
- "print(json.dumps(result_json, indent=2))"
+ "print(f\"π Analyzing {analyzer_sample_file} with prebuilt-documentAnalyzer...\")\n",
+ "poller = await client.content_analyzers.begin_analyze_binary(\n",
+ " analyzer_id=analyzer_id,\n",
+ " input=pdf_bytes,\n",
+ " content_type=\"application/pdf\"\n",
+ ")\n",
+ "result: AnalyzeResult = await poller.result()"
]
},
{
@@ -146,7 +140,11 @@
"metadata": {},
"outputs": [],
"source": [
- "print(result_json[\"result\"][\"contents\"][0][\"markdown\"])\n"
+ "print(\"\\nπ Markdown Content:\")\n",
+ "print(\"=\" * 50)\n",
+ "content = result.contents[0]\n",
+ "print(content.markdown)\n",
+ "print(\"=\" * 50)"
]
},
{
@@ -162,23 +160,7 @@
"metadata": {},
"outputs": [],
"source": [
- "print(json.dumps(result_json[\"result\"][\"contents\"][0], indent=2))\n"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "> This output helps you retrieve structural information about the tables embedded within the document."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "print(json.dumps(result_json[\"result\"][\"contents\"][0][\"tables\"], indent=2))"
+ "print(json.dumps(result.as_dict(), indent=2))"
]
},
{
@@ -206,14 +188,62 @@
"metadata": {},
"outputs": [],
"source": [
- "ANALYZER_SAMPLE_FILE = '../data/audio.wav'\n",
- "ANALYZER_ID = 'prebuilt-audioAnalyzer'\n",
+ "analyzer_id = f\"audio-sample-{datetime.now().strftime('%Y%m%d')}-{datetime.now().strftime('%H%M%S')}-{uuid.uuid4().hex[:8]}\"\n",
+ "\n",
+ "# Create a marketing video analyzer using object model\n",
+ "print(f\"π§ Creating marketing video analyzer '{analyzer_id}'...\")\n",
"\n",
- "# Analyze audio file\n",
- "response = client.begin_analyze(ANALYZER_ID, file_location=ANALYZER_SAMPLE_FILE)\n",
- "result_json = client.poll_result(response)\n",
+ "audio_analyzer = ContentAnalyzer(\n",
+ " base_analyzer_id=\"prebuilt-audioAnalyzer\",\n",
+ " config=ContentAnalyzerConfig(return_details=True),\n",
+ " description=\"Marketing audio analyzer for result file demo\",\n",
+ " mode=AnalysisMode.STANDARD,\n",
+ " processing_location=ProcessingLocation.GLOBAL,\n",
+ " tags={\"demo_type\": \"audio_analysis\"},\n",
+ ")\n",
+ "\n",
+ " # Start the analyzer creation operation\n",
+ "poller = await client.content_analyzers.begin_create_or_replace(\n",
+ " analyzer_id=analyzer_id,\n",
+ " resource=audio_analyzer,\n",
+ ")\n",
+ "\n",
+ "# Extract operation ID from the poller\n",
+ "operation_id = extract_operation_id_from_poller(\n",
+ " poller, PollerType.ANALYZER_CREATION\n",
+ ")\n",
+ "print(f\"π Extracted creation operation ID: {operation_id}\")\n",
+ "\n",
+ "# Wait for the analyzer to be created\n",
+ "print(f\"β³ Waiting for analyzer creation to complete...\")\n",
+ "await poller.result()\n",
+ "print(f\"β
Analyzer '{analyzer_id}' created successfully!\")\n",
"\n",
- "print(json.dumps(result_json, indent=2))"
+ "# Analyze audio file with the created analyzer\n",
+ "audio_file_path = \"../data/audio.wav\"\n",
+ "print(f\"π Analyzing audio file from path: {audio_file_path} with analyzer '{analyzer_id}'...\")\n",
+ "\n",
+ "with open(audio_file_path, \"rb\") as f:\n",
+ " audio_data = f.read()\n",
+ "\n",
+ "# Begin audio analysis operation\n",
+ "print(f\"π¬ Starting audio analysis with analyzer '{analyzer_id}'...\")\n",
+ "analysis_poller = await client.content_analyzers.begin_analyze_binary(\n",
+ " analyzer_id=analyzer_id,\n",
+ " input=audio_data,\n",
+ " content_type=\"application/octet-stream\",\n",
+ ")\n",
+ "\n",
+ " # Wait for analysis completion\n",
+ "print(f\"β³ Waiting for audio analysis to complete...\")\n",
+ "analysis_result = await analysis_poller.result()\n",
+ "print(f\"β
Audio analysis completed successfully!\")\n",
+ "print(f\"π Analysis Results: {json.dumps(analysis_result.as_dict(), indent=2)}\")\n",
+ "\n",
+ "# Clean up the created analyzer (demo cleanup)\n",
+ "print(f\"ποΈ Deleting analyzer '{analyzer_id}' (demo cleanup)...\")\n",
+ "await client.content_analyzers.delete(analyzer_id=analyzer_id)\n",
+ "print(f\"β
Analyzer '{analyzer_id}' deleted successfully!\")"
]
},
{
@@ -237,32 +267,146 @@
"metadata": {},
"outputs": [],
"source": [
- "ANALYZER_SAMPLE_FILE = '../data/FlightSimulator.mp4'\n",
- "ANALYZER_ID = 'prebuilt-videoAnalyzer'\n",
+ "analyzer_id = f\"video-sample-{datetime.now().strftime('%Y%m%d')}-{datetime.now().strftime('%H%M%S')}-{uuid.uuid4().hex[:8]}\"\n",
"\n",
- "# Analyze video file\n",
- "response = client.begin_analyze(ANALYZER_ID, file_location=ANALYZER_SAMPLE_FILE)\n",
- "result_json = client.poll_result(response)\n",
+ "video_analyzer = ContentAnalyzer(\n",
+ " base_analyzer_id='prebuilt-videoAnalyzer', \n",
+ " config=ContentAnalyzerConfig(return_details=True), \n",
+ " description=\"Marketing video analyzer for result file demo\", \n",
+ " mode=AnalysisMode.STANDARD,\n",
+ " processing_location=ProcessingLocation.GLOBAL,\n",
+ " tags={\"demo_type\": \"video_analysis\"}\n",
+ ")\n",
"\n",
- "print(json.dumps(result_json, indent=2))\n",
+ "# Start the analyzer creation operation\n",
+ "poller = await client.content_analyzers.begin_create_or_replace(\n",
+ " analyzer_id=analyzer_id,\n",
+ " resource=video_analyzer,\n",
+ ")\n",
+ "\n",
+ " # Extract operation ID from the poller\n",
+ "operation_id = extract_operation_id_from_poller(\n",
+ " poller, PollerType.ANALYZER_CREATION\n",
+ ")\n",
+ "print(f\"π Extracted creation operation ID: {operation_id}\")\n",
"\n",
- "# Save keyframes (optional)\n",
- "keyframe_ids = set()\n",
- "result_data = result_json.get(\"result\", {})\n",
- "contents = result_data.get(\"contents\", [])\n",
+ "# Wait for the analyzer to be created\n",
+ "print(f\"β³ Waiting for analyzer creation to complete...\")\n",
+ "await poller.result()\n",
+ "print(f\"β
Analyzer '{analyzer_id}' created successfully!\")\n",
"\n",
- "# Extract keyframe IDs from markdown content\n",
- "for content in contents:\n",
- " markdown_content = content.get(\"markdown\", \"\")\n",
- " if isinstance(markdown_content, str):\n",
- " keyframe_ids.update(re.findall(r\"(keyFrame\\.\\d+)\\.jpg\", markdown_content))\n",
+ "# Use the FlightSimulator.mp4 video file from remote location\n",
+ "video_file_path = \"../data/FlightSimulator.mp4\"\n",
+ "print(f\"πΉ Using video file from URL: {video_file_path}\")\n",
+ "\n",
+ "with open(video_file_path, \"rb\") as f:\n",
+ " video_data = f.read()\n",
+ "\n",
+ "# Begin video analysis operation\n",
+ "print(f\"π¬ Starting video analysis with analyzer '{analyzer_id}'...\")\n",
+ "analysis_poller = await client.content_analyzers.begin_analyze_binary(\n",
+ " analyzer_id=analyzer_id,\n",
+ " input=video_data,\n",
+ " content_type=\"application/octet-stream\"\n",
+ ")\n",
+ "\n",
+ "# Wait for analysis completion\n",
+ "print(f\"β³ Waiting for video analysis to complete...\")\n",
+ "analysis_result = await analysis_poller.result()\n",
+ "print(json.dumps(analysis_result.as_dict(), indent=2))\n",
+ "print(f\"β
Video analysis completed successfully!\")\n",
+ "\n",
+ "# Extract operation ID for get_result_file\n",
+ "analysis_operation_id = extract_operation_id_from_poller(\n",
+ " analysis_poller, PollerType.ANALYZE_CALL\n",
+ ")\n",
+ "print(f\"π Extracted analysis operation ID: {analysis_operation_id}\")\n",
+ "\n",
+ "# Get the result to see what files are available\n",
+ "print(f\"π Getting analysis result to find available files...\")\n",
+ "operation_status = await client.content_analyzers.get_result(\n",
+ " operation_id=analysis_operation_id,\n",
+ ")\n",
"\n",
- "# Output unique keyframe IDs\n",
- "print(\"Unique Keyframe IDs:\", keyframe_ids)\n",
+ "# The actual analysis result is in operation_status.result\n",
+ "operation_result: Any = operation_status.result\n",
+ "if operation_result is None:\n",
+ " print(\"β οΈ No analysis result available\")\n",
+ "else:\n",
+ " print(f\"β
Analysis result contains {len(operation_result.contents)} contents\")\n",
"\n",
- "# Save all keyframe images\n",
- "for keyframe_id in keyframe_ids:\n",
- " save_image(keyframe_id, response)"
+ "# Look for keyframe times in the analysis result\n",
+ "keyframe_times_ms: list[int] = []\n",
+ "for content in operation_result.contents:\n",
+ " if isinstance(content, AudioVisualContent):\n",
+ " video_content: AudioVisualContent = content\n",
+ " print(f\"KeyFrameTimesMs: {video_content.key_frame_times_ms}\")\n",
+ " print(video_content)\n",
+ " keyframe_times_ms.extend(video_content.key_frame_times_ms or [])\n",
+ " print(f\"πΉ Found {len(keyframe_times_ms)} keyframes in video content\")\n",
+ " break\n",
+ " else:\n",
+ " print(f\"Content is not an AudioVisualContent: {content}\")\n",
+ "\n",
+ "if not keyframe_times_ms:\n",
+ " print(\"β οΈ No keyframe times found in the analysis result\")\n",
+ "else:\n",
+ " print(f\"πΌοΈ Found {len(keyframe_times_ms)} keyframe times in milliseconds\")\n",
+ "\n",
+ "# Build keyframe filenames using the time values\n",
+ "keyframe_files = [f\"keyFrame.{time_ms}\" for time_ms in keyframe_times_ms]\n",
+ "\n",
+ "# Download and save a few keyframe images as examples (first, middle, last)\n",
+ "if len(keyframe_files) >= 3:\n",
+ " frames_to_download = {\n",
+ " keyframe_files[0],\n",
+ " keyframe_files[-1],\n",
+ " keyframe_files[len(keyframe_files) // 2],\n",
+ " }\n",
+ "else:\n",
+ " frames_to_download = set(keyframe_files)\n",
+ "\n",
+ "files_to_download = list(frames_to_download)\n",
+ "print(\n",
+ " f\"π₯ Downloading {len(files_to_download)} keyframe images as examples: {files_to_download}\"\n",
+ ")\n",
+ "\n",
+ "for keyframe_id in files_to_download:\n",
+ " print(f\"π₯ Getting result file: {keyframe_id}\")\n",
+ "\n",
+ " # Get the result file (keyframe image)\n",
+ " response: Any = await client.content_analyzers.get_result_file(\n",
+ " operation_id=analysis_operation_id,\n",
+ " path=keyframe_id,\n",
+ " )\n",
+ "\n",
+ " # Handle the response which may be bytes or an async iterator of bytes\n",
+ " if isinstance(response, (bytes, bytearray)):\n",
+ " image_content = bytes(response)\n",
+ " else:\n",
+ " chunks: list[bytes] = []\n",
+ " async for chunk in response:\n",
+ " chunks.append(chunk)\n",
+ " image_content = b\"\".join(chunks)\n",
+ "\n",
+ " print(\n",
+ " f\"β
Retrieved image file for {keyframe_id} ({len(image_content)} bytes)\"\n",
+ " )\n",
+ "\n",
+ " # Save the image file\n",
+ " saved_file_path = save_keyframe_image_to_file(\n",
+ " image_content=image_content,\n",
+ " keyframe_id=keyframe_id,\n",
+ " test_name=\"content_analyzers_get_result_file\",\n",
+ " test_py_file_dir=os.getcwd(),\n",
+ " identifier=analyzer_id,\n",
+ " )\n",
+ " print(f\"πΎ Keyframe image saved to: {saved_file_path}\")\n",
+ "\n",
+ "# Clean up the created analyzer (demo cleanup)\n",
+ "print(f\"ποΈ Deleting analyzer '{analyzer_id}' (demo cleanup)...\")\n",
+ "await client.content_analyzers.delete(analyzer_id=analyzer_id)\n",
+ "print(f\"β
Analyzer '{analyzer_id}' deleted successfully!\")"
]
},
{
@@ -281,14 +425,78 @@
"metadata": {},
"outputs": [],
"source": [
- "ANALYZER_SAMPLE_FILE = '../data/FlightSimulator.mp4'\n",
- "ANALYZER_ID = 'prebuilt-videoAnalyzer'\n",
+ "analyzer_id = f\"video-sample-{datetime.now().strftime('%Y%m%d')}-{datetime.now().strftime('%H%M%S')}-{uuid.uuid4().hex[:8]}\"\n",
+ "\n",
+ "# Create a marketing video analyzer using object model\n",
+ "print(f\"π§ Creating marketing video analyzer '{analyzer_id}'...\")\n",
+ "\n",
+ "video_analyzer = ContentAnalyzer(\n",
+ " base_analyzer_id='prebuilt-videoAnalyzer',\n",
+ " config=ContentAnalyzerConfig(\n",
+ " return_details=True,\n",
+ " ),\n",
+ " description=\"Marketing video analyzer for result file demo\",\n",
+ " mode=AnalysisMode.STANDARD,\n",
+ " processing_location=ProcessingLocation.GLOBAL,\n",
+ " tags={\"demo_type\": \"video_analysis\"},\n",
+ ")\n",
+ "\n",
+ "# Start the analyzer creation operation\n",
+ "poller = await client.content_analyzers.begin_create_or_replace(\n",
+ " analyzer_id=analyzer_id,\n",
+ " resource=video_analyzer,\n",
+ ")\n",
+ "\n",
+ "# Extract operation ID from the poller\n",
+ "operation_id = extract_operation_id_from_poller(\n",
+ " poller, PollerType.ANALYZER_CREATION\n",
+ ")\n",
+ "print(f\"π Extracted creation operation ID: {operation_id}\")\n",
+ "\n",
+ "# Wait for the analyzer to be created\n",
+ "print(f\"β³ Waiting for analyzer creation to complete...\")\n",
+ "await poller.result()\n",
+ "print(f\"β
Analyzer '{analyzer_id}' created successfully!\")\n",
+ "\n",
+ "# Use the FlightSimulator.mp4 video file from remote location\n",
+ "video_file_path = \"../data/FlightSimulator.mp4\"\n",
+ "print(f\"πΉ Using video file from URL: {video_file_path}\")\n",
"\n",
- "# Analyze video file with face recognition\n",
- "response = client.begin_analyze(ANALYZER_ID, file_location=ANALYZER_SAMPLE_FILE)\n",
- "result_json = client.poll_result(response)\n",
+ "with open(video_file_path, \"rb\") as f:\n",
+ " video_data = f.read()\n",
"\n",
- "print(json.dumps(result_json, indent=2))"
+ "# Begin video analysis operation\n",
+ "print(f\"π¬ Starting video analysis with analyzer '{analyzer_id}'...\")\n",
+ "analysis_poller = await client.content_analyzers.begin_analyze_binary(\n",
+ " analyzer_id=analyzer_id,\n",
+ " input=video_data,\n",
+ " content_type=\"application/octet-stream\"\n",
+ ")\n",
+ "\n",
+ "# Wait for analysis completion\n",
+ "print(f\"β³ Waiting for video analysis to complete...\")\n",
+ "analysis_result = await analysis_poller.result()\n",
+ "print(\"result: \", json.dumps(analysis_result.as_dict(), indent=2))\n",
+ "print(f\"β
Video analysis completed successfully!\")\n",
+ "\n",
+ "# Extract operation ID for get_result_file\n",
+ "analysis_operation_id = extract_operation_id_from_poller(\n",
+ " analysis_poller, PollerType.ANALYZE_CALL\n",
+ ")\n",
+ "print(f\"π Extracted analysis operation ID: {analysis_operation_id}\")\n",
+ "\n",
+ "# Get the result to see what files are available\n",
+ "print(f\"π Getting analysis result to find available files...\")\n",
+ "operation_status = await client.content_analyzers.get_result(\n",
+ " operation_id=analysis_operation_id,\n",
+ ")\n",
+ "\n",
+ "# The actual analysis result is in operation_status.result\n",
+ "operation_result: Any = operation_status.result\n",
+ "if operation_result is None:\n",
+ " print(\"β οΈ No analysis result available\")\n",
+ "else:\n",
+ " print(f\"β
Analysis result contains {len(operation_result.contents)} contents\")"
]
},
{
@@ -304,45 +512,93 @@
"metadata": {},
"outputs": [],
"source": [
- "# Initialize sets to store unique face IDs and keyframe IDs\n",
+ "# Initialize sets to store unique face IDs\n",
"face_ids = set()\n",
- "keyframe_ids = set()\n",
- "\n",
- "# Safely extract face IDs and keyframe IDs from content\n",
- "result_data = result_json.get(\"result\", {})\n",
- "contents = result_data.get(\"contents\", [])\n",
- "\n",
- "for content in contents:\n",
- " # Extract face IDs if \"faces\" field exists and is a list\n",
- " faces = content.get(\"faces\", [])\n",
- " if isinstance(faces, list):\n",
- " for face in faces:\n",
- " face_id = face.get(\"faceId\")\n",
- " if face_id:\n",
- " face_ids.add(f\"face.{face_id}\")\n",
- "\n",
- " # Extract keyframe IDs from \"markdown\" if present and a string\n",
- " markdown_content = content.get(\"markdown\", \"\")\n",
- " if isinstance(markdown_content, str):\n",
- " keyframe_ids.update(re.findall(r\"(keyFrame\\.\\d+)\\.jpg\", markdown_content))\n",
- "\n",
- "# Display unique face and keyframe IDs\n",
- "print(\"Unique Face IDs:\", face_ids)\n",
- "print(\"Unique Keyframe IDs:\", keyframe_ids)\n",
- "\n",
- "# Save all face images\n",
- "for face_id in face_ids:\n",
- " save_image(face_id, response)\n",
- "\n",
- "# Save all keyframe images\n",
- "for keyframe_id in keyframe_ids:\n",
- " save_image(keyframe_id, response)"
+ "\n",
+ "# Look for keyframe times in the analysis result\n",
+ "keyframe_times_ms: list[int] = []\n",
+ "for content in operation_result.contents:\n",
+ " if isinstance(content, AudioVisualContent):\n",
+ " video_content: AudioVisualContent = content\n",
+ " print(f\"KeyFrameTimesMs: {video_content.key_frame_times_ms}\")\n",
+ " print(video_content)\n",
+ " keyframe_times_ms.extend(video_content.key_frame_times_ms or [])\n",
+ " print(f\"πΉ Found {len(keyframe_times_ms)} keyframes in video content\")\n",
+ " faces = content.get(\"faces\", [])\n",
+ " if isinstance(faces, list):\n",
+ " for face in faces:\n",
+ " face_id = face.get(\"faceId\")\n",
+ " if face_id:\n",
+ " face_ids.add(f\"face.{face_id}\")\n",
+ " break\n",
+ " else:\n",
+ " print(f\"Content is not an AudioVisualContent: {content}\")\n",
+ "\n",
+ "if not keyframe_times_ms:\n",
+ " print(\"β οΈ No keyframe times found in the analysis result\")\n",
+ "else:\n",
+ " print(f\"πΌοΈ Found {len(keyframe_times_ms)} keyframe times in milliseconds\")\n",
+ "\n",
+ "# Build keyframe filenames using the time values\n",
+ "keyframe_files = [f\"keyFrame.{time_ms}\" for time_ms in keyframe_times_ms]\n",
+ "\n",
+ "# Download and save a few keyframe images as examples (first, middle, last)\n",
+ "if len(keyframe_files) >= 3:\n",
+ " frames_to_download = {\n",
+ " keyframe_files[0],\n",
+ " keyframe_files[-1],\n",
+ " keyframe_files[len(keyframe_files) // 2],\n",
+ " }\n",
+ "else:\n",
+ " frames_to_download = set(keyframe_files)\n",
+ "\n",
+ "files_to_download = list(frames_to_download)\n",
+ "print(\n",
+ " f\"π₯ Downloading {len(files_to_download)} keyframe images as examples: {files_to_download}\"\n",
+ ")\n",
+ "\n",
+ "for keyframe_id in files_to_download:\n",
+ " print(f\"π₯ Getting result file: {keyframe_id}\")\n",
+ "\n",
+ " # Get the result file (keyframe image)\n",
+ " response: Any = await client.content_analyzers.get_result_file(\n",
+ " operation_id=analysis_operation_id,\n",
+ " path=keyframe_id,\n",
+ " )\n",
+ "\n",
+ " # Handle the response which may be bytes or an async iterator of bytes\n",
+ " if isinstance(response, (bytes, bytearray)):\n",
+ " image_content = bytes(response)\n",
+ " else:\n",
+ " chunks: list[bytes] = []\n",
+ " async for chunk in response:\n",
+ " chunks.append(chunk)\n",
+ " image_content = b\"\".join(chunks)\n",
+ "\n",
+ " print(\n",
+ " f\"β
Retrieved image file for {keyframe_id} ({len(image_content)} bytes)\"\n",
+ " )\n",
+ "\n",
+ " # Save the image file\n",
+ " saved_file_path = save_keyframe_image_to_file(\n",
+ " image_content=image_content,\n",
+ " keyframe_id=keyframe_id,\n",
+ " test_name=\"content_analyzers_get_result_file\",\n",
+ " test_py_file_dir=os.getcwd(),\n",
+ " identifier=analyzer_id,\n",
+ " )\n",
+ " print(f\"πΎ Keyframe image saved to: {saved_file_path}\")\n",
+ "\n",
+ "# Clean up the created analyzer (demo cleanup)\n",
+ "print(f\"ποΈ Deleting analyzer '{analyzer_id}' (demo cleanup)...\")\n",
+ "await client.content_analyzers.delete(analyzer_id=analyzer_id)\n",
+ "print(f\"β
Analyzer '{analyzer_id}' deleted successfully!\")"
]
}
],
"metadata": {
"kernelspec": {
- "display_name": "Python 3",
+ "display_name": "py312",
"language": "python",
"name": "python3"
},
@@ -356,7 +612,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.11.12"
+ "version": "3.12.3"
}
},
"nbformat": 4,
diff --git a/python/extension/document_processor.py b/python/extension/document_processor.py
new file mode 100644
index 0000000..bf451ee
--- /dev/null
+++ b/python/extension/document_processor.py
@@ -0,0 +1,343 @@
+from datetime import datetime, timedelta, timezone
+import os
+import json
+import asyncio
+from typing import List, Dict, Any, Optional
+from pathlib import Path
+from azure.identity import DefaultAzureCredential
+from azure.storage.blob.aio import ContainerClient
+from azure.ai.contentunderstanding.aio import ContentUnderstandingClient
+from azure.storage.blob import (
+ BlobServiceClient,
+ generate_container_sas,
+ ContainerSasPermissions
+)
+from dataclasses import dataclass
+
+@dataclass
+class ReferenceDocItem:
+ file_name: str = ""
+ file_path: str = ""
+ result_file_name: str = ""
+ result_file_path: str = ""
+
+class DocumentProcessor:
+ PREBUILT_DOCUMENT_ANALYZER_ID: str = "prebuilt-documentAnalyzer"
+ OCR_RESULT_FILE_SUFFIX: str = ".result.json"
+ LABEL_FILE_SUFFIX: str = ".labels.json"
+ KNOWLEDGE_SOURCE_LIST_FILE_NAME: str = "sources.jsonl"
+ SAS_EXPIRY_HOURS: int = 1
+
+ SUPPORTED_FILE_TYPES_DOCUMENT_TXT: List[str] = [
+ ".pdf", ".tiff", ".jpg", ".jpeg", ".png", ".bmp", ".heif", ".docx",
+ ".xlsx", ".pptx", ".txt", ".html", ".md", ".eml", ".msg", ".xml",
+ ]
+
+ SUPPORTED_FILE_TYPES_DOCUMENT: List[str] = [
+ ".pdf", ".tiff", ".jpg", ".jpeg", ".png", ".bmp", ".heif",
+ ]
+
+ def __init__(self, client: ContentUnderstandingClient):
+ self._client = client
+
+ def generate_container_sas_url(
+ self,
+ account_name: str,
+ container_name: str,
+ permissions: Optional[ContainerSasPermissions] = None,
+ expiry_hours: Optional[int] = None,
+ ) -> str:
+ """Generate a temporary SAS URL for an Azure Blob container using Azure AD authentication."""
+ print(f"account_name: {account_name}")
+ if not all([account_name, container_name]):
+ raise ValueError("Account name and container name must be provided.")
+
+ permissions = permissions or ContainerSasPermissions(read=True, write=True, list=True)
+ hours = expiry_hours or self.SAS_EXPIRY_HOURS
+
+ now = datetime.now(timezone.utc)
+ expiry = now + timedelta(hours=hours)
+ account_url = f"https://{account_name}.blob.core.windows.net"
+ client = BlobServiceClient(account_url=account_url, credential=DefaultAzureCredential())
+
+ delegation_key = client.get_user_delegation_key(now, expiry)
+ sas_token = generate_container_sas(
+ account_name=account_name,
+ container_name=container_name,
+ user_delegation_key=delegation_key,
+ permission=permissions,
+ expiry=expiry,
+ start=now,
+ )
+
+ return f"{account_url}/{container_name}?{sas_token}"
+
+ async def generate_knowledge_base_on_blob(
+ self,
+ reference_docs_folder: str,
+ storage_container_sas_url: str,
+ storage_container_path_prefix: str,
+ skip_analyze: bool = False
+ ):
+ if not storage_container_path_prefix.endswith("/"):
+ storage_container_path_prefix += "/"
+
+ try:
+ resources = []
+ container_client = ContainerClient.from_container_url(storage_container_sas_url)
+
+ if not skip_analyze:
+ analyze_list: List[ReferenceDocItem] = self._get_analyze_list(reference_docs_folder)
+
+ for analyze_item in analyze_list:
+ try:
+ prebuilt_document_analyzer_id = self.PREBUILT_DOCUMENT_ANALYZER_ID
+
+ print(analyze_item.file_path)
+
+ with open(analyze_item.file_path, "rb") as f:
+ doc_bytes: bytes = f.read()
+
+ print(f"π Analyzing {analyze_item.file_path} with prebuilt-documentAnalyzer...")
+ poller = await self._client.content_analyzers.begin_analyze_binary(
+ analyzer_id=prebuilt_document_analyzer_id,
+ input=doc_bytes,
+ content_type="application/pdf",
+ )
+ result = await poller.result()
+
+ if isinstance(result, (dict, list)):
+ json_string = json.dumps(result)
+ else:
+ json_string = str(result)
+
+ result_file_blob_path = storage_container_path_prefix + analyze_item.result_file_name
+ file_blob_path = storage_container_path_prefix + analyze_item.file_name
+
+ await self._upload_json_to_blob(container_client, json_string, result_file_blob_path)
+ await self._upload_file_to_blob(container_client, analyze_item.file_path, file_blob_path)
+
+ resources.append({
+ "file": analyze_item.file_name,
+ "resultFile": analyze_item.result_file_name
+ })
+ except json.JSONDecodeError as json_ex:
+ raise ValueError(
+ f"Failed to parse JSON result for file '{analyze_item.file_path}'. "
+ f"Ensure the file is a valid document and the analyzer is set up correctly."
+ ) from json_ex
+ except Exception as ex:
+ raise ValueError(
+ f"Failed to analyze file '{analyze_item.file_path}'. "
+ f"Ensure the file is a valid document and the analyzer is set up correctly."
+ ) from ex
+ else:
+ upload_list: List[ReferenceDocItem] = []
+
+ # Process subdirectories
+ for dir_path in Path(reference_docs_folder).rglob("*"):
+ if dir_path.is_dir():
+ self._process_directory(str(dir_path), upload_list)
+
+ # Process root directory
+ self._process_directory(reference_docs_folder, upload_list)
+
+ for upload_item in upload_list:
+ result_file_blob_path = storage_container_path_prefix + upload_item.result_file_name
+ file_blob_path = storage_container_path_prefix + upload_item.file_name
+
+ await self._upload_file_to_blob(container_client, upload_item.result_file_path, result_file_blob_path)
+ await self._upload_file_to_blob(container_client, upload_item.file_path, file_blob_path)
+
+ resources.append({
+ "file": upload_item.file_name,
+ "resultFile": upload_item.result_file_name
+ })
+
+ # Convert resources to JSON strings
+ jsons = [json.dumps(record) for record in resources]
+
+ await self._upload_jsonl_to_blob(container_client, jsons, storage_container_path_prefix + self.KNOWLEDGE_SOURCE_LIST_FILE_NAME)
+ finally:
+ if container_client:
+ await container_client.close()
+
+ def _process_directory(self, dir_path: str, upload_only_list: List[ReferenceDocItem]):
+ # Get all files in the directory
+ try:
+ file_names = set(os.listdir(dir_path))
+ file_paths = [os.path.join(dir_path, f) for f in file_names if os.path.isfile(os.path.join(dir_path, f))]
+ except OSError:
+ return
+
+ for file_path in file_paths:
+ file_name = os.path.basename(file_path)
+ file_ext = os.path.splitext(file_name)[1]
+
+ if self.is_supported_doc_type_by_file_ext(file_ext, is_document=True):
+ result_file_name = file_name + self.OCR_RESULT_FILE_SUFFIX
+ result_file_path = os.path.join(dir_path, result_file_name)
+
+ if not os.path.exists(result_file_path):
+ raise FileNotFoundError(
+ f"Result file '{result_file_name}' not found in directory '{dir_path}'. "
+ f"Please run analyze first or remove this file from the folder."
+ )
+
+ upload_only_list.append(ReferenceDocItem(
+ file_name=file_name,
+ file_path=file_path,
+ result_file_name=result_file_name,
+ result_file_path=result_file_path
+ ))
+ elif file_name.lower().endswith(self.OCR_RESULT_FILE_SUFFIX.lower()):
+ ocr_suffix = self.OCR_RESULT_FILE_SUFFIX
+ original_file_name = file_name[:-len(ocr_suffix)]
+ original_file_path = os.path.join(dir_path, original_file_name)
+
+ if os.path.exists(original_file_path):
+ origin_file_ext = os.path.splitext(original_file_name)[1]
+
+ if self.is_supported_doc_type_by_file_ext(origin_file_ext, is_document=True):
+ continue
+ else:
+ raise ValueError(
+ f"The '{original_file_name}' is not a supported document type, "
+ f"please remove the result file '{file_name}' and '{original_file_name}'."
+ )
+ else:
+ raise ValueError(
+ f"Result file '{file_name}' is not corresponding to an original file, "
+ f"please remove it."
+ )
+ else:
+ raise ValueError(
+ f"File '{file_name}' is not a supported document type, "
+ f"please remove it or convert it to a supported type."
+ )
+
+ def _get_analyze_list(self, reference_docs_folder: str) -> List[ReferenceDocItem]:
+ analyze_list: List[ReferenceDocItem] = []
+
+ # Process subdirectories
+ for dir_path in Path(reference_docs_folder).rglob("*"):
+ if dir_path.is_dir():
+ try:
+ for file_path in dir_path.iterdir():
+ if file_path.is_file():
+ file_name_only = file_path.name
+ file_ext = file_path.suffix
+
+ if self.is_supported_doc_type_by_file_ext(file_ext, is_document=True):
+ result_file_name = file_name_only + self.OCR_RESULT_FILE_SUFFIX
+ analyze_list.append(ReferenceDocItem(
+ file_name=file_name_only,
+ file_path=str(file_path),
+ result_file_name=result_file_name
+ ))
+ else:
+ raise ValueError(
+ f"File '{file_name_only}' is not a supported document type, "
+ f"please remove it or convert it to a supported type."
+ )
+ except OSError:
+ continue
+
+ # Process files in the root folder
+ root_path = Path(reference_docs_folder)
+ try:
+ for file_path in root_path.iterdir():
+ if file_path.is_file():
+ file_name_only = file_path.name
+ file_ext = file_path.suffix
+
+ if self.is_supported_doc_type_by_file_ext(file_ext, is_document=True):
+ result_file_name = file_name_only + self.OCR_RESULT_FILE_SUFFIX
+ analyze_list.append(ReferenceDocItem(
+ file_name=file_name_only,
+ file_path=str(file_path),
+ result_file_name=result_file_name
+ ))
+ else:
+ raise ValueError(
+ f"File '{file_name_only}' is not a supported document type, "
+ f"please remove it or convert it to a supported type."
+ )
+ except OSError:
+ pass
+
+ return analyze_list
+
+ async def generate_training_data_on_blob(
+ self,
+ training_docs_folder: str,
+ storage_container_sas_url: str,
+ storage_container_path_prefix: str,
+ ) -> None:
+ if not storage_container_path_prefix.endswith("/"):
+ storage_container_path_prefix += "/"
+
+ async with ContainerClient.from_container_url(storage_container_sas_url) as container_client:
+ for file_name in os.listdir(training_docs_folder):
+ file_path = os.path.join(training_docs_folder, file_name)
+ _, file_ext = os.path.splitext(file_name)
+ if os.path.isfile(file_path) and (
+ file_ext == "" or file_ext.lower() in self.SUPPORTED_FILE_TYPES_DOCUMENT):
+ # Training feature only supports Standard mode with document data
+ # Document files uploaded to AI Foundry will be convert to uuid without extension
+ label_file_name = file_name + self.LABEL_FILE_SUFFIX
+ label_path = os.path.join(training_docs_folder, label_file_name)
+ ocr_result_file_name = file_name + self.OCR_RESULT_FILE_SUFFIX
+ ocr_result_path = os.path.join(training_docs_folder, ocr_result_file_name)
+
+ if os.path.exists(label_path) and os.path.exists(ocr_result_path):
+ file_blob_path = storage_container_path_prefix + file_name
+ label_blob_path = storage_container_path_prefix + label_file_name
+ ocr_result_blob_path = storage_container_path_prefix + ocr_result_file_name
+
+ # Upload files
+ await self._upload_file_to_blob(container_client, file_path, file_blob_path)
+ await self._upload_file_to_blob(container_client, label_path, label_blob_path)
+ await self._upload_file_to_blob(container_client, ocr_result_path, ocr_result_blob_path)
+ print(f"Uploaded training data for {file_name}")
+ else:
+ raise FileNotFoundError(
+ f"Label file '{label_file_name}' or OCR result file '{ocr_result_file_name}' "
+ f"does not exist in '{training_docs_folder}'. "
+ f"Please ensure both files exist for '{file_name}'."
+ )
+
+ async def _upload_file_to_blob(
+ self, container_client: ContainerClient, file_path: str, target_blob_path: str
+ ) -> None:
+ with open(file_path, "rb") as data:
+ await container_client.upload_blob(name=target_blob_path, data=data, overwrite=True)
+ print(f"Uploaded file to {target_blob_path}")
+
+ async def _upload_json_to_blob(
+ self, container_client: ContainerClient, json_string: str, target_blob_path: str
+ ) -> None:
+ json_bytes = json_string.encode('utf-8')
+ await container_client.upload_blob(name=target_blob_path, data=json_bytes, overwrite=True)
+ print(f"Uploaded json to {target_blob_path}")
+
+ async def _upload_jsonl_to_blob(
+ self, container_client: ContainerClient, data_list: List[str], target_blob_path: str
+ ) -> None:
+ jsonl_string = "\n".join(data_list)
+ jsonl_bytes = jsonl_string.encode("utf-8")
+ await container_client.upload_blob(name=target_blob_path, data=jsonl_bytes, overwrite=True)
+ print(f"Uploaded jsonl to blob '{target_blob_path}'")
+
+ def is_supported_doc_type_by_file_ext(self, file_ext: str, is_document: bool=False) -> bool:
+ supported_types = (
+ self.SUPPORTED_FILE_TYPES_DOCUMENT
+ if is_document else self.SUPPORTED_FILE_TYPES_DOCUMENT_TXT
+ )
+ return file_ext.lower() in supported_types
+
+ def is_supported_doc_type_by_file_path(self, file_path: Path, is_document: bool=False) -> bool:
+ if not file_path.is_file():
+ return False
+ file_ext = file_path.suffix.lower()
+ return self.is_supported_doc_type_by_file_ext(file_ext, is_document)
\ No newline at end of file
diff --git a/python/extension/sample_helper.py b/python/extension/sample_helper.py
new file mode 100644
index 0000000..3c3e21b
--- /dev/null
+++ b/python/extension/sample_helper.py
@@ -0,0 +1,181 @@
+# coding=utf-8
+# --------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for license information.
+# --------------------------------------------------------------------------
+"""
+Helper functions for Azure AI Content Understanding samples.
+"""
+
+import json
+import os
+from datetime import datetime, timezone
+from typing import Any, Optional, Dict
+from enum import Enum
+from azure.ai.contentunderstanding.models import (
+ ContentField,
+)
+
+def get_field_value(fields: Dict[str, ContentField], field_name: str) -> Any:
+ """
+ Extract the actual value from a ContentField using the unified .value property.
+
+ Args:
+ fields: A dictionary of field names to ContentField objects.
+ field_name: The name of the field to extract.
+
+ Returns:
+ The extracted value or None if not found.
+ """
+ if not fields or field_name not in fields:
+ return None
+
+ field_data = fields[field_name]
+
+ # Simply use the .value property which works for all ContentField types
+ return field_data.value
+
+
+class PollerType(Enum):
+ """Enum to distinguish different types of pollers for operation ID extraction."""
+
+ ANALYZER_CREATION = "analyzer_creation"
+ ANALYZE_CALL = "analyze_call"
+ CLASSIFIER_CREATION = "classifier_creation"
+ CLASSIFY_CALL = "classify_call"
+
+
+def save_json_to_file(
+ result, output_dir: str = "test_output", filename_prefix: str = "analysis_result"
+) -> str:
+ """Persist the full AnalyzeResult as JSON and return the file path."""
+ os.makedirs(output_dir, exist_ok=True)
+ timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
+ path = os.path.join(output_dir, f"{filename_prefix}_{timestamp}.json")
+ with open(path, "w", encoding="utf-8") as fp:
+ json.dump(result, fp, indent=2, ensure_ascii=False)
+ print(f"πΎ Analysis result saved to: {path}")
+ return path
+
+
+def extract_operation_id_from_poller(poller: Any, poller_type: PollerType) -> str:
+ """Extract operation ID from an LROPoller or AsyncLROPoller.
+
+ The poller stores the initial response in `_initial_response`, which contains
+ the Operation-Location header. The extraction pattern depends on the poller type:
+ - AnalyzerCreation: https://endpoint/contentunderstanding/operations/{operation_id}?api-version=...
+ - AnalyzeCall: https://endpoint/contentunderstanding/analyzerResults/{operation_id}?api-version=...
+ - ClassifierCreation: https://endpoint/contentunderstanding/operations/{operation_id}?api-version=...
+ - ClassifyCall: https://endpoint/contentunderstanding/classifierResults/{operation_id}?api-version=...
+
+ Args:
+ poller: The LROPoller or AsyncLROPoller instance
+ poller_type: The type of poller (ANALYZER_CREATION, ANALYZE_CALL, CLASSIFIER_CREATION, or CLASSIFY_CALL) - REQUIRED
+
+ Returns:
+ str: The operation ID extracted from the poller
+
+ Raises:
+ ValueError: If no operation ID can be extracted from the poller or if poller_type is not provided
+ """
+ if poller_type is None:
+ raise ValueError("poller_type is required and must be specified")
+ # Extract from Operation-Location header (standard approach)
+ initial_response = poller.polling_method()._initial_response
+ operation_location = initial_response.http_response.headers.get(
+ "Operation-Location"
+ )
+
+ if operation_location:
+ if (
+ poller_type == PollerType.ANALYZER_CREATION
+ or poller_type == PollerType.CLASSIFIER_CREATION
+ ):
+ # Pattern: https://endpoint/.../operations/{operation_id}?api-version=...
+ if "/operations/" in operation_location:
+ operation_id = operation_location.split("/operations/")[1].split("?")[0]
+ return operation_id
+ elif poller_type == PollerType.ANALYZE_CALL:
+ # Pattern: https://endpoint/.../analyzerResults/{operation_id}?api-version=...
+ if "/analyzerResults/" in operation_location:
+ operation_id = operation_location.split("/analyzerResults/")[1].split(
+ "?"
+ )[0]
+ return operation_id
+ elif poller_type == PollerType.CLASSIFY_CALL:
+ # Pattern: https://endpoint/.../classifierResults/{operation_id}?api-version=...
+ if "/classifierResults/" in operation_location:
+ operation_id = operation_location.split("/classifierResults/")[1].split(
+ "?"
+ )[0]
+ return operation_id
+
+ raise ValueError(
+ f"Could not extract operation ID from poller for type {poller_type}"
+ )
+
+
+def save_keyframe_image_to_file(
+ image_content: bytes,
+ keyframe_id: str,
+ test_name: str,
+ test_py_file_dir: str,
+ identifier: Optional[str] = None,
+ output_dir: str = "test_output",
+) -> str:
+ """Save keyframe image to output file using pytest naming convention.
+
+ Args:
+ image_content: The binary image content to save
+ keyframe_id: The keyframe ID (e.g., "keyFrame.1")
+ test_name: Name of the test case (e.g., function name)
+ test_py_file_dir: Directory where pytest files are located
+ identifier: Optional unique identifier to avoid conflicts (e.g., analyzer_id)
+ output_dir: Directory name to save the output file (default: "test_output")
+
+ Returns:
+ str: Path to the saved image file
+
+ Raises:
+ OSError: If there are issues creating directory or writing file
+ """
+ # Generate timestamp and frame ID
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ frame_id = keyframe_id.replace("keyFrame.", "")
+
+ # Create output directory if it doesn't exist
+ output_dir_path = os.path.join(test_py_file_dir, output_dir)
+ os.makedirs(output_dir_path, exist_ok=True)
+
+ # Generate output filename with optional identifier to avoid conflicts
+ if identifier:
+ output_filename = f"{test_name}_{identifier}_{timestamp}_{frame_id}.jpg"
+ else:
+ output_filename = f"{test_name}_{timestamp}_{frame_id}.jpg"
+
+ saved_file_path = os.path.join(output_dir_path, output_filename)
+
+ # Write the image content to file
+ with open(saved_file_path, "wb") as image_file:
+ image_file.write(image_content)
+
+ print(f"πΌοΈ Image file saved to: {saved_file_path}")
+ return saved_file_path
+
+
+def read_image_to_base64(image_path: str) -> str:
+ """Read image file and return base64-encoded string."""
+ import base64
+
+ with open(image_path, "rb") as image_file:
+ image_bytes = image_file.read()
+ return base64.b64encode(image_bytes).decode("utf-8")
+
+
+def read_image_to_base64_bytes(image_path: str) -> bytes:
+ """Read image file and return base64-encoded bytes."""
+ import base64
+
+ with open(image_path, "rb") as image_file:
+ image_bytes = image_file.read()
+ return base64.b64encode(image_bytes)
\ No newline at end of file
diff --git a/tools/test_notebooks/README.md b/tools/test_notebooks/README.md
index 1640213..c8501df 100644
--- a/tools/test_notebooks/README.md
+++ b/tools/test_notebooks/README.md
@@ -34,8 +34,8 @@ Some notebooks require access to Azure Storage or other resources. You may need
- Add the following variables to the [.env](../notebooks/.env) file in your project root:
```env
- TRAINING_DATA_SAS_URL=
- TRAINING_DATA_PATH=
+ training_data_sas_url=
+ training_data_path=
REFERENCE_DOC_SAS_URL=
REFERENCE_DOC_PATH=
```