Skip to content

feat(nifi): Restore flows directly from JSON file instead of using versioning #195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ data:
from nipyapi.security import service_login
import nipyapi
import os
import requests
import urllib3

# As of 2022-08-29 we can't use "https://nifi:8443" here, because the request fails with: <h2>The request contained an invalid host header [<code>nifi:8443</code>] in the request [<code>/nifi-api</code>]. Check for request manipulation or third-party intercept.</h2>
Expand All @@ -66,70 +67,36 @@ data:
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")

organization = "stackabletech"
repository = "demos"
branch = "main"
version = "main"
directory = "demos/data-lakehouse-iceberg-trino-spark"
flow_name = "LakehouseKafkaIngest"
response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.json")

# Check if the GitHub flow registry client already exists
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
filename = "/tmp/LakehouseKafkaIngest.json"
with open(filename, "wb") as f:
f.write(response.content)

github_client = None
for client in flow_registry_clients:
if client.component.name == "GitHubFlowRegistryClient":
github_client = client
print("Found existing GitHub flow registry client")
break
pg_id = get_root_pg_id()

if not github_client:
print("Creating new GitHub flow registry client")
github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
body={
"revision": {"version": 0},
"component": {
"name": "GitHubFlowRegistryClient",
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
"properties": {
"Repository Owner": organization,
"Repository Name": repository,
},
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-github-nar",
"version": "2.2.0",
},
},
}
)
if not nipyapi.config.nifi_config.api_client:
nipyapi.config.nifi_config.api_client = ApiClient()

pg_id = get_root_pg_id()
header_params = {}
header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])

try:
# Create process group from the file in the Git repo
nipyapi.nifi.ProcessGroupsApi().create_process_group(
id=pg_id,
body={
"revision": {"version": 0},
"component": {
"position": {"x": 300, "y": 10},
"versionControlInformation": {
"registryId": github_client.component.id,
"flowId": flow_name,
"bucketId": directory,
"branch": branch,
"version": version,
},
},
},
)
except ValueError as e:
# Ignore, because nipyapi can't handle non-int versions yet
if "invalid literal for int() with base 10" in str(e):
print("Ignoring ValueError")
else:
raise e
nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
path_params={'pg_id': pg_id},
header_params=header_params,
_return_http_data_only=True,
post_params=[
('id', pg_id),
('groupName', 'LakehouseKafkaIngest'),
('positionX', 100),
('positionY', 10),
('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
],
files={
'file': filename
},
auth_settings=['tokenAuth'])

# Scheduling the `Kafka3ConnectionService` fails if it is started before the `StandardRestrictedSSLContextService`, because the former depends on the latter.
# To work around this, we try to schedule the controllers multiple times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ data:
from nipyapi.security import service_login
import nipyapi
import os
import requests
import urllib3

# As of 2022-08-29 we can't use "https://nifi:8443" here, because the request fails with: <h2>The request contained an invalid host header [<code>nifi:8443</code>] in the request [<code>/nifi-api</code>]. Check for request manipulation or third-party intercept.</h2>
Expand All @@ -75,70 +76,35 @@ data:
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")

organization = "stackabletech"
repository = "demos"
branch = "main"
version = "main"
directory = "demos/nifi-kafka-druid-earthquake-data"
flow_name = "IngestEarthquakesToKafka"
response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-earthquake-data/IngestEarthquakesToKafka.json")
filename = "/tmp/IngestEarthquakesToKafka.json"
with open(filename, "wb") as f:
f.write(response.content)

# Check if the GitHub flow registry client already exists
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries

github_client = None
for client in flow_registry_clients:
if client.component.name == "GitHubFlowRegistryClient":
github_client = client
print("Found existing GitHub flow registry client")
break
pg_id = get_root_pg_id()

if not github_client:
print("Creating new GitHub flow registry client")
github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
body={
"revision": {"version": 0},
"component": {
"name": "GitHubFlowRegistryClient",
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
"properties": {
"Repository Owner": organization,
"Repository Name": repository,
},
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-github-nar",
"version": "2.2.0",
},
},
}
)
if not nipyapi.config.nifi_config.api_client:
nipyapi.config.nifi_config.api_client = ApiClient()

pg_id = get_root_pg_id()
header_params = {}
header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])

try:
# Create process group from the file in the Git repo
nipyapi.nifi.ProcessGroupsApi().create_process_group(
id=pg_id,
body={
"revision": {"version": 0},
"component": {
"position": {"x": 300, "y": 10},
"versionControlInformation": {
"registryId": github_client.component.id,
"flowId": flow_name,
"bucketId": directory,
"branch": branch,
"version": version,
},
},
},
)
except ValueError as e:
# Ignore, because nipyapi can't handle non-int versions yet
if "invalid literal for int() with base 10" in str(e):
print("Ignoring ValueError")
else:
raise e
nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
path_params={'pg_id': pg_id},
header_params=header_params,
_return_http_data_only=True,
post_params=[
('id', pg_id),
('groupName', 'IngestEarthquakesToKafka'),
('positionX', 100),
('positionY', 10),
('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
],
files={
'file': filename
},
auth_settings=['tokenAuth'])

# Scheduling the `Kafka3ConnectionService` fails if it is started before the `StandardRestrictedSSLContextService`, because the former depends on the latter.
# To work around this, we try to schedule the controllers multiple times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ data:
from nipyapi.security import service_login
import nipyapi
import os
import requests
import urllib3

# As of 2022-08-29 we can't use "https://nifi:8443" here, because the request fails with: <h2>The request contained an invalid host header [<code>nifi:8443</code>] in the request [<code>/nifi-api</code>]. Check for request manipulation or third-party intercept.</h2>
Expand All @@ -75,70 +76,36 @@ data:
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")

organization = "stackabletech"
repository = "demos"
branch = "main"
version = "main"
directory = "demos/nifi-kafka-druid-water-level-data"
flow_name = "IngestWaterLevelsToKafka"
response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-water-level-data/IngestWaterLevelsToKafka.json")

# Check if the GitHub flow registry client already exists
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
filename = "/tmp/IngestWaterLevelsToKafka.json"
with open(filename, "wb") as f:
f.write(response.content)

github_client = None
for client in flow_registry_clients:
if client.component.name == "GitHubFlowRegistryClient":
github_client = client
print("Found existing GitHub flow registry client")
break
pg_id = get_root_pg_id()

if not github_client:
print("Creating new GitHub flow registry client")
github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
body={
"revision": {"version": 0},
"component": {
"name": "GitHubFlowRegistryClient",
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
"properties": {
"Repository Owner": organization,
"Repository Name": repository,
},
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-github-nar",
"version": "2.2.0",
},
},
}
)
if not nipyapi.config.nifi_config.api_client:
nipyapi.config.nifi_config.api_client = ApiClient()

pg_id = get_root_pg_id()
header_params = {}
header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])

try:
# Create process group from the file in the Git repo
nipyapi.nifi.ProcessGroupsApi().create_process_group(
id=pg_id,
body={
"revision": {"version": 0},
"component": {
"position": {"x": 300, "y": 10},
"versionControlInformation": {
"registryId": github_client.component.id,
"flowId": flow_name,
"bucketId": directory,
"branch": branch,
"version": version,
},
},
},
)
except ValueError as e:
# Ignore, because nipyapi can't handle non-int versions yet
if "invalid literal for int() with base 10" in str(e):
print("Ignoring ValueError")
else:
raise e
nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
path_params={'pg_id': pg_id},
header_params=header_params,
_return_http_data_only=True,
post_params=[
('id', pg_id),
('groupName', 'IngestWaterLevelsToKafka'),
('positionX', 100),
('positionY', 10),
('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
],
files={
'file': filename
},
auth_settings=['tokenAuth'])

# Scheduling the `Kafka3ConnectionService` fails if it is started before the `StandardRestrictedSSLContextService`, because the former depends on the latter.
# To work around this, we try to schedule the controllers multiple times.
Expand Down
Loading