From e9de1b51ab94bc096760554382fd0fb17a3859d9 Mon Sep 17 00:00:00 2001
From: Sudhindra Tirupati Nagaraj
Date: Fri, 21 Jan 2022 10:43:48 -0800
Subject: [PATCH] Fix avro parser to handle nested arrays
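
The parser converted array elements by applying convertLogicalType()
directly to each one, which is only correct when the element schema is
itself a logical type; arrays whose items are records, maps, or further
arrays (for example the ksqlDB map-entry records exercised by the new
tests) were not recursed into. Conversion is now factored into a
recursive convertType() that dispatches on the Connect schema type
(logical type, STRUCT/MAP, ARRAY, or scalar), with
convertLogicalTypesMap() and convertLogicalTypesArray() walking nested
containers. getSchemaForField() now falls back to schema.valueSchema()
for any container schema instead of only for MAP, so array element
schemas resolve as well.

Also bumps kafka-connect-parent to 2.8.0-1 and removes the stub
DocumentationTest.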
---
 pom.xml                                      |   3 +-
 src/main/java/rockset/AvroParser.java        |  63 +++---
 src/test/java/rockset/AvroParserTest.java    | 221 +++++++++++++++++++
 src/test/java/rockset/DocumentationTest.java |  10 -
 4 files changed, 260 insertions(+), 37 deletions(-)
 delete mode 100644 src/test/java/rockset/DocumentationTest.java

diff --git a/pom.xml b/pom.xml
index 7797dea..7e9b094 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
   <parent>
     <groupId>com.github.jcustenborder.kafka.connect</groupId>
     <artifactId>kafka-connect-parent</artifactId>
-    <version>2.0.0-cp1</version>
+    <version>2.8.0-1</version>
   </parent>
@@ -137,4 +137,5 @@
+
diff --git a/src/main/java/rockset/AvroParser.java b/src/main/java/rockset/AvroParser.java
index 4b59a54..53c0aa3 100644
--- a/src/main/java/rockset/AvroParser.java
+++ b/src/main/java/rockset/AvroParser.java
@@ -6,11 +6,11 @@
 import io.confluent.connect.avro.AvroData;
 import io.confluent.kafka.serializers.NonRecordContainer;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.kafka.connect.data.Date;
@@ -46,7 +46,7 @@ public Map<String, Object> parseValue(SinkRecord record) {
     }
     if (val instanceof Record) {
       Map<String, Object> map = getMap(val);
-      return convertLogicalTypes(record.valueSchema(), map);
+      return convertLogicalTypesMap(record.valueSchema(), map);
     }
     return getMap(val);
   }
@@ -56,33 +56,46 @@ private boolean isLogicalType(Schema schema) {
     return LOGICAL_TYPE_CONVERTERS.containsKey(schema.name());
   }
 
-  private Map<String, Object> convertLogicalTypes(Schema valueSchema, Map<String, Object> map) {
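+  /**
+   * Recursively converts logical-type values inside {@code o} according to
+   * {@code schema}: logical types go through convertLogicalType(), STRUCT and
+   * MAP values through convertLogicalTypesMap(), ARRAY values through
+   * convertLogicalTypesArray(), and scalars are returned unchanged.
+   */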
+  private Object convertType(Schema schema, Object o) {
+    if (isLogicalType(schema)) {
+      return convertLogicalType(schema, o);
+    }
+
+    Type type = schema.type();
+
+    switch (type) {
+      case STRUCT:
+      case MAP:
+        return convertLogicalTypesMap(schema, (Map<String, Object>) o);
+
+      case ARRAY:
+        return convertLogicalTypesArray(schema, (List<Object>) o);
+    }
+
+    // could be a scalar type, use as-is
+    return o;
+  }
+
+  public List<Object> convertLogicalTypesArray(Schema schema, List<Object> arr) {
+    List<Object> res = new ArrayList<>();
+
+    for (Object o : arr) {
+      res.add(convertType(schema.valueSchema(), o));
+    }
+
+    return res;
+  }
+
+  public Map<String, Object> convertLogicalTypesMap(Schema valueSchema, Map<String, Object> map) {
     for (Entry<String, Object> e : map.entrySet()) {
       Schema schema = getSchemaForField(valueSchema, e.getKey());
       if (schema == null) {
         continue;
       }
-      if (isLogicalType(schema)) {
-        Object value = convertLogicalType(schema, e.getValue());
-        e.setValue(value);
-        continue;
-      }
-      Type type = schema.type();
-      switch (type) {
-        case STRUCT:
-        case MAP:
-          e.setValue(convertLogicalTypes(schema, (Map<String, Object>) e.getValue()));
-          break;
-        case ARRAY:
-          Schema arraySchema = schema.valueSchema();
-          List<Object> convertedElements = ((List<Object>) e.getValue()).stream()
-              .map(element -> convertLogicalType(arraySchema, element))
-              .collect(Collectors.toList());
-          e.setValue(convertedElements);
-          break;
-      }
+      e.setValue(convertType(schema, e.getValue()));
     }
+
     return map;
   }
@@ -99,10 +112,8 @@ private Schema getSchemaForField(Schema schema, String key) {
         }
       }
     }
-    if (schema.type() == Type.MAP) {
-      return schema.valueSchema();
-    }
-    return null;
+
+    return schema.valueSchema();
   }
 
   public Map<String, Object> getMap(Object val) {
diff --git a/src/test/java/rockset/AvroParserTest.java b/src/test/java/rockset/AvroParserTest.java
index 51f5e27..8c15687 100644
--- a/src/test/java/rockset/AvroParserTest.java
+++ b/src/test/java/rockset/AvroParserTest.java
@@ -4,13 +4,16 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 
+import io.confluent.connect.avro.AvroData;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
@@ -53,6 +56,224 @@ public void testSimpleKeys() throws IOException {
     verifySimpleKey(true);
   }
 
+  @Test
+  public void testAvroArraySchema1() throws IOException {
+    final String schemaStr = "{\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"KsqlDataSourceSchema\",\n" +
+        "  \"namespace\": \"io.confluent.ksql.avro_schemas\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"user_id\",\n" +
+        "      \"type\": [\n" +
+        "        \"null\",\n" +
+        "        {\n" +
+        "          \"type\": \"array\",\n" +
+        "          \"items\": [\n" +
+        "            \"null\",\n" +
+        "            \"string\"\n" +
+        "          ]\n" +
+        "        }\n" +
+        "      ],\n" +
+        "      \"default\": null\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schemaStr);
+    Schema schema = new AvroData(1).toConnectSchema(avroSchema);
+
+    AvroParser avroParser = new AvroParser();
+
+    String value = "{\n" +
+        "  \"user_id\": [\n" +
+        "    \"1\",\n" +
+        "    \"2\",\n" +
+        "    \"3\"\n" +
+        "  ]\n" +
+        "}";
+
+    Map<String, Object> map = avroParser.getMap(value);
+    Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);
+
+    assertEquals("user_id", res.keySet().stream().iterator().next());
+    assertEquals("[\"1\",\"2\",\"3\"]",
+        new ObjectMapper().writeValueAsString(res.entrySet().stream().iterator().next().getValue()));
+  }
+
+  @Test
+  public void testAvroArraySchema2() {
+    final String schemaStr = "{\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"KsqlDataSourceSchema\",\n" +
+        "  \"namespace\": \"io.confluent.ksql.avro_schemas\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"data\",\n" +
+        "      \"type\": [\n" +
+        "        \"null\",\n" +
+        "        {\n" +
+        "          \"type\": \"array\",\n" +
+        "          \"items\": [\n" +
+        "            {\n" +
+        "              \"type\": \"record\",\n" +
+        "              \"name\": \"KsqlDataSourceSchema_data\",\n" +
+        "              \"fields\": [\n" +
+        "                {\n" +
+        "                  \"name\": \"key\",\n" +
+        "                  \"type\": [\n" +
+        "                    \"null\",\n" +
+        "                    \"string\"\n" +
+        "                  ],\n" +
+        "                  \"default\": null\n" +
+        "                },\n" +
+        "                {\n" +
+        "                  \"name\": \"value\",\n" +
+        "                  \"type\": [\n" +
+        "                    \"null\",\n" +
+        "                    \"string\"\n" +
+        "                  ],\n" +
+        "                  \"default\": null\n" +
+        "                }\n" +
+        "              ],\n" +
+        "              \"connect.internal.type\": \"MapEntry\"\n" +
+        "            }\n" +
+        "          ]\n" +
+        "        }\n" +
+        "      ],\n" +
+        "      \"default\": null\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schemaStr);
+    Schema schema = new AvroData(1).toConnectSchema(avroSchema);
+
+    AvroParser avroParser = new AvroParser();
+
+    String value = "{\n" +
+        "  \"data\": [\n" +
+        "    {\n" +
+        "      \"KsqlDataSourceSchema_data\": {\n" +
+        "        \"key\": \"1\",\n" +
+        "        \"value\": \"11\"\n" +
+        "      }\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"KsqlDataSourceSchema_data\": {\n" +
+        "        \"key\": \"2\",\n" +
+        "        \"value\": \"22\"\n" +
+        "      }\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    Map<String, Object> map = avroParser.getMap(value);
+    Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);
+
+    assertEquals("data", res.keySet().stream().iterator().next());
+    List<Map<String, Object>> values =
+        (List<Map<String, Object>>) res.entrySet().stream().iterator().next().getValue();
+
+    assertEquals(2, values.size());
+    assertEquals("1", ((Map<String, Object>) values.get(0).get("KsqlDataSourceSchema_data")).get("key"));
+    assertEquals("11", ((Map<String, Object>) values.get(0).get("KsqlDataSourceSchema_data")).get("value"));
+    assertEquals("2", ((Map<String, Object>) values.get(1).get("KsqlDataSourceSchema_data")).get("key"));
+    assertEquals("22", ((Map<String, Object>) values.get(1).get("KsqlDataSourceSchema_data")).get("value"));
+  }
+
+  @Test
+  public void testAvroArraySchema3() throws IOException {
+    Schema schema = SchemaBuilder.struct().name("record").field("data", SchemaBuilder.array(
+        SchemaBuilder.map(Schema.STRING_SCHEMA, Timestamp.SCHEMA)));
+
+    String value = "{\n" +
+        "  \"data\": [\n" +
+        "    {\n" +
+        "      \"1\": 1642784652\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"2\": 1642784653\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"3\": 1642784654\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    AvroParser avroParser = new AvroParser();
+    Map<String, Object> map = avroParser.getMap(value);
+    Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);
+
+    String expectedOutput = "{\n" +
+        "  \"data\" : [ {\n" +
+        "    \"1\" : {\n" +
+        "      \"value\" : 1642784652,\n" +
+        "      \"__rockset_type\" : \"timestamp\"\n" +
+        "    }\n" +
+        "  }, {\n" +
+        "    \"2\" : {\n" +
+        "      \"value\" : 1642784653,\n" +
+        "      \"__rockset_type\" : \"timestamp\"\n" +
+        "    }\n" +
+        "  }, {\n" +
+        "    \"3\" : {\n" +
+        "      \"value\" : 1642784654,\n" +
+        "      \"__rockset_type\" : \"timestamp\"\n" +
+        "    }\n" +
+        "  } ]\n" +
+        "}";
+
+    assertEquals(expectedOutput.replaceAll("[\\n\\t ]", ""), new ObjectMapper().writeValueAsString(res));
+  }
+
+  @Test
+  public void testAvroArraySchema4() throws IOException {
+    Schema schema = SchemaBuilder.struct().name("record").field("data", SchemaBuilder.struct().field("foo",
+        SchemaBuilder.array(SchemaBuilder.map(Schema.STRING_SCHEMA, Timestamp.SCHEMA))));
+
+    String value = "{\n" +
+        "  \"data\": {\n" +
+        "    \"foo\": [\n" +
+        "      {\n" +
+        "        \"1\": 1642784652\n" +
+        "      },\n" +
+        "      {\n" +
+        "        \"2\": 1642784653\n" +
+        "      },\n" +
+        "      {\n" +
+        "        \"3\": 1642784654\n" +
+        "      }\n" +
+        "    ]\n" +
+        "  }\n" +
+        "}";
+
+    AvroParser avroParser = new AvroParser();
+    Map<String, Object> map = avroParser.getMap(value);
+    Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);
+
+    String expectedOutput = "{\n" +
+        "  \"data\" : {\n" +
+        "    \"foo\" : [ {\n" +
+        "      \"1\" : {\n" +
+        "        \"value\" : 1642784652,\n" +
+        "        \"__rockset_type\" : \"timestamp\"\n" +
+        "      }\n" +
+        "    }, {\n" +
+        "      \"2\" : {\n" +
+        "        \"value\" : 1642784653,\n" +
+        "        \"__rockset_type\" : \"timestamp\"\n" +
+        "      }\n" +
+        "    }, {\n" +
+        "      \"3\" : {\n" +
+        "        \"value\" : 1642784654,\n" +
+        "        \"__rockset_type\" : \"timestamp\"\n" +
+        "      }\n" +
+        "    } ]\n" +
+        "  }\n" +
+        "}";
+
+    assertEquals(expectedOutput.replaceAll("[\\n\\t ]", ""), new ObjectMapper().writeValueAsString(res));
+  }
+
   private void verifySimpleKey(Object key) throws IOException {
     Schema valueSchema = SchemaBuilder.struct()
         .field("id", Schema.INT64_SCHEMA)
diff --git a/src/test/java/rockset/DocumentationTest.java b/src/test/java/rockset/DocumentationTest.java
deleted file mode 100644
index 8d50b70..0000000
--- a/src/test/java/rockset/DocumentationTest.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package rockset;
-
-import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest;
-
-public class DocumentationTest extends BaseDocumentationTest {
-  @Override
-  protected String[] packages() {
-    return new String[0];
-  }
-}
\ No newline at end of file
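
-- 
A minimal usage sketch of the new conversion path (not part of the patch;
it mirrors testAvroArraySchema1 above, and NestedArrayDemo, schemaJson,
and the sample payload are illustrative placeholders):

  import io.confluent.connect.avro.AvroData;
  import java.util.Map;
  import org.apache.kafka.connect.data.Schema;
  import rockset.AvroParser;

  public class NestedArrayDemo {
    public static void main(String[] args) throws Exception {
      // A record schema containing a nested array, as in testAvroArraySchema1.
      String schemaJson = "{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\","
          + "\"fields\":[{\"name\":\"user_id\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
      org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schemaJson);
      Schema connectSchema = new AvroData(1).toConnectSchema(avroSchema);

      AvroParser parser = new AvroParser();
      Map<String, Object> doc = parser.getMap("{\"user_id\": [\"1\", \"2\", \"3\"]}");
      // convertLogicalTypesMap() now recurses through STRUCT/MAP and ARRAY values.
      System.out.println(parser.convertLogicalTypesMap(connectSchema, doc));
    }
  }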