Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2447 Include the Python processor version in the manifest #1861

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions extensions/python/ExecutePythonProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ class ExecutePythonProcessor : public core::Processor {
return description_;
}

void setVersion(const std::string& version) {
version_ = version;
}
szaszm marked this conversation as resolved.
Show resolved Hide resolved

const std::optional<std::string>& getVersion() const {
return version_;
}

void setPythonClassName(const std::string& python_class_name) {
python_class_name_ = python_class_name;
}
Expand All @@ -140,6 +148,7 @@ class ExecutePythonProcessor : public core::Processor {
std::vector<core::Property> python_properties_;

std::string description_;
std::optional<std::string> version_;

bool processor_initialized_;
bool python_dynamic_;
Expand Down
2 changes: 1 addition & 1 deletion extensions/python/PythonCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class PythonCreator : public minifi::core::CoreComponent {
processor->initialize();
minifi::BundleDetails details;
details.artifact = path.filename().string();
details.version = minifi::AgentBuild::VERSION;
details.version = processor->getVersion().value_or(minifi::AgentBuild::VERSION);
details.group = "python";

minifi::ClassDescription description{
Expand Down
4 changes: 4 additions & 0 deletions extensions/python/PythonProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ void PythonProcessor::setDescription(const std::string& desc) {
processor_->setDescription(desc);
}

void PythonProcessor::setVersion(const std::string& version) {
processor_->setVersion(version);
}

void PythonProcessor::addProperty(const std::string& name, const std::string& description, const std::optional<std::string>& defaultvalue, bool required, bool el, bool sensitive,
const std::optional<int64_t>& property_type_code, gsl::span<const std::string_view> allowable_values, const std::optional<std::string>& controller_service_type_name) {
processor_->addProperty(name, description, defaultvalue, required, el, sensitive, property_type_code, allowable_values, controller_service_type_name);
Expand Down
2 changes: 2 additions & 0 deletions extensions/python/PythonProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class PythonProcessor {

void setDescription(const std::string& desc);

void setVersion(const std::string& version);

void addProperty(const std::string& name, const std::string& description, const std::optional<std::string>& defaultvalue, bool required, bool el, bool sensitive,
const std::optional<int64_t>& property_type_code, gsl::span<const std::string_view> allowable_values, const std::optional<std::string>& controller_service_type_name);

Expand Down
3 changes: 3 additions & 0 deletions extensions/python/pythonprocessors/nifiapi/processorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def describe(self, processor: Processor):
else:
processor.setDescription(self.__class__.__name__)

if hasattr(self, 'ProcessorDetails') and hasattr(self.ProcessorDetails, 'version'):
processor.setVersion(self.ProcessorDetails.version)

def onInitialize(self, processor: Processor):
processor.setSupportsDynamicProperties()
for property in self.getPropertyDescriptors():
Expand Down
1 change: 1 addition & 0 deletions extensions/python/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ FOREACH(testfile ${EXECUTEPYTHONPROCESSOR_UNIT_TESTS})
add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
ENDFOREACH()

copyTestResources(${CMAKE_SOURCE_DIR}/extensions/python/pythonprocessors/nifiapi ${CMAKE_BINARY_DIR}/bin/resources/minifi-python/nifiapi)
copyTestResources(${CMAKE_CURRENT_SOURCE_DIR}/test_python_scripts/ ${CMAKE_BINARY_DIR}/bin/resources/test_python_scripts/)
message("-- Finished building ${EXTENSIONS_TEST_COUNT} tests for minifi-python-script-extension...")
161 changes: 127 additions & 34 deletions extensions/python/tests/PythonManifestTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const SerializedResponseNode& getNode(const std::vector<SerializedResponseNode>&
for (auto& node : nodes) {
if (node.name == name) return node;
}
FAIL(fmt::format("Node {} was not found", name));
gsl_FailFast();
}

Expand All @@ -63,6 +64,57 @@ TEST_CASE("Python processor's description is part of the manifest") {
" proc.setSupportsDynamicProperties()\n"
" proc.addProperty('Prop1', 'A great property', 'banana', True, False, False, None, ['apple', 'orange', 'banana', 'durian'], None)\n";

const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
#ifdef WIN32
std::filesystem::create_symlink(executable_dir / "minifi-python-script-extension.dll", python_dir / "minifi_native.pyd");
#endif
std::filesystem::copy(executable_dir / "resources" / "minifi-python" / "nifiapi", python_dir / "nifiapi", std::filesystem::copy_options::recursive);
utils::file::create_dir(python_dir / "nifi_python_processors");
std::ofstream{python_dir / "nifi_python_processors" / "MyPyProc3.py"} << R"(
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor, StandardValidators

class MyPyProc3(FlowFileTransform):

class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']

class ProcessorDetails:
version = '1.2.3'
description = "Test processor number three."
dependencies = []

COLOR = PropertyDescriptor(
name="Color",
description="Symbolic name for the combination of frequencies of electromagnetic radiation reflected by the processor.",
allowable_values=['red', 'blue', 'green', 'purple'],
default_value='red',
required=True,
expression_language_scope=ExpressionLanguageScope.NONE
)

MOOD = PropertyDescriptor(
name="Mood",
description="The mental or emotional state of the processor.",
required=False,
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)

def __init__(self, **kwargs):
pass

def getPropertyDescriptors(self):
return [self.COLOR, self.MOOD]

def transform(self, context, flow_file):
color = context.getProperty(self.COLOR).getValue()
mood = context.getProperty(self.MOOD).evaluateAttributeExpressions(flowfile).getValue() or "OK"
user = flow_file.getContentsAsBytes().decode('utf-8')
response = f"Hello {user}! I am a {color} processor. I am feeling {mood}."
return FlowFileTransformResult('success', contents=response.encode('utf-8'))
)";

controller.configuration_->set(minifi::Configuration::nifi_python_processor_dir, python_dir.string());
controller.configuration_->set(minifi::Configuration::nifi_extension_path, "*minifi-python-script*");

Expand All @@ -76,65 +128,65 @@ TEST_CASE("Python processor's description is part of the manifest") {

auto& manifest = getNode(agent_info.serialized_nodes, "agentManifest");

auto findPythonProcessor = [&] (const std::string& name) {
auto findPythonBundle = [&](const std::string& name) {
// each python file gets its own bundle
auto* python_bundle = findNode(manifest.children, [&] (auto& child) {
return child.name == "bundles" && findNode(child.children, [&] (auto& bundle_child) {
auto* python_bundle = findNode(manifest.children, [&](const auto& child) {
return child.name == "bundles" && findNode(child.children, [&](const auto& bundle_child) {
return bundle_child.name == "artifact" && bundle_child.value == name + ".py";
});
});
REQUIRE(python_bundle);
return gsl::make_not_null(python_bundle);
};

auto& py_processors = getNode(getNode(python_bundle->children, "componentManifest").children, "processors");

// single processor in each bundle
REQUIRE(py_processors.children.size() == 1);

return findNode(py_processors.children, [&] (auto& child) {return utils::string::endsWith(child.name, name);});
auto getProcessorNode = [&] (gsl::not_null<const SerializedResponseNode*> bundle) {
auto& processors = getNode(getNode(bundle->children, "componentManifest").children, "processors");
REQUIRE(processors.children.size() == 1);
auto& only_child = processors.children[0];
return gsl::make_not_null(&only_child);
};

{
auto* MyPyProc = findPythonProcessor("MyPyProc");
REQUIRE(MyPyProc);
auto python_bundle = findPythonBundle("MyPyProc");
auto MyPyProc = getProcessorNode(python_bundle);

REQUIRE(getNode(MyPyProc->children, "inputRequirement").value == "INPUT_ALLOWED");
REQUIRE(getNode(MyPyProc->children, "isSingleThreaded").value == true);
REQUIRE(getNode(MyPyProc->children, "typeDescription").value == "An amazing processor");
REQUIRE(getNode(MyPyProc->children, "supportsDynamicRelationships").value == false);
REQUIRE(getNode(MyPyProc->children, "supportsDynamicProperties").value == false);
REQUIRE(getNode(MyPyProc->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc");
CHECK(getNode(MyPyProc->children, "inputRequirement").value == "INPUT_ALLOWED");
CHECK(getNode(MyPyProc->children, "isSingleThreaded").value == true);
CHECK(getNode(MyPyProc->children, "typeDescription").value == "An amazing processor");
CHECK(getNode(MyPyProc->children, "supportsDynamicRelationships").value == false);
CHECK(getNode(MyPyProc->children, "supportsDynamicProperties").value == false);
CHECK(getNode(MyPyProc->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc");

auto& rels = getNode(MyPyProc->children, "supportedRelationships").children;
REQUIRE(rels.size() == 3);

auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";});
REQUIRE(success);
REQUIRE(getNode(success->children, "description").value == "Script succeeds");
CHECK(getNode(success->children, "description").value == "Script succeeds");

auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";});
REQUIRE(failure);
REQUIRE(getNode(failure->children, "description").value == "Script fails");
}


{
auto* MyPyProc2 = findPythonProcessor("MyPyProc2");
REQUIRE(MyPyProc2);
auto python_bundle = findPythonBundle("MyPyProc2");
auto MyPyProc2 = getProcessorNode(python_bundle);

REQUIRE(getNode(MyPyProc2->children, "inputRequirement").value == "INPUT_ALLOWED");
REQUIRE(getNode(MyPyProc2->children, "isSingleThreaded").value == true);
REQUIRE(getNode(MyPyProc2->children, "typeDescription").value == "Another amazing processor");
REQUIRE(getNode(MyPyProc2->children, "supportsDynamicRelationships").value == false);
REQUIRE(getNode(MyPyProc2->children, "supportsDynamicProperties").value == true);
REQUIRE(getNode(MyPyProc2->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc2");
CHECK(getNode(MyPyProc2->children, "inputRequirement").value == "INPUT_ALLOWED");
CHECK(getNode(MyPyProc2->children, "isSingleThreaded").value == true);
CHECK(getNode(MyPyProc2->children, "typeDescription").value == "Another amazing processor");
CHECK(getNode(MyPyProc2->children, "supportsDynamicRelationships").value == false);
CHECK(getNode(MyPyProc2->children, "supportsDynamicProperties").value == true);
CHECK(getNode(MyPyProc2->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc2");

auto& properties = getNode(MyPyProc2->children, "propertyDescriptors").children;
REQUIRE(properties.size() == 1);
REQUIRE(properties[0].name == "Prop1");
REQUIRE(getNode(properties[0].children, "name").value == "Prop1");
REQUIRE(getNode(properties[0].children, "required").value == true);
REQUIRE(getNode(properties[0].children, "expressionLanguageScope").value == "NONE");
REQUIRE(getNode(properties[0].children, "defaultValue").value == "banana");
CHECK(properties[0].name == "Prop1");
CHECK(getNode(properties[0].children, "name").value == "Prop1");
CHECK(getNode(properties[0].children, "required").value == true);
CHECK(getNode(properties[0].children, "expressionLanguageScope").value == "NONE");
CHECK(getNode(properties[0].children, "defaultValue").value == "banana");
auto& allowable_values = getNode(properties[0].children, "allowableValues");
REQUIRE(allowable_values.children.size() == 4);
CHECK(getNthAllowableValue(allowable_values, 0) == "apple");
Expand All @@ -147,10 +199,51 @@ TEST_CASE("Python processor's description is part of the manifest") {

auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";});
REQUIRE(success);
REQUIRE(getNode(success->children, "description").value == "Script succeeds");
CHECK(getNode(success->children, "description").value == "Script succeeds");

auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";});
REQUIRE(failure);
REQUIRE(getNode(failure->children, "description").value == "Script fails");
CHECK(getNode(failure->children, "description").value == "Script fails");
}

{
auto python_bundle = findPythonBundle("MyPyProc3");
auto MyPyProc3 = getProcessorNode(python_bundle);

CHECK(getNode(python_bundle->children, "version").value == "1.2.3");

CHECK(getNode(MyPyProc3->children, "inputRequirement").value == "INPUT_ALLOWED");
CHECK(getNode(MyPyProc3->children, "isSingleThreaded").value == true);
CHECK(getNode(MyPyProc3->children, "typeDescription").value == "Test processor number three.");
CHECK(getNode(MyPyProc3->children, "supportsDynamicRelationships").value == false);
CHECK(getNode(MyPyProc3->children, "supportsDynamicProperties").value == true);
CHECK(getNode(MyPyProc3->children, "type").value == "org.apache.nifi.minifi.processors.nifi_python_processors.MyPyProc3");

auto& properties = getNode(MyPyProc3->children, "propertyDescriptors").children;
REQUIRE(properties.size() == 2);
CHECK(properties[0].name == "Color");
CHECK(getNode(properties[0].children, "name").value == "Color");
CHECK(getNode(properties[0].children, "required").value == true);
CHECK(getNode(properties[0].children, "expressionLanguageScope").value == "NONE");
CHECK(getNode(properties[0].children, "defaultValue").value == "red");
CHECK(properties[1].name == "Mood");
CHECK(getNode(properties[1].children, "name").value == "Mood");
CHECK(getNode(properties[1].children, "required").value == false);
CHECK(getNode(properties[1].children, "expressionLanguageScope").value == "FLOWFILE_ATTRIBUTES");

auto& rels = getNode(MyPyProc3->children, "supportedRelationships").children;
REQUIRE(rels.size() == 3);

auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";});
REQUIRE(success);
CHECK(getNode(success->children, "description").value == "Script succeeds");

auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";});
REQUIRE(failure);
CHECK(getNode(failure->children, "description").value == "Script fails");

auto* original = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "original";});
REQUIRE(original);
CHECK(getNode(original->children, "description").value == "Original flow file");
}
}
16 changes: 16 additions & 0 deletions extensions/python/types/PyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extern "C" {
static PyMethodDef PyProcessor_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays)
{"setSupportsDynamicProperties", (PyCFunction) PyProcessor::setSupportsDynamicProperties, METH_VARARGS, nullptr},
{"setDescription", (PyCFunction) PyProcessor::setDescription, METH_VARARGS, nullptr},
{"setVersion", (PyCFunction) PyProcessor::setVersion, METH_VARARGS, nullptr},
{"addProperty", (PyCFunction) PyProcessor::addProperty, METH_VARARGS, nullptr},
{} /* Sentinel */
};
Expand Down Expand Up @@ -103,6 +104,21 @@ PyObject* PyProcessor::setDescription(PyProcessor* self, PyObject* args) {
Py_RETURN_NONE;
}

PyObject* PyProcessor::setVersion(PyProcessor* self, PyObject* args) {
auto processor = self->processor_.lock();
if (!processor) {
PyErr_SetString(PyExc_AttributeError, "tried reading processor outside 'on_trigger'");
return nullptr;
}

const char* version = nullptr;
if (!PyArg_ParseTuple(args, "s", &version)) {
return nullptr;
}
processor->setVersion(std::string(version));
Py_RETURN_NONE;
}

PyObject* PyProcessor::addProperty(PyProcessor* self, PyObject* args) {
auto processor = self->processor_.lock();
if (!processor) {
Expand Down
1 change: 1 addition & 0 deletions extensions/python/types/PyProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct PyProcessor {

static PyObject* setSupportsDynamicProperties(PyProcessor* self, PyObject* args);
static PyObject* setDescription(PyProcessor* self, PyObject* args);
static PyObject* setVersion(PyProcessor* self, PyObject* args);
static PyObject* addProperty(PyProcessor* self, PyObject* args);

static PyTypeObject* typeObject();
Expand Down
12 changes: 12 additions & 0 deletions libminifi/test/libtest/unit/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
#include <utility>
#include <vector>

#include "core/state/Value.h"
#include "utils/file/FileUtils.h"
#include "utils/Id.h"
#include "utils/TimeUtil.h"
#include "TestBase.h"

#include "catch2/catch_tostring.hpp"
#include "fmt/format.h"
#include "rapidjson/document.h"
#include "asio.hpp"
#include "asio/ssl.hpp"
Expand Down Expand Up @@ -229,3 +232,12 @@ inline bool runningAsUnixRoot() {
#endif
}
} // namespace org::apache::nifi::minifi::test::utils

namespace Catch {
template <>
struct StringMaker<minifi::state::response::ValueNode> {
static std::string convert(const minifi::state::response::ValueNode& value_node) {
return fmt::format(R"("{}")", value_node.to_string());
}
};
} // namespace Catch