From f15b7caaa483588869553cc7f84f98dfcbbd475a Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Tue, 12 Nov 2024 09:24:30 -0500 Subject: [PATCH] NIFI-13989 Reduced constructors in JsonTreeRowRecordReader to one (#9506) Signed-off-by: David Handermann --- .../nifi/json/JsonTreeRowRecordReader.java | 17 -- .../processor/RecordTransformProxy.java | 4 +- .../salesforce/QuerySalesforceObject.java | 6 +- .../processors/standard/TestForkRecord.java | 6 +- .../json/TestJsonTreeRowRecordReader.java | 226 +++++++++--------- 5 files changed, 130 insertions(+), 129 deletions(-) diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index 12bc6b0878b4..43cc121f1af9 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -53,23 +53,6 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { private final RecordSchema schema; - public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, - final String dateFormat, final String timeFormat, final String timestampFormat) - throws IOException, MalformedRecordException { - - this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null); - } - - public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, - final String dateFormat, final String timeFormat, final String timestampFormat, - final StartingFieldStrategy startingFieldStrategy, final String startingFieldName, - final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate captureFieldPredicate) - throws IOException, MalformedRecordException { - - this(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy, - captureFieldPredicate, false, null, new JsonParserFactory()); - } - public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final String dateFormat, final String timeFormat, final String timestampFormat, final StartingFieldStrategy startingFieldStrategy, final String startingFieldName, diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index 91d774e130b2..e6f981165164 100644 --- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.json.JsonParserFactory; import org.apache.nifi.json.JsonRecordSource; import org.apache.nifi.json.JsonSchemaInference; import org.apache.nifi.json.JsonTreeRowRecordReader; @@ -301,7 +302,8 @@ private Record createRecordFromJson(final RecordTransformResult transformResult) } try (final InputStream in = new ByteArrayInputStream(jsonBytes)) { - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), schema, null, null, null); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), schema, null, null, null, null, + null, null, null, false, null, new JsonParserFactory()); final Record record = reader.nextRecord(false, false); return record; } diff --git a/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index 82e8b54b3791..4d1fce433cf1 100644 --- a/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -44,6 +44,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.json.JsonParserFactory; import org.apache.nifi.json.JsonTreeRowRecordReader; import org.apache.nifi.json.SchemaApplicationStrategy; import org.apache.nifi.json.StartingFieldStrategy; @@ -505,7 +506,10 @@ private JsonTreeRowRecordReader createJsonReader(InputStream querySObjectResultI StartingFieldStrategy.NESTED_FIELD, STARTING_FIELD_NAME, SchemaApplicationStrategy.SELECTED_PART, - CAPTURE_PREDICATE + CAPTURE_PREDICATE, + false, + null, + new JsonParserFactory() ); } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java index 00589d2a08a9..650dc48c4788 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java @@ -19,6 +19,7 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.json.JsonParserFactory; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.json.JsonTreeRowRecordReader; @@ -460,15 +461,14 @@ public JsonRecordReader(RecordSchema schema) { @Override public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat); + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory()); } @Override public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat); + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory()); } - } private class CustomRecordWriter extends MockRecordWriter { diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index 48b6c5e94b18..f08e8f129c74 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -35,9 +35,11 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.util.EqualsWrapper; -import org.apache.nifi.util.MockComponentLog; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +51,8 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.sql.Date; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -63,19 +67,23 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; +@ExtendWith(MockitoExtension.class) class TestJsonTreeRowRecordReader { private static final Logger LOGGER = LoggerFactory.getLogger(TestJsonTreeRowRecordReader.class); private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); + @Mock + private ComponentLog log; + private List getDefaultFields() { return getFields(RecordFieldType.DOUBLE.getDataType()); } @@ -102,7 +110,7 @@ private RecordSchema getAccountSchema() { } @Test - void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException { + void testReadChoiceOfStringOrArrayOfRecords() throws Exception { final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc"); final File jsonFile = new File("src/test/resources/json/choice-of-string-or-array-record.json"); @@ -110,23 +118,23 @@ void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecor final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); try (final InputStream fis = new FileInputStream(jsonFile); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(fis, recordSchema)) { final Record record = reader.nextRecord(); final Object[] fieldsArray = record.getAsArray("fields"); assertEquals(2, fieldsArray.length); final Object firstElement = fieldsArray[0]; - assertTrue(firstElement instanceof Record); + assertInstanceOf(Record.class, firstElement); assertEquals("string", ((Record) firstElement).getAsString("type")); final Object secondElement = fieldsArray[1]; - assertTrue(secondElement instanceof Record); + assertInstanceOf(Record.class, secondElement); final Object[] typeArray = ((Record) secondElement).getAsArray("type"); assertEquals(1, typeArray.length); final Object firstType = typeArray[0]; - assertTrue(firstType instanceof Record); + assertInstanceOf(Record.class, firstType); final Record firstTypeRecord = (Record) firstType; assertEquals("string", firstTypeRecord.getAsString("type")); } @@ -135,14 +143,12 @@ void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecor @Test @Disabled("Intended only for manual testing to determine performance before/after modifications") - void testPerformanceOnLocalFile() throws IOException, MalformedRecordException { + void testPerformanceOnLocalFile() throws Exception { final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList()); final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289"); final byte[] data = Files.readAllBytes(file.toPath()); - final ComponentLog logger = mock(ComponentLog.class); - int recordCount = 0; final int iterations = 1000; @@ -150,10 +156,10 @@ void testPerformanceOnLocalFile() throws IOException, MalformedRecordException { final long start = System.nanoTime(); for (int i = 0; i < iterations; i++) { try (final InputStream in = new ByteArrayInputStream(data); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) { - while (reader.nextRecord() != null) { - recordCount++; - } + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { + while (reader.nextRecord() != null) { + recordCount++; + } } } final long nanos = System.nanoTime() - start; @@ -164,14 +170,12 @@ void testPerformanceOnLocalFile() throws IOException, MalformedRecordException { @Test @Disabled("Intended only for manual testing to determine performance before/after modifications") - void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException { + void testPerformanceOnIndividualMessages() throws Exception { final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList()); final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json"); final byte[] data = Files.readAllBytes(file.toPath()); - final ComponentLog logger = mock(ComponentLog.class); - int recordCount = 0; final int iterations = 1_000_000; @@ -179,7 +183,7 @@ void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordEx final long start = System.nanoTime(); for (int i = 0; i < iterations; i++) { try (final InputStream in = new ByteArrayInputStream(data); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { while (reader.nextRecord() != null) { recordCount++; } @@ -192,12 +196,12 @@ void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordEx } @Test - void testChoiceOfRecordTypes() throws IOException, MalformedRecordException { + void testChoiceOfRecordTypes() throws Exception { final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc")); final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); try (final InputStream in = new FileInputStream("src/test/resources/json/elements-for-record-choice.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, recordSchema)) { // evaluate first record final Record firstRecord = reader.nextRecord(); @@ -214,7 +218,7 @@ void testChoiceOfRecordTypes() throws IOException, MalformedRecordException { // child record should have a schema with "id" as the only field final Object childObject = firstRecord.getValue("child"); - assertTrue(childObject instanceof Record); + assertInstanceOf(Record.class, childObject); final Record firstChildRecord = (Record) childObject; final RecordSchema firstChildSchema = firstChildRecord.getSchema(); @@ -236,7 +240,7 @@ void testChoiceOfRecordTypes() throws IOException, MalformedRecordException { // child record should have a schema with "name" as the only field final Object secondChildObject = secondRecord.getValue("child"); - assertTrue(secondChildObject instanceof Record); + assertInstanceOf(Record.class, secondChildObject); final Record secondChildRecord = (Record) secondChildObject; final RecordSchema secondChildSchema = secondChildRecord.getSchema(); @@ -248,11 +252,11 @@ void testChoiceOfRecordTypes() throws IOException, MalformedRecordException { } @Test - void testReadArray() throws IOException, MalformedRecordException { + void testReadArray() throws Exception { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -274,11 +278,11 @@ void testReadArray() throws IOException, MalformedRecordException { } @Test - void testReadOneLinePerJSON() throws IOException, MalformedRecordException { + void testReadOneLinePerJSON() throws Exception { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-oneline.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -300,7 +304,7 @@ void testReadOneLinePerJSON() throws IOException, MalformedRecordException { } @Test - void testReadMultilineJSON() throws IOException, MalformedRecordException { + void testReadMultilineJSON() throws Exception { testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, null); } @@ -313,23 +317,22 @@ void testReadJSONStringTooLong() { } @Test - void testReadJSONComments() throws IOException, MalformedRecordException { + void testReadJSONComments() throws Exception { testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", true, StreamReadConstraints.builder().maxStringLength(20_000).build()); } @Test void testReadJSONDisallowComments() { - final MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> + assertThrows(MalformedRecordException.class, () -> testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", false, StreamReadConstraints.builder().maxStringLength(20_000).build())); } - private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws IOException, MalformedRecordException { + private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws Exception { final List fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10)); final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream in = new FileInputStream(inputFile); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, - null, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory())) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema, null, null, null, null, null, null, null, allowComments, streamReadConstraints)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -351,11 +354,11 @@ null, null, null, null, allowComments, streamReadConstraints, new JsonParserFact } @Test - void testReadMultilineArrays() throws IOException, MalformedRecordException { + void testReadMultilineArrays() throws Exception { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiarray.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -383,11 +386,11 @@ void testReadMultilineArrays() throws IOException, MalformedRecordException { } @Test - void testReadMixedJSON() throws IOException, MalformedRecordException { + void testReadMixedJSON() throws Exception { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-mixed.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -416,14 +419,14 @@ void testReadMixedJSON() throws IOException, MalformedRecordException { } @Test - void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException { + void testReadRawRecordIncludesFieldsNotInSchema() throws Exception { final List fields = new ArrayList<>(); fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final Record schemaValidatedRecord = reader.nextRecord(true, true); assertEquals(1, schemaValidatedRecord.getValue("id")); @@ -432,7 +435,7 @@ void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedR } try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final Record rawRecord = reader.nextRecord(false, false); assertEquals(1, rawRecord.getValue("id")); @@ -447,7 +450,7 @@ void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedR } @Test - void testReadRawRecordFieldOrderPreserved() throws IOException, MalformedRecordException { + void testReadRawRecordFieldOrderPreserved() throws Exception { final List fields = new ArrayList<>(); fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); final RecordSchema schema = new SimpleRecordSchema(fields); @@ -458,7 +461,7 @@ void testReadRawRecordFieldOrderPreserved() throws IOException, MalformedRecordE final String expectedMap = "{id=1, name=John Doe, address=123 My Street, city=My City, state=MS, zipCode=11111, country=USA, account={\"id\":42,\"balance\":4750.89}}"; try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final Record rawRecord = reader.nextRecord(false, false); @@ -470,14 +473,14 @@ void testReadRawRecordFieldOrderPreserved() throws IOException, MalformedRecordE } @Test - void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException { + void testReadRawRecordTypeCoercion() throws Exception { final List fields = new ArrayList<>(); fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final Record schemaValidatedRecord = reader.nextRecord(true, true); assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema @@ -488,7 +491,7 @@ void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordExceptio } try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final Record rawRecord = reader.nextRecord(false, false); assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1 @@ -505,7 +508,7 @@ void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordExceptio } @Test - void testDateCoercedFromString() throws IOException, MalformedRecordException { + void testDateCoercedFromString() throws Exception { final String dateField = "date"; final List recordFields = Collections.singletonList(new RecordField(dateField, RecordFieldType.DATE.getDataType())); final RecordSchema schema = new SimpleRecordSchema(recordFields); @@ -515,38 +518,38 @@ void testDateCoercedFromString() throws IOException, MalformedRecordException { final String json = String.format("{ \"%s\": \"%s\" }", dateField, date); for (final boolean coerceTypes : new boolean[] {true, false}) { try (final InputStream in = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema, datePattern, timeFormat, timestampFormat, null, null, null, null, false, null)) { final Record record = reader.nextRecord(coerceTypes, false); final Object value = record.getValue(dateField); - assertTrue(value instanceof java.sql.Date, "With coerceTypes set to " + coerceTypes + ", value is not a Date"); + assertInstanceOf(Date.class, value, "With coerceTypes set to " + coerceTypes + ", value is not a Date"); assertEquals(date, value.toString()); } } } @Test - void testTimestampCoercedFromString() throws IOException, MalformedRecordException { + void testTimestampCoercedFromString() throws Exception { final List recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType())); final RecordSchema schema = new SimpleRecordSchema(recordFields); for (final boolean coerceTypes : new boolean[] {true, false}) { try (final InputStream in = new FileInputStream("src/test/resources/json/timestamp.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss", null, null, null, null, false, null)) { final Record record = reader.nextRecord(coerceTypes, false); final Object value = record.getValue("timestamp"); - assertTrue(value instanceof java.sql.Timestamp, "With coerceTypes set to " + coerceTypes + ", value is not a Timestamp"); + assertInstanceOf(Timestamp.class, value, "With coerceTypes set to " + coerceTypes + ", value is not a Timestamp"); } } } @Test - void testSingleJsonElement() throws IOException, MalformedRecordException { + void testSingleJsonElement() throws Exception { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -565,14 +568,14 @@ void testSingleJsonElement() throws IOException, MalformedRecordException { } @Test - void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException { + void testSingleJsonElementWithChoiceFields() throws Exception { // Wraps default fields by Choice data type to test mapping to a Choice type. final List choiceFields = getDefaultFields().stream() .map(f -> new RecordField(f.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList()); final RecordSchema schema = new SimpleRecordSchema(choiceFields); try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -582,9 +585,9 @@ void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecord RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); final List fields = schema.getFields(); for (int i = 0; i < schema.getFields().size(); i++) { - assertTrue(fields.get(i).getDataType() instanceof ChoiceDataType); + assertInstanceOf(ChoiceDataType.class, fields.get(i).getDataType()); final ChoiceDataType choiceDataType = (ChoiceDataType) fields.get(i).getDataType(); - assertEquals(expectedTypes.get(i), choiceDataType.getPossibleSubTypes().get(0).getFieldType()); + assertEquals(expectedTypes.get(i), choiceDataType.getPossibleSubTypes().getFirst().getFieldType()); } final Object[] firstRecordValues = reader.nextRecord().getValues(); @@ -595,7 +598,7 @@ void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecord } @Test - void testElementWithNestedData() throws IOException, MalformedRecordException { + void testElementWithNestedData() throws Exception { final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); final List fields = getDefaultFields(); fields.add(new RecordField("account", accountType)); @@ -603,7 +606,7 @@ void testElementWithNestedData() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); final List expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, @@ -625,7 +628,7 @@ void testElementWithNestedData() throws IOException, MalformedRecordException { } @Test - void testElementWithNestedArray() throws IOException, MalformedRecordException { + void testElementWithNestedArray() throws Exception { final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); @@ -635,7 +638,7 @@ void testElementWithNestedArray() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested-array.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "address", "city", "state", "zipCode", "country", "accounts"); @@ -658,11 +661,11 @@ void testElementWithNestedArray() throws IOException, MalformedRecordException { } @Test - void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException { + void testReadArrayDifferentSchemas() throws Exception { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -687,11 +690,11 @@ void testReadArrayDifferentSchemas() throws IOException, MalformedRecordExceptio } @Test - void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException { + void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws Exception { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-optional-balance.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -716,7 +719,7 @@ void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOExcep } @Test - public void testMultipleInputRecordsWithStartingFieldArray() throws IOException, MalformedRecordException { + public void testMultipleInputRecordsWithStartingFieldArray() throws Exception { final String inputJson = """ [{ "books": [{ @@ -743,8 +746,8 @@ public void testMultipleInputRecordsWithStartingFieldArray() throws IOException, final List ids = new ArrayList<>(); try (final InputStream in = new ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8)); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, timeFormat, timestampFormat, - StartingFieldStrategy.NESTED_FIELD, "books", SchemaApplicationStrategy.SELECTED_PART, null)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, bookSchema, dateFormat, timeFormat, timestampFormat, + StartingFieldStrategy.NESTED_FIELD, "books", SchemaApplicationStrategy.SELECTED_PART, null, false, null)) { Record record; while ((record = reader.nextRecord()) != null) { @@ -757,7 +760,7 @@ public void testMultipleInputRecordsWithStartingFieldArray() throws IOException, } @Test - public void testMultipleInputRecordsWithStartingFieldSingleObject() throws IOException, MalformedRecordException { + public void testMultipleInputRecordsWithStartingFieldSingleObject() throws Exception { final String inputJson = """ {"book": {"id": 1,"title": "Book 1"}} {"book": {"id": 2,"title": "Book 2"}} @@ -772,8 +775,8 @@ public void testMultipleInputRecordsWithStartingFieldSingleObject() throws IOExc final List ids = new ArrayList<>(); try (final InputStream in = new ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8)); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, timeFormat, timestampFormat, - StartingFieldStrategy.NESTED_FIELD, "book", SchemaApplicationStrategy.SELECTED_PART, null)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, bookSchema, dateFormat, timeFormat, timestampFormat, + StartingFieldStrategy.NESTED_FIELD, "book", SchemaApplicationStrategy.SELECTED_PART, null, false, null)) { Record record; while ((record = reader.nextRecord()) != null) { @@ -788,7 +791,7 @@ public void testMultipleInputRecordsWithStartingFieldSingleObject() throws IOExc @Test - void testReadUnicodeCharacters() throws IOException, MalformedRecordException { + void testReadUnicodeCharacters() throws Exception { final List fromFields = new ArrayList<>(); fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType())); @@ -804,12 +807,12 @@ void testReadUnicodeCharacters() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream in = new FileInputStream("src/test/resources/json/json-with-unicode.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { final Object[] firstRecordValues = reader.nextRecord().getValues(); final Object secondValue = firstRecordValues[1]; - assertTrue(secondValue instanceof Long); + assertInstanceOf(Long.class, secondValue); assertEquals(832036744985577473L, secondValue); final Object unicodeValue = firstRecordValues[2]; @@ -830,7 +833,7 @@ void testIncorrectSchema() { MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> { try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account-wrong-field-type.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) { reader.nextRecord().getValues(); } @@ -858,12 +861,12 @@ void testMergeOfSimilarRecords() throws Exception { )); List expected = Arrays.asList( - new MapRecord(expectedSchema, new HashMap() {{ + new MapRecord(expectedSchema, new HashMap<>() {{ put("integer", 1); put("boolean", true); put("booleanOrString", true); }}), - new MapRecord(expectedSchema, new HashMap() {{ + new MapRecord(expectedSchema, new HashMap<>() {{ put("integer", 2); put("string", "stringValue2"); put("booleanOrString", "booleanOrStringValue2"); @@ -893,14 +896,14 @@ void testChoiceOfEmbeddedSimilarRecords() throws Exception { )); List expected = Arrays.asList( - new MapRecord(expectedRecordChoiceSchema, new HashMap() {{ - put("record", new MapRecord(expectedRecordSchema1, new HashMap() {{ + new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ + put("record", new MapRecord(expectedRecordSchema1, new HashMap<>() {{ put("integer", 1); put("boolean", true); }})); }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap() {{ - put("record", new MapRecord(expectedRecordSchema2, new HashMap() {{ + new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ + put("record", new MapRecord(expectedRecordSchema2, new HashMap<>() {{ put("integer", 2); put("string", "stringValue2"); }})); @@ -992,7 +995,7 @@ void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception { } @Test - void testStartFromNestedArray() throws IOException, MalformedRecordException { + void testStartFromNestedArray() throws Exception { String jsonPath = "src/test/resources/json/single-element-nested-array.json"; SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( @@ -1009,7 +1012,7 @@ void testStartFromNestedArray() throws IOException, MalformedRecordException { } @Test - void testStartFromNestedObject() throws IOException, MalformedRecordException { + void testStartFromNestedObject() throws Exception { String jsonPath = "src/test/resources/json/single-element-nested.json"; SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( @@ -1023,7 +1026,7 @@ void testStartFromNestedObject() throws IOException, MalformedRecordException { } @Test - void testStartFromMultipleNestedField() throws IOException, MalformedRecordException { + void testStartFromMultipleNestedField() throws Exception { String jsonPath = "src/test/resources/json/multiple-nested-field.json"; SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( @@ -1040,14 +1043,14 @@ void testStartFromMultipleNestedField() throws IOException, MalformedRecordExcep } @Test - void testStartFromSimpleFieldReturnsEmptyJson() throws IOException, MalformedRecordException { + void testStartFromSimpleFieldReturnsEmptyJson() throws Exception { String jsonPath = "src/test/resources/json/single-element-nested.json"; testReadRecords(jsonPath, Collections.emptyList(), StartingFieldStrategy.NESTED_FIELD, "name"); } @Test - void testStartFromNonExistentFieldWithDefinedSchema() throws IOException, MalformedRecordException { + void testStartFromNonExistentFieldWithDefinedSchema() throws Exception { String jsonPath = "src/test/resources/json/single-element-nested.json"; SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(getDefaultFields()); @@ -1058,7 +1061,7 @@ void testStartFromNonExistentFieldWithDefinedSchema() throws IOException, Malfor } @Test - void testStartFromNestedFieldThenStartObject() throws IOException, MalformedRecordException { + void testStartFromNestedFieldThenStartObject() throws Exception { String jsonPath = "src/test/resources/json/nested-array-then-start-object.json"; final SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( @@ -1076,7 +1079,7 @@ void testStartFromNestedFieldThenStartObject() throws IOException, MalformedReco } @Test - void testStartFromNestedObjectWithWholeJsonSchemaScope() throws IOException, MalformedRecordException { + void testStartFromNestedObjectWithWholeJsonSchemaScope() throws Exception { String jsonPath = "src/test/resources/json/single-element-nested.json"; final RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList( @@ -1099,7 +1102,7 @@ void testStartFromNestedObjectWithWholeJsonSchemaScope() throws IOException, Mal } @Test - void testStartFromNestedArrayWithWholeJsonSchemaScope() throws IOException, MalformedRecordException { + void testStartFromNestedArrayWithWholeJsonSchemaScope() throws Exception { String jsonPath = "src/test/resources/json/single-element-nested-array.json"; RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList( @@ -1123,7 +1126,7 @@ void testStartFromNestedArrayWithWholeJsonSchemaScope() throws IOException, Malf } @Test - void testStartFromDeepNestedObject() throws IOException, MalformedRecordException { + void testStartFromDeepNestedObject() throws Exception { String jsonPath = "src/test/resources/json/single-element-deep-nested.json"; RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList( @@ -1149,7 +1152,7 @@ void testStartFromDeepNestedObject() throws IOException, MalformedRecordExceptio )); List expected = Collections.singletonList( - new MapRecord(expectedRecordSchema, new HashMap() {{ + new MapRecord(expectedRecordSchema, new HashMap<>() {{ put("nestedLevel2Int", 111); put("nestedLevel2String", "root.level1.level2:string"); }}) @@ -1160,7 +1163,7 @@ void testStartFromDeepNestedObject() throws IOException, MalformedRecordExceptio } @Test - void testCaptureFields() throws IOException, MalformedRecordException { + void testCaptureFields() throws Exception { Map expectedCapturedFields = new HashMap<>(); expectedCapturedFields.put("id", "1"); expectedCapturedFields.put("zipCode", "11111"); @@ -1194,11 +1197,9 @@ void testCaptureFields() throws IOException, MalformedRecordException { )); try (InputStream in = new FileInputStream("src/test/resources/json/capture-fields.json")) { - JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader( - in, mock(ComponentLog.class), recordSchema, - dateFormat, timeFormat, timestampFormat, - StartingFieldStrategy.NESTED_FIELD, startingFieldName, - SchemaApplicationStrategy.SELECTED_PART, capturePredicate); + JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, recordSchema, dateFormat, timeFormat, timestampFormat, + StartingFieldStrategy.NESTED_FIELD, startingFieldName, SchemaApplicationStrategy.SELECTED_PART, + capturePredicate, false, null); while (reader.nextRecord() != null); Map capturedFields = reader.getCapturedFields(); @@ -1207,7 +1208,7 @@ in, mock(ComponentLog.class), recordSchema, } } - private void testReadRecords(String jsonFilename, List expected) throws IOException, MalformedRecordException { + private void testReadRecords(String jsonFilename, List expected) throws Exception { final File jsonFile = new File(jsonFilename); try (final InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { final RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null); @@ -1219,7 +1220,7 @@ private void testReadRecords(String jsonPath, List expected, StartingFieldStrategy strategy, String startingFieldName) - throws IOException, MalformedRecordException { + throws Exception { final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { @@ -1228,7 +1229,7 @@ private void testReadRecords(String jsonPath, } } - private void testReadRecords(String jsonPath, RecordSchema schema, List expected) throws IOException, MalformedRecordException { + private void testReadRecords(String jsonPath, RecordSchema schema, List expected) throws Exception { final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { testReadRecords(jsonStream, schema, expected); @@ -1240,15 +1241,15 @@ private void testReadRecords(String jsonPath, List expected, StartingFieldStrategy strategy, String startingFieldName, - SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException { + SchemaApplicationStrategy schemaApplicationStrategy) throws Exception { final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, schemaApplicationStrategy); } } - private void testReadRecords(InputStream jsonStream, RecordSchema schema, List expected) throws IOException, MalformedRecordException { - try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + private void testReadRecords(InputStream jsonStream, RecordSchema schema, List expected) throws Exception { + try (JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(jsonStream, schema)) { List actual = new ArrayList<>(); Record record; while ((record = reader.nextRecord()) != null) { @@ -1280,10 +1281,10 @@ private void testReadRecords(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy) - throws IOException, MalformedRecordException { + throws Exception { - try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, - strategy, startingFieldName, schemaApplicationStrategy, null)) { + try (JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(jsonStream, schema, dateFormat, timeFormat, timestampFormat, + strategy, startingFieldName, schemaApplicationStrategy, null, false, null)) { List actual = new ArrayList<>(); Record record; @@ -1312,12 +1313,23 @@ private void testReadRecords(InputStream jsonStream, private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException { RecordSchema schema = new InferSchemaAccessStrategy<>( (__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()), - new JsonSchemaInference(new TimeValueInference(null, null, null)), - mock(ComponentLog.class) + new JsonSchemaInference(new TimeValueInference(null, null, null)), log ).getSchema(Collections.emptyMap(), jsonStream, null); jsonStream.reset(); return schema; } + + private JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream inputStream, RecordSchema recordSchema) throws Exception { + return createJsonTreeRowRecordReader(inputStream, recordSchema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null); + } + + private JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream inputStream, RecordSchema recordSchema, String dateFormat, String timeFormat, String timestampFormat, + StartingFieldStrategy startingFieldStrategy, String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy, + BiPredicate captureFieldPredicate, boolean allowComments, StreamReadConstraints streamReadConstraints) + throws Exception { + return new JsonTreeRowRecordReader(inputStream, log, recordSchema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy, + captureFieldPredicate, allowComments, streamReadConstraints, new JsonParserFactory()); + } }