From 607a76297989550d1afc440df9b9818b8d83d361 Mon Sep 17 00:00:00 2001 From: Lukas Krug Date: Wed, 26 Mar 2025 15:44:05 +0100 Subject: [PATCH] feat(nifi): Restore flows directly from JSON file instead of using versioning (#194) feat: remove versioning from flow restore process --- .../create-nifi-ingestion-job.yaml | 85 ++++++------------ .../create-nifi-ingestion-job.yaml | 86 ++++++------------- .../create-nifi-ingestion-job.yaml | 85 ++++++------------ .../create-nifi-ingestion-job.yaml | 77 ++++++----------- 4 files changed, 105 insertions(+), 228 deletions(-) diff --git a/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml b/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml index 224eff9c..4a5d79b2 100644 --- a/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml +++ b/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml @@ -50,6 +50,7 @@ data: from nipyapi.security import service_login import nipyapi import os + import requests import urllib3 # As of 2022-08-29 we cant use "https://nifi:8443" here because

The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.

@@ -66,70 +67,36 @@ data: service_login(username=USERNAME, password=PASSWORD) print("Logged in") - organization = "stackabletech" - repository = "demos" - branch = "main" - version = "main" - directory = "demos/data-lakehouse-iceberg-trino-spark" - flow_name = "LakehouseKafkaIngest" + response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.json") - # Check if the GitHub flow registry client already exists - flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries + filename = "/tmp/LakehouseKafkaIngest.json" + with open(filename, "wb") as f: + f.write(response.content) - github_client = None - for client in flow_registry_clients: - if client.component.name == "GitHubFlowRegistryClient": - github_client = client - print("Found existing GitHub flow registry client") - break + pg_id = get_root_pg_id() - if not github_client: - print("Creating new GitHub flow registry client") - github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client( - body={ - "revision": {"version": 0}, - "component": { - "name": "GitHubFlowRegistryClient", - "type": "org.apache.nifi.github.GitHubFlowRegistryClient", - "properties": { - "Repository Owner": organization, - "Repository Name": repository, - }, - "bundle": { - "group": "org.apache.nifi", - "artifact": "nifi-github-nar", - "version": "2.2.0", - }, - }, - } - ) + if not nipyapi.config.nifi_config.api_client: + nipyapi.config.nifi_config.api_client = ApiClient() - pg_id = get_root_pg_id() + header_params = {} + header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json']) + header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data']) - try: - # Create process group from the file in the Git repo - nipyapi.nifi.ProcessGroupsApi().create_process_group( - id=pg_id, - body={ - "revision": {"version": 0}, - "component": { - "position": {"x": 300, "y": 10}, - "versionControlInformation": { - "registryId": github_client.component.id, - "flowId": flow_name, - "bucketId": directory, - "branch": branch, - "version": version, - }, - }, - }, - ) - except ValueError as e: - # Ignore, because nipyapi can't handle non-int versions yet - if "invalid literal for int() with base 10" in str(e): - print("Ignoring ValueError") - else: - raise e + nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST', + path_params={'pg_id': pg_id}, + header_params=header_params, + _return_http_data_only=True, + post_params=[ + ('id', pg_id), + ('groupName', 'LakehouseKafkaIngest'), + ('positionX', 100), + ('positionY', 10), + ('clientId', nipyapi.nifi.FlowApi().generate_client_id()), + ], + files={ + 'file': filename + }, + auth_settings=['tokenAuth']) # Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it # To work around this, we try to schedule the controllers multiple times diff --git a/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml b/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml index 2c6eb0e6..f80e7a00 100644 --- a/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml +++ b/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml @@ -59,6 +59,7 @@ data: from nipyapi.security import service_login import nipyapi import os + import requests import urllib3 # As of 2022-08-29 we cant use "https://nifi:8443" here because

The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.

@@ -75,70 +76,35 @@ data: service_login(username=USERNAME, password=PASSWORD) print("Logged in") - organization = "stackabletech" - repository = "demos" - branch = "main" - version = "main" - directory = "demos/nifi-kafka-druid-earthquake-data" - flow_name = "IngestEarthquakesToKafka" + response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-earthquake-data/IngestEarthquakesToKafka.json") + filename = "/tmp/IngestEarthquakesToKafka.json" + with open(filename, "wb") as f: + f.write(response.content) - # Check if the GitHub flow registry client already exists - flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries - - github_client = None - for client in flow_registry_clients: - if client.component.name == "GitHubFlowRegistryClient": - github_client = client - print("Found existing GitHub flow registry client") - break + pg_id = get_root_pg_id() - if not github_client: - print("Creating new GitHub flow registry client") - github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client( - body={ - "revision": {"version": 0}, - "component": { - "name": "GitHubFlowRegistryClient", - "type": "org.apache.nifi.github.GitHubFlowRegistryClient", - "properties": { - "Repository Owner": organization, - "Repository Name": repository, - }, - "bundle": { - "group": "org.apache.nifi", - "artifact": "nifi-github-nar", - "version": "2.2.0", - }, - }, - } - ) + if not nipyapi.config.nifi_config.api_client: + nipyapi.config.nifi_config.api_client = ApiClient() - pg_id = get_root_pg_id() + header_params = {} + header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json']) + header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data']) - try: - # Create process group from the file in the Git repo - nipyapi.nifi.ProcessGroupsApi().create_process_group( - id=pg_id, - body={ - "revision": {"version": 0}, - "component": { - "position": {"x": 300, "y": 10}, - "versionControlInformation": { - "registryId": github_client.component.id, - "flowId": flow_name, - "bucketId": directory, - "branch": branch, - "version": version, - }, - }, - }, - ) - except ValueError as e: - # Ignore, because nipyapi can't handle non-int versions yet - if "invalid literal for int() with base 10" in str(e): - print("Ignoring ValueError") - else: - raise e + nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST', + path_params={'pg_id': pg_id}, + header_params=header_params, + _return_http_data_only=True, + post_params=[ + ('id', pg_id), + ('groupName', 'IngestEarthquakesToKafka'), + ('positionX', 100), + ('positionY', 10), + ('clientId', nipyapi.nifi.FlowApi().generate_client_id()), + ], + files={ + 'file': filename + }, + auth_settings=['tokenAuth']) # Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it # To work around this, we try to schedule the controllers multiple times diff --git a/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml b/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml index 850e7751..ba0cea4a 100644 --- a/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml +++ b/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml @@ -59,6 +59,7 @@ data: from nipyapi.security import service_login import nipyapi import os + import requests import urllib3 # As of 2022-08-29 we cant use "https://nifi:8443" here because

The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.

@@ -75,70 +76,36 @@ data: service_login(username=USERNAME, password=PASSWORD) print("Logged in") - organization = "stackabletech" - repository = "demos" - branch = "main" - version = "main" - directory = "demos/nifi-kafka-druid-water-level-data" - flow_name = "IngestWaterLevelsToKafka" + response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-water-level-data/IngestWaterLevelsToKafka.json") - # Check if the GitHub flow registry client already exists - flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries + filename = "/tmp/IngestWaterLevelsToKafka.json" + with open(filename, "wb") as f: + f.write(response.content) - github_client = None - for client in flow_registry_clients: - if client.component.name == "GitHubFlowRegistryClient": - github_client = client - print("Found existing GitHub flow registry client") - break + pg_id = get_root_pg_id() - if not github_client: - print("Creating new GitHub flow registry client") - github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client( - body={ - "revision": {"version": 0}, - "component": { - "name": "GitHubFlowRegistryClient", - "type": "org.apache.nifi.github.GitHubFlowRegistryClient", - "properties": { - "Repository Owner": organization, - "Repository Name": repository, - }, - "bundle": { - "group": "org.apache.nifi", - "artifact": "nifi-github-nar", - "version": "2.2.0", - }, - }, - } - ) + if not nipyapi.config.nifi_config.api_client: + nipyapi.config.nifi_config.api_client = ApiClient() - pg_id = get_root_pg_id() + header_params = {} + header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json']) + header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data']) - try: - # Create process group from the file in the Git repo - nipyapi.nifi.ProcessGroupsApi().create_process_group( - id=pg_id, - body={ - "revision": {"version": 0}, - "component": { - "position": {"x": 300, "y": 10}, - "versionControlInformation": { - "registryId": github_client.component.id, - "flowId": flow_name, - "bucketId": directory, - "branch": branch, - "version": version, - }, - }, - }, - ) - except ValueError as e: - # Ignore, because nipyapi can't handle non-int versions yet - if "invalid literal for int() with base 10" in str(e): - print("Ignoring ValueError") - else: - raise e + nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST', + path_params={'pg_id': pg_id}, + header_params=header_params, + _return_http_data_only=True, + post_params=[ + ('id', pg_id), + ('groupName', 'IngestWaterLevelsToKafka'), + ('positionX', 100), + ('positionY', 10), + ('clientId', nipyapi.nifi.FlowApi().generate_client_id()), + ], + files={ + 'file': filename + }, + auth_settings=['tokenAuth']) # Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it # To work around this, we try to schedule the controllers multiple times diff --git a/demos/signal-processing/create-nifi-ingestion-job.yaml b/demos/signal-processing/create-nifi-ingestion-job.yaml index 7ad1ed81..1cbaa6c9 100644 --- a/demos/signal-processing/create-nifi-ingestion-job.yaml +++ b/demos/signal-processing/create-nifi-ingestion-job.yaml @@ -70,6 +70,7 @@ data: from nipyapi.security import service_login import nipyapi import os + import requests import urllib3 # As of 2022-08-29 we cant use "https://nifi:8443" here because

The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.

@@ -86,60 +87,36 @@ data: service_login(username=USERNAME, password=PASSWORD) print("Logged in") - organization = "stackabletech" - repository = "demos" - branch = "main" - version = "main" - directory = "demos/signal-processing" - flow_name = "DownloadAndWriteToDB" + response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/signal-processing/DownloadAndWriteToDB.json") - # Register the flow registry client - response = nipyapi.nifi.ControllerApi().create_flow_registry_client( - body={ - "revision": {"version": 0}, - "component": { - "name": "GitHubFlowRegistryClient", - "type": "org.apache.nifi.github.GitHubFlowRegistryClient", - "properties": { - "Repository Owner": organization, - "Repository Name": repository, - }, - "bundle": { - "group": "org.apache.nifi", - "artifact": "nifi-github-nar", - "version": "2.2.0", - }, - }, - } - ) + filename = "/tmp/DownloadAndWriteToDB.json" + with open(filename, "wb") as f: + f.write(response.content) pg_id = get_root_pg_id() - print(f"pgid={pg_id}") - try: - # Create process group from the file in the Git repo - nipyapi.nifi.ProcessGroupsApi().create_process_group( - id=pg_id, - body={ - "revision": {"version": 0}, - "component": { - "position": {"x": 300, "y": 10}, - "versionControlInformation": { - "registryId": response.component.id, - "flowId": flow_name, - "bucketId": directory, - "branch": branch, - "version": version, - }, - }, - }, - ) - except ValueError as e: - # Ignore, because nipyapi can't handle non-int versions yet - if "invalid literal for int() with base 10" in str(e): - print("Ignoring ValueError") - else: - raise e + if not nipyapi.config.nifi_config.api_client: + nipyapi.config.nifi_config.api_client = ApiClient() + + header_params = {} + header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json']) + header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data']) + + nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST', + path_params={'pg_id': pg_id}, + header_params=header_params, + _return_http_data_only=True, + post_params=[ + ('id', pg_id), + ('groupName', 'DownloadAndWriteToDB'), + ('positionX', 100), + ('positionY', 10), + ('clientId', nipyapi.nifi.FlowApi().generate_client_id()), + ], + files={ + 'file': filename + }, + auth_settings=['tokenAuth']) # Update the controller services with the correct password for controller in list_all_controllers(pg_id):