From bcd8975b5976ba7cfdc7cc5676cf9f41ceb450ec Mon Sep 17 00:00:00 2001 From: Bhagtani Date: Wed, 9 Mar 2016 10:29:59 +0530 Subject: [PATCH 1/6] Added support for logical datatypes starting with Decimal type --- .../databricks/spark/avro/AvroRelation.scala | 13 ++++--- .../spark/avro/SchemaConverters.scala | 33 ++++++++++++++++-- src/test/resources/users.avro | Bin 0 -> 279 bytes .../com/databricks/spark/avro/AvroSuite.scala | 15 +++++++- 4 files changed, 53 insertions(+), 8 deletions(-) create mode 100644 src/test/resources/users.avro diff --git a/src/main/scala/com/databricks/spark/avro/AvroRelation.scala b/src/main/scala/com/databricks/spark/avro/AvroRelation.scala index 3e1fda20..5db81a1b 100644 --- a/src/main/scala/com/databricks/spark/avro/AvroRelation.scala +++ b/src/main/scala/com/databricks/spark/avro/AvroRelation.scala @@ -18,11 +18,9 @@ package com.databricks.spark.avro import java.io.FileNotFoundException import java.util.zip.Deflater - import scala.collection.Iterator import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer - import com.google.common.base.Objects import org.apache.avro.SchemaBuilder import org.apache.avro.file.{DataFileConstants, DataFileReader, FileReader} @@ -31,12 +29,12 @@ import org.apache.avro.mapred.{AvroOutputFormat, FsInput} import org.apache.avro.mapreduce.AvroJob import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.mapreduce.Job - import org.apache.spark.Logging import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} +import org.apache.avro.Schema.Type private[avro] class AvroRelation( override val paths: Array[String], @@ -130,8 +128,13 @@ private[avro] class AvroRelation( val firstRecord = records.next() val superSchema = firstRecord.getSchema // the schema of the actual record // the fields that are actually required along with their converters - val avroFieldMap = superSchema.getFields.map(f => (f.name, f)).toMap - + val avroFieldMap = superSchema.getFields.map{f => + if(f.schema().getType.equals(Type.STRING)){ + f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) + } + (f.name, f) + }.toMap + new Iterator[Row] { private[this] val baseIterator = records private[this] var currentRecord = firstRecord diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index c2a42ba9..acf54a62 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -27,6 +27,7 @@ import org.apache.avro.SchemaBuilder._ import org.apache.avro.Schema.Type._ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ +import org.apache.avro.Schema.Type /** * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice @@ -34,6 +35,11 @@ import org.apache.spark.sql.types._ */ object SchemaConverters { + val LOGICAL_TYPE = "logicalType" + val DECIMAL = "decimal" + val PRECISION = "precision" + val SCALE = "scale" + case class SchemaType(dataType: DataType, nullable: Boolean) /** @@ -42,7 +48,16 @@ object SchemaConverters { def toSqlType(avroSchema: Schema): SchemaType = { avroSchema.getType match { case INT => SchemaType(IntegerType, nullable = false) - case STRING => SchemaType(StringType, nullable = false) + case STRING => { + val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) + if(logicalType != null && logicalType.asText().equalsIgnoreCase(DECIMAL)){ + val precision = avroSchema.getJsonProp(PRECISION).asInt + val scale = avroSchema.getJsonProp(SCALE).asInt + SchemaType(DecimalType(precision,scale), nullable = false) + }else { + SchemaType(StringType, nullable = false) + } + } case BOOLEAN => SchemaType(BooleanType, nullable = false) case BYTES => SchemaType(BinaryType, nullable = false) case DOUBLE => SchemaType(DoubleType, nullable = false) @@ -53,6 +68,9 @@ object SchemaConverters { case RECORD => val fields = avroSchema.getFields.map { f => + if(f.schema().getType.equals(Type.STRING)){ + f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) + } val schemaType = toSqlType(f.schema()) StructField(f.name, schemaType.dataType, schemaType.nullable) } @@ -125,7 +143,18 @@ object SchemaConverters { private[avro] def createConverterToSQL(schema: Schema): Any => Any = { schema.getType match { // Avro strings are in Utf8, so we have to call toString on them - case STRING | ENUM => (item: Any) => if (item == null) null else item.toString + case STRING | ENUM => (item: Any) => if (item == null) { + null + }else { + val logicalType = schema.getJsonProp(LOGICAL_TYPE) + if(logicalType != null && logicalType.asText().equalsIgnoreCase(DECIMAL)){ + val precision = schema.getJsonProp(PRECISION).asInt + val scale = schema.getJsonProp(SCALE).asInt + Decimal.apply(BigDecimal.apply(item.toString()), precision, scale) + }else{ + item.toString + } + } case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity // Byte arrays are reused by avro, so we have to make a copy of them. case FIXED => (item: Any) => if (item == null) { diff --git a/src/test/resources/users.avro b/src/test/resources/users.avro new file mode 100644 index 0000000000000000000000000000000000000000..3bd7a29558886908bfe10d99c34f6ce3ab0d413c GIT binary patch literal 279 zcmeZI%3@>^ODrqO*DFrWNX<=L#Z;|SQdy9yWTjM;nw(#hqNJmgmzWFUm!uY#0C{Pd zsW~adN> Decimal.apply(x.getDecimal(0))) + val a = Decimal.apply(BigDecimal.apply("100000000000000000000000000"), 38, 0) + val b = Decimal.apply(BigDecimal.apply("55555555555555555555555555555"), 38, 0) + + assert(decimals.apply(0).equals(a)) + assert(decimals.apply(1).equals(b)) + + assert(df.schema.apply(0).dataType == DecimalType(38,0)) + } +} From 1cec3c40e02a3d0cb31c80ece0ac6445931263ae Mon Sep 17 00:00:00 2001 From: Bhagtani Date: Wed, 9 Mar 2016 10:49:32 +0530 Subject: [PATCH 2/6] removed tab for which build was failing --- src/main/scala/com/databricks/spark/avro/SchemaConverters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index acf54a62..a12ff481 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -68,7 +68,7 @@ object SchemaConverters { case RECORD => val fields = avroSchema.getFields.map { f => - if(f.schema().getType.equals(Type.STRING)){ + if(f.schema().getType.equals(Type.STRING)){ f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) } val schemaType = toSqlType(f.schema()) From 15f415c881231decb9e9f6e159b4d52ced0f7179 Mon Sep 17 00:00:00 2001 From: Bhagtani Date: Wed, 9 Mar 2016 17:47:40 +0530 Subject: [PATCH 3/6] added support for union types --- .../com/databricks/spark/avro/AvroRelation.scala | 4 +--- .../com/databricks/spark/avro/SchemaConverters.scala | 12 +++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/AvroRelation.scala b/src/main/scala/com/databricks/spark/avro/AvroRelation.scala index 5db81a1b..863677c6 100644 --- a/src/main/scala/com/databricks/spark/avro/AvroRelation.scala +++ b/src/main/scala/com/databricks/spark/avro/AvroRelation.scala @@ -129,9 +129,7 @@ private[avro] class AvroRelation( val superSchema = firstRecord.getSchema // the schema of the actual record // the fields that are actually required along with their converters val avroFieldMap = superSchema.getFields.map{f => - if(f.schema().getType.equals(Type.STRING)){ - f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) - } + f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) (f.name, f) }.toMap diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index a12ff481..375c63c6 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -68,9 +68,7 @@ object SchemaConverters { case RECORD => val fields = avroSchema.getFields.map { f => - if(f.schema().getType.equals(Type.STRING)){ - f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) - } + f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) val schemaType = toSqlType(f.schema()) StructField(f.name, schemaType.dataType, schemaType.nullable) } @@ -94,7 +92,9 @@ object SchemaConverters { // In case of a union with null, eliminate it and make a recursive call val remainingUnionTypes = avroSchema.getTypes.filterNot(_.getType == NULL) if (remainingUnionTypes.size == 1) { - toSqlType(remainingUnionTypes.get(0)).copy(nullable = true) + val remainingSchema = remainingUnionTypes.get(0) + avroSchema.getJsonProps.foreach(x => remainingSchema.addProp(x._1, x._2)) + toSqlType(remainingSchema).copy(nullable = true) } else { toSqlType(Schema.createUnion(remainingUnionTypes)).copy(nullable = true) } @@ -202,7 +202,9 @@ object SchemaConverters { if (schema.getTypes.exists(_.getType == NULL)) { val remainingUnionTypes = schema.getTypes.filterNot(_.getType == NULL) if (remainingUnionTypes.size == 1) { - createConverterToSQL(remainingUnionTypes.get(0)) + val remainingSchema = remainingUnionTypes.get(0) + schema.getJsonProps.foreach(x => remainingSchema.addProp(x._1, x._2)) + createConverterToSQL(remainingSchema) } else { createConverterToSQL(Schema.createUnion(remainingUnionTypes)) } From e2b67711d6bfca19a259cf19ad437ddd7317f869 Mon Sep 17 00:00:00 2001 From: Bhagtani Date: Wed, 9 Mar 2016 17:58:47 +0530 Subject: [PATCH 4/6] updated users.avro with union types having logical types --- src/test/resources/users.avro | Bin 279 -> 290 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/src/test/resources/users.avro b/src/test/resources/users.avro index 3bd7a29558886908bfe10d99c34f6ce3ab0d413c..32feb0ef4f4c8b28c8720eb44bc4879c2bf1afc0 100644 GIT binary patch delta 106 zcmbQvw1`Q_KPiimNi4CfC||EQIU_YUanD2{Tfu0hywaQ;B^{;WlA_GKbfwsd;Xz^y x&7Y3S@BF!|HRsk`{j_uMS!OXZnHf$zD9&SGV8EB0k(ifKl*lykfw};?N&xrCDk=Z~ delta 95 zcmZ3)G@VJvKPiimNi4CfC||EQIU_YUan(d2TMnh-lA_GKbft+2LBjD&IeOA>3^U^` m`=VDb>e6DF#yIhq3ZvD;j}ns@StSJ$i!(CIGxIXh)d2wLz#!rP From 55eb88ce79a587ea1ec6269a9ed2d55fa573ca64 Mon Sep 17 00:00:00 2001 From: Bhagtani Date: Fri, 11 Mar 2016 11:15:50 +0530 Subject: [PATCH 5/6] added support for record types --- .../scala/com/databricks/spark/avro/SchemaConverters.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 375c63c6..496b2ecb 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -171,7 +171,10 @@ object SchemaConverters { javaBytes } case RECORD => - val fieldConverters = schema.getFields.map(f => createConverterToSQL(f.schema)) + val fieldConverters = schema.getFields.map{f => + f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) + createConverterToSQL(f.schema) + } (item: Any) => if (item == null) { null } else { From b115f10e1ab892c0a911352cd2ad599aa889a50d Mon Sep 17 00:00:00 2001 From: Bhagtani Date: Tue, 19 Apr 2016 14:25:40 +0530 Subject: [PATCH 6/6] added timestamp and date type support --- .../spark/avro/SchemaConverters.scala | 29 ++++++++++++++++-- src/test/resources/users.avro | Bin 290 -> 513 bytes .../com/databricks/spark/avro/AvroSuite.scala | 26 ++++++++++++---- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 496b2ecb..07b6ed4c 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -28,6 +28,8 @@ import org.apache.avro.Schema.Type._ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.avro.Schema.Type +import java.sql.Timestamp +import java.sql.Date /** * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice @@ -37,6 +39,8 @@ object SchemaConverters { val LOGICAL_TYPE = "logicalType" val DECIMAL = "decimal" + val TIMESTAMP = "timestamp"; + val DATE = "date"; val PRECISION = "precision" val SCALE = "scale" @@ -62,7 +66,16 @@ object SchemaConverters { case BYTES => SchemaType(BinaryType, nullable = false) case DOUBLE => SchemaType(DoubleType, nullable = false) case FLOAT => SchemaType(FloatType, nullable = false) - case LONG => SchemaType(LongType, nullable = false) + case LONG => { + val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) + if(logicalType != null && logicalType.asText().equalsIgnoreCase(TIMESTAMP)) { + SchemaType(TimestampType, nullable = false) + }else if(logicalType != null && logicalType.asText().equalsIgnoreCase(DATE)) { + SchemaType(TimestampType, nullable = false) + }else{ + SchemaType(LongType, nullable = false) + } + } case FIXED => SchemaType(BinaryType, nullable = false) case ENUM => SchemaType(StringType, nullable = false) @@ -155,7 +168,19 @@ object SchemaConverters { item.toString } } - case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity + case LONG => (item: Any) => if (item == null) { + null + }else { + val logicalType = schema.getJsonProp(LOGICAL_TYPE) + if(logicalType != null && logicalType.asText().equalsIgnoreCase(TIMESTAMP)){ + new Timestamp(item.asInstanceOf[Long].longValue()) + }else if(logicalType != null && logicalType.asText().equalsIgnoreCase(DATE)){ + new Timestamp(item.asInstanceOf[Long].longValue()) + }else{ + item + } + } + case INT | BOOLEAN | DOUBLE | FLOAT => identity // Byte arrays are reused by avro, so we have to make a copy of them. case FIXED => (item: Any) => if (item == null) { null diff --git a/src/test/resources/users.avro b/src/test/resources/users.avro index 32feb0ef4f4c8b28c8720eb44bc4879c2bf1afc0..95050de4973cbdc9de10cf70e1dec70a62e28cc9 100644 GIT binary patch literal 513 zcmeZI%3@>^ODrqO*DFrWNX<=r!&$;xw>YB)uh>xv9k^K(` zq$X$PCW4%Y!?hUBfNLlK23BTqW`3TMm60jXdqAfG!^aZIaHxOKOoq6wHnx^w*AwQ( zyuBXczOSZk`8CBooMiy}QxMPtlT7!3$`Xq+ v$}{sa85%AgelrQEY|`ZB$)QY27Um}A#>Qr5rY3r(#%AWGW=wmS(X9ahHAk+v literal 290 zcmeZI%3@>^ODrqO*DFrWNX<>$!&I$QQdy9yWTjM;nw(#hqNJmgmzWFUm!uY#0C{Pd zsW~adN>fsH90dk5u~C3 zXnbaIW`3TMm9Yg-15gdn5`$VDxUoraV`0YC#?~@0G=DlOzw_s=)|^{&_0!J1XPL#w iWM*i96YwQxB<7_QB{Er=;sXMS#Tl99nR%J$<^uptwO;N3 diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index 038a2839..795aed3a 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -16,6 +16,7 @@ import org.apache.spark.SparkContext import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.types._ import org.scalatest.{BeforeAndAfterAll, FunSuite} +import java.sql.Date class AvroSuite extends FunSuite with BeforeAndAfterAll { val episodesFile = "src/test/resources/episodes.avro" @@ -446,13 +447,26 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { test("Logical Types") { val df = sqlContext.read.avro(userFile) - val decimals = df.select("a").collect().map(x => Decimal.apply(x.getDecimal(0))) - val a = Decimal.apply(BigDecimal.apply("100000000000000000000000000"), 38, 0) - val b = Decimal.apply(BigDecimal.apply("55555555555555555555555555555"), 38, 0) + + val decimals = df.select("decimal").collect().map(x => Decimal.apply(x.getDecimal(0))) + val dec1 = Decimal.apply(BigDecimal.apply("55555.555550000"), 25, 9) + val dec2 = Decimal.apply(BigDecimal.apply("8747336654.536756000"), 25, 9) + + assert(decimals.apply(0).equals(dec1)) + assert(decimals.apply(1).equals(dec2)) + + assert(df.schema.apply("decimal").dataType == DecimalType(25,9)) + + + val timestamps = df.select("timestamp").collect().map(x => x.getAs[Timestamp](0)) + val t1 = new Timestamp(1460354720000l) + val t2 = new Timestamp(1462842320000l) + + assert(timestamps.apply(0).equals(t1)) + assert(timestamps.apply(1).equals(t2)) + + assert(df.schema.apply("timestamp").dataType == TimestampType) - assert(decimals.apply(0).equals(a)) - assert(decimals.apply(1).equals(b)) - assert(df.schema.apply(0).dataType == DecimalType(38,0)) } }