Skip to content

Commit

Permalink
MINIFICPP-2447 Include Python processor version in the manifest
Browse files Browse the repository at this point in the history
NiFi python processors which define a ProcessorDetails.version field should
show this version number in the manifest. For all other processors, we will
continue to use the MiNiFi version.
  • Loading branch information
fgerlits committed Sep 6, 2024
1 parent b27a64f commit c8465fd
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 35 deletions.
9 changes: 9 additions & 0 deletions extensions/python/ExecutePythonProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ class ExecutePythonProcessor : public core::Processor {
return description_;
}

void setVersion(const std::string& version) {
version_ = version;
}

const std::optional<std::string>& getVersion() const {
return version_;
}

void setPythonClassName(const std::string& python_class_name) {
python_class_name_ = python_class_name;
}
Expand All @@ -140,6 +148,7 @@ class ExecutePythonProcessor : public core::Processor {
std::vector<core::Property> python_properties_;

std::string description_;
std::optional<std::string> version_;

bool processor_initialized_;
bool python_dynamic_;
Expand Down
2 changes: 1 addition & 1 deletion extensions/python/PythonCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class PythonCreator : public minifi::core::CoreComponent {
processor->initialize();
minifi::BundleDetails details;
details.artifact = path.filename().string();
details.version = minifi::AgentBuild::VERSION;
details.version = processor->getVersion().value_or(minifi::AgentBuild::VERSION);
details.group = "python";

minifi::ClassDescription description{
Expand Down
4 changes: 4 additions & 0 deletions extensions/python/PythonProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ void PythonProcessor::setDescription(const std::string& desc) {
processor_->setDescription(desc);
}

void PythonProcessor::setVersion(const std::string& version) {
processor_->setVersion(version);
}

void PythonProcessor::addProperty(const std::string& name, const std::string& description, const std::optional<std::string>& defaultvalue, bool required, bool el, bool sensitive,
const std::optional<int64_t>& property_type_code, gsl::span<const std::string_view> allowable_values, const std::optional<std::string>& controller_service_type_name) {
processor_->addProperty(name, description, defaultvalue, required, el, sensitive, property_type_code, allowable_values, controller_service_type_name);
Expand Down
2 changes: 2 additions & 0 deletions extensions/python/PythonProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class PythonProcessor {

void setDescription(const std::string& desc);

void setVersion(const std::string& version);

void addProperty(const std::string& name, const std::string& description, const std::optional<std::string>& defaultvalue, bool required, bool el, bool sensitive,
const std::optional<int64_t>& property_type_code, gsl::span<const std::string_view> allowable_values, const std::optional<std::string>& controller_service_type_name);

Expand Down
3 changes: 3 additions & 0 deletions extensions/python/pythonprocessors/nifiapi/processorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def describe(self, processor: Processor):
else:
processor.setDescription(self.__class__.__name__)

if hasattr(self, 'ProcessorDetails') and hasattr(self.ProcessorDetails, 'version'):
processor.setVersion(self.ProcessorDetails.version)

def onInitialize(self, processor: Processor):
processor.setSupportsDynamicProperties()
for property in self.getPropertyDescriptors():
Expand Down
1 change: 1 addition & 0 deletions extensions/python/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ FOREACH(testfile ${EXECUTEPYTHONPROCESSOR_UNIT_TESTS})
add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
ENDFOREACH()

copyTestResources(${CMAKE_SOURCE_DIR}/extensions/python/pythonprocessors/nifiapi ${CMAKE_BINARY_DIR}/bin/resources/minifi-python/nifiapi)
copyTestResources(${CMAKE_CURRENT_SOURCE_DIR}/test_python_scripts/ ${CMAKE_BINARY_DIR}/bin/resources/test_python_scripts/)
message("-- Finished building ${EXTENSIONS_TEST_COUNT} tests for minifi-python-script-extension...")
161 changes: 127 additions & 34 deletions extensions/python/tests/PythonManifestTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const SerializedResponseNode& getNode(const std::vector<SerializedResponseNode>&
for (auto& node : nodes) {
if (node.name == name) return node;
}
FAIL(fmt::format("Node {} was not found", name));
gsl_FailFast();
}

Expand All @@ -63,6 +64,57 @@ TEST_CASE("Python processor's description is part of the manifest") {
" proc.setSupportsDynamicProperties()\n"
" proc.addProperty('Prop1', 'A great property', 'banana', True, False, False, None, ['apple', 'orange', 'banana', 'durian'], None)\n";

const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
#ifdef WIN32
std::filesystem::create_symlink(executable_dir / "minifi-python-script-extension.dll", python_dir / "minifi_native.pyd");
#endif
std::filesystem::copy(executable_dir / "resources" / "minifi-python" / "nifiapi", python_dir / "nifiapi", std::filesystem::copy_options::recursive);
utils::file::create_dir(python_dir / "nifi_python_processors");
std::ofstream{python_dir / "nifi_python_processors" / "MyPyProc3.py"} << R"(
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor, StandardValidators
class MyPyProc3(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '1.2.3'
description = "Test processor number three."
dependencies = []
COLOR = PropertyDescriptor(
name="Color",
description="Symbolic name for the combination of frequencies of electromagnetic radiation reflected by the processor.",
allowable_values=['red', 'blue', 'green', 'purple'],
default_value='red',
required=True,
expression_language_scope=ExpressionLanguageScope.NONE
)
MOOD = PropertyDescriptor(
name="Mood",
description="The mental or emotional state of the processor.",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
def __init__(self, **kwargs):
pass
def getPropertyDescriptors(self):
return [self.COLOR, self.MOOD]
def transform(self, context, flow_file):
color = context.getProperty(self.COLOR).getValue()
mood = context.getProperty(self.MOOD).evaluateAttributeExpressions(flowfile).getValue() or "OK"
user = flow_file.getContentsAsBytes().decode('utf-8')
response = f"Hello {user}! I am a {color} processor. I am feeling {mood}."
return FlowFileTransformResult('success', contents=response.encode('utf-8'))
)";

controller.configuration_->set(minifi::Configuration::nifi_python_processor_dir, python_dir.string());
controller.configuration_->set(minifi::Configuration::nifi_extension_path, "*minifi-python-script*");

Expand All @@ -76,65 +128,65 @@ TEST_CASE("Python processor's description is part of the manifest") {

auto& manifest = getNode(agent_info.serialized_nodes, "agentManifest");

auto findPythonProcessor = [&] (const std::string& name) {
auto findPythonBundle = [&](const std::string& name) {
// each python file gets its own bundle
auto* python_bundle = findNode(manifest.children, [&] (auto& child) {
return child.name == "bundles" && findNode(child.children, [&] (auto& bundle_child) {
auto* python_bundle = findNode(manifest.children, [&](const auto& child) {
return child.name == "bundles" && findNode(child.children, [&](const auto& bundle_child) {
return bundle_child.name == "artifact" && bundle_child.value == name + ".py";
});
});
REQUIRE(python_bundle);
return gsl::make_not_null(python_bundle);
};

auto& py_processors = getNode(getNode(python_bundle->children, "componentManifest").children, "processors");

// single processor in each bundle
REQUIRE(py_processors.children.size() == 1);

return findNode(py_processors.children, [&] (auto& child) {return utils::string::endsWith(child.name, name);});
auto getProcessorNode = [&] (gsl::not_null<const SerializedResponseNode*> bundle) {
auto& processors = getNode(getNode(bundle->children, "componentManifest").children, "processors");
REQUIRE(processors.children.size() == 1);
auto& only_child = processors.children[0];
return gsl::make_not_null(&only_child);
};

{
auto* MyPyProc = findPythonProcessor("MyPyProc");
REQUIRE(MyPyProc);
auto python_bundle = findPythonBundle("MyPyProc");
auto MyPyProc = getProcessorNode(python_bundle);

REQUIRE(getNode(MyPyProc->children, "inputRequirement").value == "INPUT_ALLOWED");
REQUIRE(getNode(MyPyProc->children, "isSingleThreaded").value == true);
REQUIRE(getNode(MyPyProc->children, "typeDescription").value == "An amazing processor");
REQUIRE(getNode(MyPyProc->children, "supportsDynamicRelationships").value == false);
REQUIRE(getNode(MyPyProc->children, "supportsDynamicProperties").value == false);
REQUIRE(getNode(MyPyProc->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc");
CHECK(getNode(MyPyProc->children, "inputRequirement").value == "INPUT_ALLOWED");
CHECK(getNode(MyPyProc->children, "isSingleThreaded").value == true);
CHECK(getNode(MyPyProc->children, "typeDescription").value == "An amazing processor");
CHECK(getNode(MyPyProc->children, "supportsDynamicRelationships").value == false);
CHECK(getNode(MyPyProc->children, "supportsDynamicProperties").value == false);
CHECK(getNode(MyPyProc->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc");

auto& rels = getNode(MyPyProc->children, "supportedRelationships").children;
REQUIRE(rels.size() == 3);

auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";});
REQUIRE(success);
REQUIRE(getNode(success->children, "description").value == "Script succeeds");
CHECK(getNode(success->children, "description").value == "Script succeeds");

auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";});
REQUIRE(failure);
REQUIRE(getNode(failure->children, "description").value == "Script fails");
}


{
auto* MyPyProc2 = findPythonProcessor("MyPyProc2");
REQUIRE(MyPyProc2);
auto python_bundle = findPythonBundle("MyPyProc2");
auto MyPyProc2 = getProcessorNode(python_bundle);

REQUIRE(getNode(MyPyProc2->children, "inputRequirement").value == "INPUT_ALLOWED");
REQUIRE(getNode(MyPyProc2->children, "isSingleThreaded").value == true);
REQUIRE(getNode(MyPyProc2->children, "typeDescription").value == "Another amazing processor");
REQUIRE(getNode(MyPyProc2->children, "supportsDynamicRelationships").value == false);
REQUIRE(getNode(MyPyProc2->children, "supportsDynamicProperties").value == true);
REQUIRE(getNode(MyPyProc2->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc2");
CHECK(getNode(MyPyProc2->children, "inputRequirement").value == "INPUT_ALLOWED");
CHECK(getNode(MyPyProc2->children, "isSingleThreaded").value == true);
CHECK(getNode(MyPyProc2->children, "typeDescription").value == "Another amazing processor");
CHECK(getNode(MyPyProc2->children, "supportsDynamicRelationships").value == false);
CHECK(getNode(MyPyProc2->children, "supportsDynamicProperties").value == true);
CHECK(getNode(MyPyProc2->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc2");

auto& properties = getNode(MyPyProc2->children, "propertyDescriptors").children;
REQUIRE(properties.size() == 1);
REQUIRE(properties[0].name == "Prop1");
REQUIRE(getNode(properties[0].children, "name").value == "Prop1");
REQUIRE(getNode(properties[0].children, "required").value == true);
REQUIRE(getNode(properties[0].children, "expressionLanguageScope").value == "NONE");
REQUIRE(getNode(properties[0].children, "defaultValue").value == "banana");
CHECK(properties[0].name == "Prop1");
CHECK(getNode(properties[0].children, "name").value == "Prop1");
CHECK(getNode(properties[0].children, "required").value == true);
CHECK(getNode(properties[0].children, "expressionLanguageScope").value == "NONE");
CHECK(getNode(properties[0].children, "defaultValue").value == "banana");
auto& allowable_values = getNode(properties[0].children, "allowableValues");
REQUIRE(allowable_values.children.size() == 4);
CHECK(getNthAllowableValue(allowable_values, 0) == "apple");
Expand All @@ -147,10 +199,51 @@ TEST_CASE("Python processor's description is part of the manifest") {

auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";});
REQUIRE(success);
REQUIRE(getNode(success->children, "description").value == "Script succeeds");
CHECK(getNode(success->children, "description").value == "Script succeeds");

auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";});
REQUIRE(failure);
REQUIRE(getNode(failure->children, "description").value == "Script fails");
CHECK(getNode(failure->children, "description").value == "Script fails");
}

{
auto python_bundle = findPythonBundle("MyPyProc3");
auto MyPyProc3 = getProcessorNode(python_bundle);

CHECK(getNode(python_bundle->children, "version").value == "1.2.3");

CHECK(getNode(MyPyProc3->children, "inputRequirement").value == "INPUT_ALLOWED");
CHECK(getNode(MyPyProc3->children, "isSingleThreaded").value == true);
CHECK(getNode(MyPyProc3->children, "typeDescription").value == "Test processor number three.");
CHECK(getNode(MyPyProc3->children, "supportsDynamicRelationships").value == false);
CHECK(getNode(MyPyProc3->children, "supportsDynamicProperties").value == true);
CHECK(getNode(MyPyProc3->children, "type").value == "org.apache.nifi.minifi.processors.nifi_python_processors.MyPyProc3");

auto& properties = getNode(MyPyProc3->children, "propertyDescriptors").children;
REQUIRE(properties.size() == 2);
CHECK(properties[0].name == "Color");
CHECK(getNode(properties[0].children, "name").value == "Color");
CHECK(getNode(properties[0].children, "required").value == true);
CHECK(getNode(properties[0].children, "expressionLanguageScope").value == "NONE");
CHECK(getNode(properties[0].children, "defaultValue").value == "red");
CHECK(properties[1].name == "Mood");
CHECK(getNode(properties[1].children, "name").value == "Mood");
CHECK(getNode(properties[1].children, "required").value == false);
CHECK(getNode(properties[1].children, "expressionLanguageScope").value == "FLOWFILE_ATTRIBUTES");

auto& rels = getNode(MyPyProc3->children, "supportedRelationships").children;
REQUIRE(rels.size() == 3);

auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";});
REQUIRE(success);
CHECK(getNode(success->children, "description").value == "Script succeeds");

auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";});
REQUIRE(failure);
CHECK(getNode(failure->children, "description").value == "Script fails");

auto* original = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "original";});
REQUIRE(original);
CHECK(getNode(original->children, "description").value == "Original flow file");
}
}
16 changes: 16 additions & 0 deletions extensions/python/types/PyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extern "C" {
static PyMethodDef PyProcessor_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays)
{"setSupportsDynamicProperties", (PyCFunction) PyProcessor::setSupportsDynamicProperties, METH_VARARGS, nullptr},
{"setDescription", (PyCFunction) PyProcessor::setDescription, METH_VARARGS, nullptr},
{"setVersion", (PyCFunction) PyProcessor::setVersion, METH_VARARGS, nullptr},
{"addProperty", (PyCFunction) PyProcessor::addProperty, METH_VARARGS, nullptr},
{} /* Sentinel */
};
Expand Down Expand Up @@ -103,6 +104,21 @@ PyObject* PyProcessor::setDescription(PyProcessor* self, PyObject* args) {
Py_RETURN_NONE;
}

PyObject* PyProcessor::setVersion(PyProcessor* self, PyObject* args) {
auto processor = self->processor_.lock();
if (!processor) {
PyErr_SetString(PyExc_AttributeError, "tried reading processor outside 'on_trigger'");
return nullptr;
}

const char* version = nullptr;
if (!PyArg_ParseTuple(args, "s", &version)) {
return nullptr;
}
processor->setVersion(std::string(version));
Py_RETURN_NONE;
}

PyObject* PyProcessor::addProperty(PyProcessor* self, PyObject* args) {
auto processor = self->processor_.lock();
if (!processor) {
Expand Down
1 change: 1 addition & 0 deletions extensions/python/types/PyProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct PyProcessor {

static PyObject* setSupportsDynamicProperties(PyProcessor* self, PyObject* args);
static PyObject* setDescription(PyProcessor* self, PyObject* args);
static PyObject* setVersion(PyProcessor* self, PyObject* args);
static PyObject* addProperty(PyProcessor* self, PyObject* args);

static PyTypeObject* typeObject();
Expand Down
12 changes: 12 additions & 0 deletions libminifi/test/libtest/unit/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
#include <utility>
#include <vector>

#include "core/state/Value.h"
#include "utils/file/FileUtils.h"
#include "utils/Id.h"
#include "utils/TimeUtil.h"
#include "TestBase.h"

#include "catch2/catch_tostring.hpp"
#include "fmt/format.h"
#include "rapidjson/document.h"
#include "asio.hpp"
#include "asio/ssl.hpp"
Expand Down Expand Up @@ -229,3 +232,12 @@ inline bool runningAsUnixRoot() {
#endif
}
} // namespace org::apache::nifi::minifi::test::utils

namespace Catch {
template <>
struct StringMaker<minifi::state::response::ValueNode> {
static std::string convert(const minifi::state::response::ValueNode& value_node) {
return fmt::format(R"("{}")", value_node.to_string());
}
};
} // namespace Catch

0 comments on commit c8465fd

Please sign in to comment.