
Commit

Merge pull request #5 from rockset/sudhindra_avro_parsing
Fix avro parser to handle nested arrays
sudhindra-rockset authored Jan 24, 2022
2 parents ed2290a + e9de1b5 commit f22c5cf
Showing 4 changed files with 260 additions and 37 deletions.
3 changes: 2 additions & 1 deletion pom.xml
@@ -14,7 +14,7 @@
   <parent>
     <groupId>com.github.jcustenborder.kafka.connect</groupId>
     <artifactId>kafka-connect-parent</artifactId>
-    <version>2.0.0-cp1</version>
+    <version>2.8.0-1</version>
   </parent>
 
   <scm>
@@ -137,4 +137,5 @@
     </plugin>
   </plugins>
 </build>
+
 </project>
63 changes: 37 additions & 26 deletions src/main/java/rockset/AvroParser.java
@@ -6,11 +6,11 @@
 import io.confluent.connect.avro.AvroData;
 import io.confluent.kafka.serializers.NonRecordContainer;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.kafka.connect.data.Date;
@@ -46,7 +46,7 @@ public Map<String, Object> parseValue(SinkRecord record) {
     }
     if (val instanceof Record) {
       Map<String, Object> map = getMap(val);
-      return convertLogicalTypes(record.valueSchema(), map);
+      return convertLogicalTypesMap(record.valueSchema(), map);
     }
 
     return getMap(val);
@@ -56,33 +56,46 @@ private boolean isLogicalType(Schema schema) {
     return LOGICAL_TYPE_CONVERTERS.containsKey(schema.name());
   }
 
-  private Map<String, Object> convertLogicalTypes(Schema valueSchema, Map<String, Object> map) {
+  private Object convertType(Schema schema, Object o) {
+    if (isLogicalType(schema)) {
+      return convertLogicalType(schema, o);
+    }
+
+    Type type = schema.type();
+
+    switch (type) {
+      case STRUCT:
+      case MAP:
+        return convertLogicalTypesMap(schema, (Map<String, Object>) o);
+
+      case ARRAY:
+        return convertLogicalTypesArray(schema, (List<Object>) o);
+    }
+
+    // could be a scalar type, use as-is
+    return o;
+  }
+
+  public List<Object> convertLogicalTypesArray(Schema schema, List<Object> arr) {
+    List<Object> res = new ArrayList<>();
+
+    for (Object o : arr) {
+      res.add(convertType(schema.valueSchema(), o));
+    }
+
+    return res;
+  }
+
+  public Map<String, Object> convertLogicalTypesMap(Schema valueSchema, Map<String, Object> map) {
     for (Entry<String, Object> e : map.entrySet()) {
       Schema schema = getSchemaForField(valueSchema, e.getKey());
       if (schema == null) {
         continue;
       }
 
-      if (isLogicalType(schema)) {
-        Object value = convertLogicalType(schema, e.getValue());
-        e.setValue(value);
-        continue;
-      }
-      Type type = schema.type();
-      switch (type) {
-        case STRUCT:
-        case MAP:
-          e.setValue(convertLogicalTypes(schema, (Map<String, Object>) e.getValue()));
-          break;
-        case ARRAY:
-          Schema arraySchema = schema.valueSchema();
-          List<Object> convertedElements = ((List<Object>) e.getValue()).stream()
-              .map(element -> convertLogicalType(arraySchema, element))
-              .collect(Collectors.toList());
-          e.setValue(convertedElements);
-          break;
-      }
+      e.setValue(convertType(schema, e.getValue()));
     }
 
     return map;
   }
@@ -99,10 +112,8 @@ private Schema getSchemaForField(Schema schema, String key) {
         }
       }
     }
-    if (schema.type() == Type.MAP) {
-      return schema.valueSchema();
-    }
-    return null;
+
+    return schema.valueSchema();
   }
 
   public Map<String, Object> getMap(Object val) {
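
In isolation, the shape of the fix is easier to see in a minimal, dependency-free sketch of the recursion that convertType introduces. The class name and the Long-as-timestamp heuristic below are illustrative stand-ins, not the connector's actual logic: the real code dispatches on the Connect Schema, and its logical-type converters produce the {"value": ..., "__rockset_type": "timestamp"} wrapper seen in the tests that follow.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NestedConvertSketch {
  // Stand-in for the connector's logical-type converters: wrap a raw
  // epoch value the way the tests below expect, i.e.
  // {"value": ..., "__rockset_type": "timestamp"}.
  static Map<String, Object> convertLogicalType(Object o) {
    Map<String, Object> tagged = new LinkedHashMap<>();
    tagged.put("value", o);
    tagged.put("__rockset_type", "timestamp");
    return tagged;
  }

  // The essence of the new convertType: recurse through maps and lists so
  // logical types are converted at any nesting depth, not just one level
  // down. The real code consults the Schema to pick which leaves convert;
  // this sketch simply treats every Long leaf as a timestamp.
  @SuppressWarnings("unchecked")
  static Object convertType(Object o) {
    if (o instanceof Map) {
      Map<String, Object> res = new LinkedHashMap<>();
      for (Map.Entry<String, Object> e : ((Map<String, Object>) o).entrySet()) {
        res.put(e.getKey(), convertType(e.getValue()));
      }
      return res;
    }
    if (o instanceof List) {
      List<Object> res = new ArrayList<>();
      for (Object element : (List<Object>) o) {
        res.add(convertType(element));
      }
      return res;
    }
    if (o instanceof Long) {
      return convertLogicalType(o);
    }
    return o; // scalar, use as-is
  }

  public static void main(String[] args) {
    Map<String, Object> doc = new LinkedHashMap<>();
    doc.put("data", List.of(List.of(1642784652L), List.of(1642784653L)));
    // Before the fix, only one level of arrays was mapped over; the
    // recursion here reaches the leaves at any depth.
    System.out.println(convertType(doc));
  }
}
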
221 changes: 221 additions & 0 deletions src/test/java/rockset/AvroParserTest.java
@@ -4,13 +4,16 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 
+import io.confluent.connect.avro.AvroData;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
@@ -53,6 +56,224 @@ public void testSimpleKeys() throws IOException {
verifySimpleKey(true);
}

@Test
public void testAvroArraySchema1() throws IOException {
final String schemaStr = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"KsqlDataSourceSchema\",\n" +
" \"namespace\": \"io.confluent.ksql.avro_schemas\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"user_id\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"array\",\n" +
" \"items\": [\n" +
" \"null\",\n" +
" \"string\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"default\": null\n" +
" }\n" +
" ]\n" +
"}";

org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schemaStr);
Schema schema = new AvroData(1).toConnectSchema(avroSchema);

AvroParser avroParser = new AvroParser();

String value = "{\n" +
" \"user_id\": [\n" +
" \"1\",\n" +
" \"2\",\n" +
" \"3\"\n" +
" ]\n" +
"}";

Map<String, Object> map = avroParser.getMap(value);
Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);

assertEquals(res.keySet().stream().iterator().next(), "user_id");
assertEquals(new ObjectMapper().writeValueAsString(res.entrySet().stream().iterator().next().getValue()), "[\"1\",\"2\",\"3\"]");
}

@Test
public void testAvroArraySchema2() {
final String schemaStr = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"KsqlDataSourceSchema\",\n" +
" \"namespace\": \"io.confluent.ksql.avro_schemas\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"data\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"array\",\n" +
" \"items\": [\n" +
" {\n" +
" \"type\": \"record\",\n" +
" \"name\": \"KsqlDataSourceSchema_data\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"key\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" \"string\"\n" +
" ],\n" +
" \"default\": null\n" +
" },\n" +
" {\n" +
" \"name\": \"value\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" \"string\"\n" +
" ],\n" +
" \"default\": null\n" +
" }\n" +
" ],\n" +
" \"connect.internal.type\": \"MapEntry\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"default\": null\n" +
" }\n" +
" ]\n" +
"}";

org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schemaStr);
Schema schema = new AvroData(1).toConnectSchema(avroSchema);

AvroParser avroParser = new AvroParser();

String value = "{\n" +
" \"data\": [\n" +
" {\n" +
" \"KsqlDataSourceSchema_data\": {\n" +
" \"key\": \"1\",\n" +
" \"value\": \"11\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"KsqlDataSourceSchema_data\": {\n" +
" \"key\": \"2\",\n" +
" \"value\": \"22\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";

Map<String, Object> map = avroParser.getMap(value);
Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);

assertEquals("data", res.keySet().stream().iterator().next());
List<Map<String, Object>> values = (List<Map<String, Object>>)res.entrySet().stream().iterator().next().getValue();

assertEquals(values.size(), 2);
assertEquals(((Map<String, String>)values.get(0).get("KsqlDataSourceSchema_data")).get("key"), "1");
assertEquals(((Map<String, String>)values.get(0).get("KsqlDataSourceSchema_data")).get("value"), "11");
assertEquals(((Map<String, String>)values.get(1).get("KsqlDataSourceSchema_data")).get("key"), "2");
assertEquals(((Map<String, String>)values.get(1).get("KsqlDataSourceSchema_data")).get("value"), "22");
}

@Test
public void testAvroArraySchema3() throws IOException {
Schema schema = SchemaBuilder.struct().name("record").field("data", SchemaBuilder.array(
SchemaBuilder.map(Schema.STRING_SCHEMA, Timestamp.SCHEMA)));

String value = "{\n" +
" \"data\": [\n" +
" {\n" +
" \"1\": 1642784652\n" +
" },\n" +
" {\n" +
" \"2\": 1642784653\n" +
" },\n" +
" {\n" +
" \"3\": 1642784654\n" +
" }\n" +
" ]\n" +
"}";

AvroParser avroParser = new AvroParser();
Map<String, Object> map = avroParser.getMap(value);
Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);

String expectedOutput = "{\n" +
" \"data\" : [ {\n" +
" \"1\" : {\n" +
" \"value\" : 1642784652,\n" +
" \"__rockset_type\" : \"timestamp\"\n" +
" }\n" +
" }, {\n" +
" \"2\" : {\n" +
" \"value\" : 1642784653,\n" +
" \"__rockset_type\" : \"timestamp\"\n" +
" }\n" +
" }, {\n" +
" \"3\" : {\n" +
" \"value\" : 1642784654,\n" +
" \"__rockset_type\" : \"timestamp\"\n" +
" }\n" +
" } ]\n" +
"}";

assertEquals(expectedOutput.replaceAll("[\\n\\t ]", ""), new ObjectMapper().writeValueAsString(res));
}

@Test
public void testAvroArraySchema4() throws IOException {
Schema schema = SchemaBuilder.struct().name("record").field("data", SchemaBuilder.struct().field("foo", SchemaBuilder.array(
SchemaBuilder.map(Schema.STRING_SCHEMA, Timestamp.SCHEMA))));

String value = "{\n" +
" \"data\": {\n" +
" \"foo\": [\n" +
" {\n" +
" \"1\": 1642784652\n" +
" },\n" +
" {\n" +
" \"2\": 1642784653\n" +
" },\n" +
" {\n" +
" \"3\": 1642784654\n" +
" }\n" +
" ]\n" +
" }\n" +
"}";

AvroParser avroParser = new AvroParser();
Map<String, Object> map = avroParser.getMap(value);
Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);

String expectedOutput = "{\n" +
" \"data\" : {\n" +
" \"foo\" : [ {\n" +
" \"1\" : {\n" +
" \"value\" : 1642784652,\n" +
" \"__rockset_type\" : \"timestamp\"\n" +
" }\n" +
" }, {\n" +
" \"2\" : {\n" +
" \"value\" : 1642784653,\n" +
" \"__rockset_type\" : \"timestamp\"\n" +
" }\n" +
" }, {\n" +
" \"3\" : {\n" +
" \"value\" : 1642784654,\n" +
" \"__rockset_type\" : \"timestamp\"\n" +
" }\n" +
" } ]\n" +
" }\n" +
"}";

assertEquals(expectedOutput.replaceAll("[\\n\\t ]", ""), new ObjectMapper().writeValueAsString(res));
}

private void verifySimpleKey(Object key) throws IOException {
Schema valueSchema = SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
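
One nesting pattern the four new tests do not cover is an array directly inside another array. A hypothetical follow-on test (not part of this commit), written against the same public methods and assuming the same timestamp tagging as the expected outputs above, might look like:

@Test
public void testNestedArrayOfTimestamps() throws IOException {
  // Array of arrays of timestamps: convertType must recurse through
  // convertLogicalTypesArray twice before reaching the Timestamp leaves.
  Schema schema = SchemaBuilder.struct().name("record").field("data",
      SchemaBuilder.array(SchemaBuilder.array(Timestamp.SCHEMA)));

  String value = "{ \"data\": [ [1642784652], [1642784653] ] }";

  AvroParser avroParser = new AvroParser();
  Map<String, Object> map = avroParser.getMap(value);
  Map<String, Object> res = avroParser.convertLogicalTypesMap(schema, map);

  String expectedOutput = "{\"data\":[[{\"value\":1642784652,\"__rockset_type\":\"timestamp\"}],"
      + "[{\"value\":1642784653,\"__rockset_type\":\"timestamp\"}]]}";
  assertEquals(expectedOutput, new ObjectMapper().writeValueAsString(res));
}
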
10 changes: 0 additions & 10 deletions src/test/java/rockset/DocumentationTest.java

This file was deleted.
