diff --git a/pom.xml b/pom.xml
index 7797dea..7e9b094 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
com.github.jcustenborder.kafka.connect
kafka-connect-parent
- 2.0.0-cp1
+ 2.8.0-1
@@ -137,4 +137,5 @@
+
diff --git a/src/main/java/rockset/AvroParser.java b/src/main/java/rockset/AvroParser.java
index 4b59a54..53c0aa3 100644
--- a/src/main/java/rockset/AvroParser.java
+++ b/src/main/java/rockset/AvroParser.java
@@ -6,11 +6,11 @@
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.serializers.NonRecordContainer;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData.Record;
import org.apache.kafka.connect.data.Date;
@@ -46,7 +46,7 @@ public Map parseValue(SinkRecord record) {
}
if (val instanceof Record) {
Map map = getMap(val);
- return convertLogicalTypes(record.valueSchema(), map);
+ return convertLogicalTypesMap(record.valueSchema(), map);
}
return getMap(val);
@@ -56,33 +56,46 @@ private boolean isLogicalType(Schema schema) {
return LOGICAL_TYPE_CONVERTERS.containsKey(schema.name());
}
- private Map convertLogicalTypes(Schema valueSchema, Map map) {
+ private Object convertType(Schema schema, Object o) {
+ if (isLogicalType(schema)) {
+ return convertLogicalType(schema, o);
+ }
+
+ Type type = schema.type();
+
+ switch (type) {
+ case STRUCT:
+ case MAP:
+ return convertLogicalTypesMap(schema, (Map)o);
+
+ case ARRAY:
+ return convertLogicalTypesArray(schema, (List