Skip to content

Commit

Permalink
NIFI-13963 Default to Drop Unknown Fields in JSON Record Reader (#9485)
Browse files Browse the repository at this point in the history
- Ensured that unknown fields are not included in output schema for JSON Records

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
dan-s1 authored Nov 12, 2024
1 parent 2453fa1 commit 9969eca
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,6 @@ public void setValue(final String fieldName, final Object value) {
private Optional<RecordField> setValueAndGetField(final String fieldName, final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
if (!field.isPresent()) {
if (dropUnknownFields) {
return field;
}

final Object previousValue = values.put(fieldName, value);
if (!Objects.equals(value, previousValue)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ protected Supplier<DateFormat> getLazyTimestampFormat() {
return lazyTimestampFormat;
}

/**
 * Reads the next record using this reader's defaults: both type coercion and
 * dropping of unknown fields are requested from the two-argument overload.
 *
 * @return whatever {@code nextRecord(boolean, boolean)} returns for these flags
 * @throws IOException propagated from the two-argument overload
 * @throws MalformedRecordException propagated from the two-argument overload
 */
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
    // Name the flags so the defaults are readable at the call site
    final boolean coerceTypes = true;
    final boolean dropUnknownFields = true;
    return nextRecord(coerceTypes, dropUnknownFields);
}

@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,22 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche
final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {

final Map<String, Object> values = new LinkedHashMap<>(schema.getFieldCount() * 2);
final JsonNode jsonNodeForSerialization;

if (dropUnknown) {
jsonNodeForSerialization = jsonNode.deepCopy();

// Delete unknown fields for updated serialized representation
final Iterator<Map.Entry<String, JsonNode>> fields = jsonNodeForSerialization.fields();
while (fields.hasNext()) {
final Map.Entry<String, JsonNode> field = fields.next();
final String fieldName = field.getKey();
final Optional<RecordField> recordField = schema.getField(fieldName);
if (!recordField.isPresent()) {
fields.remove();
}
}

for (final RecordField recordField : schema.getFields()) {
final JsonNode childNode = getChildNode(jsonNode, recordField);
if (childNode == null) {
Expand All @@ -169,6 +183,7 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche
values.put(fieldName, value);
}
} else {
jsonNodeForSerialization = jsonNode;
final Iterator<String> fieldNames = jsonNode.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
Expand All @@ -189,7 +204,7 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche
}
}

final Supplier<String> supplier = jsonNode::toString;
final Supplier<String> supplier = jsonNodeForSerialization::toString;
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@
<exclude>src/test/resources/TestConvertRecord/input/person_bad_enum.json</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person_long_id.json</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person_dropfield.json</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
<exclude>src/test/resources/TestCountText/jabberwocky.txt</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Pretty-printing is not portable across operating systems")
Expand Down Expand Up @@ -404,4 +405,37 @@ public void testDateConversionWithUTCMinusTimezone() throws Exception {
}
}
}

/**
 * Runs ConvertRecord with a JsonTreeReader and a JsonRecordSetWriter that are both
 * configured with the same schema text, and checks that the converted content no
 * longer mentions the input's {@code fieldThatShouldBeRemoved} entry (which is
 * expected to be absent from person.avsc).
 *
 * @throws InitializationException if a controller service cannot be registered
 * @throws IOException if the schema file or the input file cannot be read
 */
@Test
public void testJSONDroppingUnkownFields() throws InitializationException, IOException {
    // Reader and writer share one schema file: read it once and decode it explicitly as
    // UTF-8 so the test does not depend on the platform default charset.
    final String schemaText = new String(
            Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")),
            java.nio.charset.StandardCharsets.UTF_8);

    final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);

    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
    runner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);

    runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_dropfield.json"));

    runner.setProperty(ConvertRecord.RECORD_READER, "reader");
    runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");

    runner.run();
    runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);

    final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
    final String content = flowFile.getContent();
    assertFalse(content.contains("fieldThatShouldBeRemoved"),
            "Converted output still contains the field that is not part of the schema");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[ {
"id" : 485,
"name" : {
"last" : "Doe",
"first" : "John"
},
"status" : "ACTIVE",
"fieldThatShouldBeRemoved": "Test"
} ]

0 comments on commit 9969eca

Please sign in to comment.