Skip to content

Commit

Permalink
NIFI-13491 InvokeHTTP - new property - Response Header Request
Browse files Browse the repository at this point in the history
Attributes Prefix
  • Loading branch information
NissimShiman committed Nov 8, 2024
1 parent d20be80 commit 67305e5
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,22 @@ public class InvokeHTTP extends AbstractProcessor {

public static final PropertyDescriptor RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED = new PropertyDescriptor.Builder()
.name("Response Header Request Attributes Enabled")
.description("Enable adding HTTP response headers as attributes to FlowFiles transferred to the Original relationship.")
.description("Enable adding HTTP response headers as attributes to FlowFiles transferred to the Original, Retry or No Retry relationships.")
.required(false)
.defaultValue(Boolean.FALSE.toString())
.allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
.build();

public static final PropertyDescriptor RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX = new PropertyDescriptor.Builder()
.name("Response Header Request Attributes Prefix")
.description("Prefix to HTTP response headers when included as attributes to FlowFiles transferred to the Original, Retry or No Retry relationships. "
+ "It is recommended to end with a separator character like '.' or '-'.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED, Boolean.TRUE.toString())
.build();

public static final PropertyDescriptor RESPONSE_REDIRECTS_ENABLED = new PropertyDescriptor.Builder()
.name("Response Redirects Enabled")
.description("Enable following HTTP redirects sent with HTTP 300 series responses as described in RFC 7231 Section 6.4.")
Expand Down Expand Up @@ -508,6 +518,7 @@ public class InvokeHTTP extends AbstractProcessor {
RESPONSE_GENERATION_REQUIRED,
RESPONSE_FLOW_FILE_NAMING_STRATEGY,
RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED,
RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX,
RESPONSE_REDIRECTS_ENABLED
);

Expand Down Expand Up @@ -635,6 +646,7 @@ public void migrateProperties(final PropertyConfiguration config) {
config.renameProperty("Always Output Response", RESPONSE_GENERATION_REQUIRED.getName());
config.renameProperty("flow-file-naming-strategy", RESPONSE_FLOW_FILE_NAMING_STRATEGY.getName());
config.renameProperty("Add Response Headers to Request", RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED.getName());
config.renameProperty("Prefix to Added Response Headers to the Request", RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX.getName());
config.renameProperty("Follow Redirects", RESPONSE_REDIRECTS_ENABLED.getName());
}

Expand Down Expand Up @@ -878,13 +890,6 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
}

// If the property to add the response headers to the request flowfile is true then add them
if (context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED).asBoolean() && requestFlowFile != null) {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp));
}

boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(RESPONSE_GENERATION_REQUIRED).asBoolean();
ResponseBody responseBody = responseHttp.body();
Expand Down Expand Up @@ -987,6 +992,18 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
}

// If the property to add the response headers to the request flowfile is true then add them
//
// This needs to be done after the response flowFile has been created from the request flowFile
// as the added attribute headers may have a prefix added that doesn't make sense for the response flowFile.
if (context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED).asBoolean() && requestFlowFile != null) {
String prefix = context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX).evaluateAttributeExpressions(requestFlowFile).getValue();

// write the response headers as attributes
// this will overwrite any existing flowfile attributes
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp, prefix));
}

route(requestFlowFile, responseFlowFile, session, context, statusCode);

}
Expand Down Expand Up @@ -1257,6 +1274,14 @@ private String getLogString(Map<String, List<String>> map) {
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
*/
private Map<String, String> convertAttributesFromHeaders(final Response responseHttp) {
return convertAttributesFromHeaders(responseHttp, "");
}

/**
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
* Prefix is passed in to allow differentiation for these new attributes.
*/
private Map<String, String> convertAttributesFromHeaders(final Response responseHttp, String prefix) {
// create a new hashmap to store the values from the connection
final Map<String, String> attributes = new HashMap<>();
final Headers headers = responseHttp.headers();
Expand All @@ -1266,7 +1291,7 @@ private Map<String, String> convertAttributesFromHeaders(final Response response
if (!values.isEmpty()) {
// create a comma separated string from the values, this is stored in the map
final String value = StringUtils.join(values, MULTIPLE_HEADER_DELIMITER);
attributes.put(key, value);
attributes.put(trimToEmpty(prefix) + key, value);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,10 @@ public void testRunGetHttp200SuccessRequestHeaderAttributesAndDynamicProperties(

@Test
public void testRunGetHttp200SuccessResponseHeaderRequestAttributes() {
final String prefix = "response.";
setUrlProperty();
runner.setProperty(InvokeHTTP.RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED, Boolean.TRUE.toString());
runner.setProperty(InvokeHTTP.RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX, prefix);

final String firstHeader = String.class.getSimpleName();
final String secondHeader = Integer.class.getSimpleName();
Expand All @@ -470,10 +472,19 @@ public void testRunGetHttp200SuccessResponseHeaderRequestAttributes() {
assertRelationshipStatusCodeEquals(InvokeHTTP.RESPONSE, HTTP_OK);

final MockFlowFile requestFlowFile = getRequestFlowFile();
requestFlowFile.assertAttributeEquals(CONTENT_LENGTH_HEADER, Integer.toString(0));
requestFlowFile.assertAttributeEquals(prefix + CONTENT_LENGTH_HEADER, Integer.toString(0));
requestFlowFile.assertAttributeNotExists(CONTENT_LENGTH_HEADER);

final String repeatedHeaders = String.format("%s, %s", firstHeader, secondHeader);
requestFlowFile.assertAttributeEquals(REPEATED_HEADER, repeatedHeaders);
requestFlowFile.assertAttributeEquals(prefix + REPEATED_HEADER, repeatedHeaders);
requestFlowFile.assertAttributeNotExists(REPEATED_HEADER);

final MockFlowFile responseFlowFile = getResponseFlowFile();
responseFlowFile.assertAttributeNotExists(prefix + CONTENT_LENGTH_HEADER);
responseFlowFile.assertAttributeEquals(CONTENT_LENGTH_HEADER, Integer.toString(0));

responseFlowFile.assertAttributeNotExists(prefix + REPEATED_HEADER);
responseFlowFile.assertAttributeEquals(REPEATED_HEADER, repeatedHeaders);
}

@Test
Expand Down

0 comments on commit 67305e5

Please sign in to comment.