From 9969eca817e8f1b7311ec42a5e9f1e30a708494f Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Tue, 12 Nov 2024 09:50:10 -0500 Subject: [PATCH] NIFI-13963 Default to Drop Unknown Fields in JSON Record Reader (#9485) - Ensured that unknown fields are not included in output schema for JSON Records Signed-off-by: David Handermann --- .../nifi/serialization/record/MapRecord.java | 3 -- .../json/AbstractJsonRowRecordReader.java | 4 +++ .../nifi/json/JsonTreeRowRecordReader.java | 17 +++++++++- .../nifi-standard-processors/pom.xml | 1 + .../standard/TestConvertRecord.java | 34 +++++++++++++++++++ .../input/person_dropfield.json | 9 +++++ 6 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index a3c5dd522d82..059d6c8d9187 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -514,9 +514,6 @@ public void setValue(final String fieldName, final Object value) { private Optional setValueAndGetField(final String fieldName, final Object value) { final Optional field = getSchema().getField(fieldName); if (!field.isPresent()) { - if (dropUnknownFields) { - return field; - } final Object previousValue = values.put(fieldName, value); if (!Objects.equals(value, previousValue)) { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index 0fef6d50c126..ba1ddab87369 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -185,6 +185,10 @@ protected Supplier getLazyTimestampFormat() { return lazyTimestampFormat; } + @Override + public Record nextRecord() throws IOException, MalformedRecordException { + return nextRecord(true, true); + } @Override public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index 489474e804dc..8655ec0386f6 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -147,8 +147,22 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { final Map values = new LinkedHashMap<>(schema.getFieldCount() * 2); + final JsonNode jsonNodeForSerialization; if (dropUnknown) { + jsonNodeForSerialization = jsonNode.deepCopy(); + + // Delete unknown fields for updated serialized representation + final Iterator> fields = jsonNodeForSerialization.fields(); + while (fields.hasNext()) { + final Map.Entry field = fields.next(); + final String fieldName = field.getKey(); + final Optional recordField = schema.getField(fieldName); + if (!recordField.isPresent()) { + fields.remove(); + } + } + for (final RecordField recordField : schema.getFields()) { final JsonNode childNode = getChildNode(jsonNode, recordField); if (childNode == null) { @@ -169,6 +183,7 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche values.put(fieldName, value); } } else { + jsonNodeForSerialization = jsonNode; final Iterator fieldNames = jsonNode.fieldNames(); while (fieldNames.hasNext()) { final String fieldName = fieldNames.next(); @@ -189,7 +204,7 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche } } - final Supplier supplier = jsonNode::toString; + final Supplier supplier = jsonNodeForSerialization::toString; return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 231df0e370d6..43147a765f76 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -557,6 +557,7 @@ src/test/resources/TestConvertRecord/input/person_bad_enum.json src/test/resources/TestConvertRecord/input/person_long_id.json src/test/resources/TestConvertRecord/input/person.json + src/test/resources/TestConvertRecord/input/person_dropfield.json src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc src/test/resources/TestConvertRecord/schema/person.avsc src/test/resources/TestCountText/jabberwocky.txt diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java index 189caecc98a0..a99f0943fd2d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java @@ -51,6 +51,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledOnOs(value = OS.WINDOWS, disabledReason = "Pretty-printing is not portable across operating systems") @@ -404,4 +405,37 @@ public void testDateConversionWithUTCMinusTimezone() throws Exception { } } } + + @Test + public void testJSONDroppingUnkownFields() throws InitializationException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_dropfield.json")); + + runner.setProperty(ConvertRecord.RECORD_READER, "reader"); + runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + final String content = flowFile.getContent(); + assertFalse(content.contains("fieldThatShouldBeRemoved")); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json new file mode 100644 index 000000000000..50f955f5d202 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json @@ -0,0 +1,9 @@ +[ { + "id" : 485, + "name" : { + "last" : "Doe", + "first" : "John" + }, + "status" : "ACTIVE", + "fieldThatShouldBeRemoved": "Test" +} ] \ No newline at end of file