Skip to content

Commit

Permalink
NIFI-13824 Installed Python Processor Dependencies with one command
Browse files Browse the repository at this point in the history
If a Python processor defines dependencies both inline and in a
requirements.txt file, the two groups of dependencies must be installed
in a single `pip install` command; otherwise, pip cannot resolve the
combined set of dependencies correctly.

- Added setup-python step with Python 3.12 to ci-workflow for consistent version behavior

This closes #9429

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
fgerlits authored and exceptionfactory committed Oct 26, 2024
1 parent 5be2cd7 commit 3b3e74d
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 28 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/ci-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ jobs:
distribution: 'corretto'
java-version: '21'
cache: 'maven'
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Evaluate Changed Paths
uses: dorny/paths-filter@v3
id: changes
Expand Down Expand Up @@ -227,6 +231,10 @@ jobs:
distribution: 'zulu'
java-version: '21'
cache: 'maven'
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Evaluate Changed Paths
uses: dorny/paths-filter@v3
id: changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,37 +285,31 @@ def import_external_dependencies(self, processor_details, work_dir):
logger.info("All dependencies have already been imported for {0}".format(class_name))
return True

python_cmd = os.getenv("PYTHON_CMD")
dependency_references = []

if processor_details.source_location is not None:
package_dir = os.path.dirname(processor_details.source_location)
requirements_file = os.path.join(package_dir, 'requirements.txt')
if os.path.exists(requirements_file):
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir, '-r', requirements_file]

logger.info(f"Importing dependencies from requirements file for package {package_dir} to {target_dir} using command {args}")
result = subprocess.run(args)
dependency_references.append('-r')
dependency_references.append(requirements_file)

if result.returncode == 0:
logger.info(f"Successfully imported requirements for package {package_dir} to {target_dir}")
else:
raise RuntimeError(f"Failed to import requirements for package {package_dir} from requirements.txt file: process exited with status code {result}")
inline_dependencies = processor_details.getDependencies()
for dependency in inline_dependencies:
dependency_references.append(dependency)

dependencies = processor_details.getDependencies()
if len(dependencies) > 0:
if len(dependency_references) > 0:
python_cmd = os.getenv("PYTHON_CMD")
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir]
for dep in dependencies:
args.append(dep)

logger.info(f"Importing dependencies {dependencies} for {class_name} to {target_dir} using command {args}")
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir] + dependency_references
logger.info(f"Installing dependencies {dependency_references} for {class_name} to {target_dir} using command {args}")
result = subprocess.run(args)

if result.returncode == 0:
logger.info(f"Successfully imported requirements for {class_name} to {target_dir}")
logger.info(f"Successfully installed requirements for {class_name} to {target_dir}")
else:
raise RuntimeError(f"Failed to import requirements for {class_name}: process exited with status code {result}")
raise RuntimeError(f"Failed to install requirements for {class_name}: process exited with status code {result}")
else:
logger.info(f"No dependencies to import for {class_name}")
logger.info(f"No dependencies to install for {class_name}")

# Write a completion Marker File
with open(completion_marker_file, "w") as file:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import tempfile
import unittest
from unittest.mock import patch

from ExtensionManager import ExtensionManager
from testutils import set_up_env, get_processor_details

PROCESSOR_WITH_DEPENDENCIES_TEST_FILE = 'src/test/resources/python/framework/processor_with_dependencies/ProcessorWithDependencies.py'

class ReturncodeMocker:
    """Minimal stand-in for the result object of a mocked ``subprocess.run``.

    It carries a single attribute, ``returncode``, holding the value
    passed to the constructor.
    """

    def __init__(self, return_code):
        """Store ``return_code`` as the ``returncode`` attribute."""
        self.returncode = return_code

class TestExtensionManager(unittest.TestCase):
    """Tests for ``ExtensionManager.import_external_dependencies``."""

    def setUp(self):
        # Prepare the environment used by the framework code, then build the
        # ExtensionManager under test.
        set_up_env()
        self.extension_manager = ExtensionManager(None)

    @patch('subprocess.run')
    def test_import_external_dependencies(self, mock_subprocess_run):
        """Both dependency groups must be handed to one ``subprocess.run`` call.

        ``subprocess.run`` is patched, so no package is actually installed;
        the test only inspects the command that would have been executed.
        """
        details = get_processor_details(self, 'ProcessorWithDependencies', PROCESSOR_WITH_DEPENDENCIES_TEST_FILE, '/extensions/processor_with_dependencies')
        self.assertIsNotNone(details)

        # Make the patched call report success.
        mock_subprocess_run.return_value = ReturncodeMocker(0)

        with tempfile.TemporaryDirectory() as temp_dir:
            packages_dir = os.path.join(temp_dir, 'packages')
            self.extension_manager.import_external_dependencies(details, packages_dir)

        mock_subprocess_run.assert_called_once()

        # A single call is not enough on its own: that one command has to
        # reference the requirements file AND the inline dependency, otherwise
        # the two groups are not resolved together.
        command = mock_subprocess_run.call_args.args[0]
        self.assertIn('-r', command)
        # NOTE(review): assumes a requirements.txt sits next to the processor
        # test resource -- confirm against the test resources directory.
        requirements_file = command[command.index('-r') + 1]
        self.assertEqual(os.path.basename(requirements_file), 'requirements.txt')
        # NOTE(review): assumes the inline dependency is passed through
        # verbatim as declared in ProcessorWithDependencies -- confirm.
        self.assertIn('pymilvus==2.4.4', command)

if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import ProcessorInspection
import unittest
from testutils import get_processor_details

DUMMY_PROCESSOR_FILE = 'src/test/python/framework/DummyProcessor.py'
DUMMY_PROCESSOR_FILE = 'src/test/resources/python/framework/dummy_processor/DummyProcessor.py'

class DetectProcessorUseCase(unittest.TestCase):
def test_get_processor_details(self):
class_nodes = ProcessorInspection.get_processor_class_nodes(DUMMY_PROCESSOR_FILE)
self.assertIsNotNone(class_nodes)
self.assertEqual(len(class_nodes), 1)
class_node = class_nodes[0]
self.assertEqual(class_node.name, 'DummyProcessor')

details = ProcessorInspection.get_processor_details(class_node, DUMMY_PROCESSOR_FILE, '/extensions/dummy_processor', False)
details = get_processor_details(self, 'DummyProcessor', DUMMY_PROCESSOR_FILE, '/extensions/dummy_processor')
self.assertIsNotNone(details)
self.assertEqual(details.description, 'Fake Processor')
self.assertEqual(details.tags, ['tag1', 'tag2'])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
from nifiapi.__jvm__ import JvmHolder
import ProcessorInspection

class FakeJvm:
    """Test double whose ``java`` attribute is a ``FakeJava`` instance."""

    def __init__(self):
        """Create the nested fake ``java`` namespace."""
        self.java = FakeJava()

class FakeJava:
    """Test double whose ``util`` attribute is a ``FakeJavaUtil`` instance."""

    def __init__(self):
        """Create the nested fake ``util`` namespace."""
        self.util = FakeJavaUtil()

class FakeJavaUtil:
    """Test double providing an ``ArrayList`` factory method."""

    def ArrayList(self):
        """Return a new ``FakeArrayList`` wrapping a fresh empty list."""
        empty_backing_list = []
        return FakeArrayList(empty_backing_list)

class FakeArrayList:
    """List wrapper offering ``add`` plus Python length and iteration support.

    The list given to the constructor is stored as-is (not copied) in
    ``my_list``, so elements added here are visible through that list too.
    """

    def __init__(self, my_list):
        """Keep a reference to ``my_list`` as the backing storage."""
        self.my_list = my_list

    def __len__(self):
        """Return the number of elements in the backing list."""
        backing = self.my_list
        return len(backing)

    def __iter__(self):
        """Return an iterator over the backing list."""
        backing = self.my_list
        return iter(backing)

    def add(self, element):
        """Append ``element`` to the end of the backing list."""
        backing = self.my_list
        backing.append(element)

def set_up_env():
    """Prepare process-wide state for the Python framework tests.

    Installs a ``FakeJvm`` on ``JvmHolder.jvm`` and sets the ``PYTHON_CMD``
    environment variable to the interpreter running the tests.
    """
    JvmHolder.jvm = FakeJvm()
    os.environ["PYTHON_CMD"] = sys.executable

def get_processor_details(test_fixture, processor_name, processor_file, extension_home):
    """Inspect ``processor_file`` and return the details of its single processor class.

    ``test_fixture`` supplies the assertion methods: the file must yield
    exactly one processor class node and that node must be named
    ``processor_name``, otherwise the fixture's assertions fail.

    Returns whatever ``ProcessorInspection.get_processor_details`` returns for
    that class node, ``processor_file``, ``extension_home`` and ``False``.
    """
    nodes = ProcessorInspection.get_processor_class_nodes(processor_file)
    test_fixture.assertIsNotNone(nodes)
    test_fixture.assertEqual(len(nodes), 1)

    node = nodes[0]
    test_fixture.assertEqual(node.name, processor_name)

    details = ProcessorInspection.get_processor_details(node, processor_file, extension_home, False)
    return details
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class ProcessorWithDependencies(FlowFileTransform):
    """Test processor that declares an inline dependency in ``ProcessorDetails``."""

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        description = "This processor depends on both google-cloud-vision and pymilvus"
        version = '0.0.1'
        tags = ['cloud', 'vision', 'milvus']
        # Only pymilvus is declared inline here.
        dependencies = ['pymilvus==2.4.4']

    def __init__(self):
        """No instance state to initialize."""

    def transform(self, context, flow_file):
        """Log a message and return a fixed 'success' result with contents 'foobar'."""
        self.logger.info("ProcessorWithDependencies is returning")
        result = FlowFileTransformResult('success', contents='foobar')
        return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

google-cloud-vision==3.7.4

0 comments on commit 3b3e74d

Please sign in to comment.