[SPARK-51384][SQL] Support java.time.LocalTime as the external type of TimeType #50153

Closed · wants to merge 8 commits
2 changes: 2 additions & 0 deletions docs/sql-ref-datatypes.md
@@ -168,6 +168,7 @@ You can access them by doing
|**TimestampType**|java.time.Instant or java.sql.Timestamp|TimestampType|
|**TimestampNTZType**|java.time.LocalDateTime|TimestampNTZType|
|**DateType**|java.time.LocalDate or java.sql.Date|DateType|
|**TimeType**|java.time.LocalTime|TimeType|
|**YearMonthIntervalType**|java.time.Period|YearMonthIntervalType|
|**DayTimeIntervalType**|java.time.Duration|DayTimeIntervalType|
|**ArrayType**|scala.collection.Seq|ArrayType(*elementType*, [*containsNull*])<br/>**Note:** The default value of *containsNull* is true.|
@@ -201,6 +202,7 @@ please use factory methods provided in
|**TimestampType**|java.time.Instant or java.sql.Timestamp|DataTypes.TimestampType|
|**TimestampNTZType**|java.time.LocalDateTime|DataTypes.TimestampNTZType|
|**DateType**|java.time.LocalDate or java.sql.Date|DataTypes.DateType|
|**TimeType**|java.time.LocalTime|DataTypes.TimeType|
|**YearMonthIntervalType**|java.time.Period|DataTypes.YearMonthIntervalType|
|**DayTimeIntervalType**|java.time.Duration|DataTypes.DayTimeIntervalType|
|**ArrayType**|java.util.List|DataTypes.createArrayType(*elementType*)<br/>**Note:** The value of *containsNull* will be true.<br/>DataTypes.createArrayType(*elementType*, *containsNull*).|
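The two new table rows state the same pairing twice, once for the Scala types and once for the Java factory methods. A quick sketch of what they claim (not part of the diff; `DataTypes.TimeType` is taken from the table above and assumed to default to microsecond precision):

```scala
import org.apache.spark.sql.types.{DataTypes, TimeType}

// The Scala constructor and the Java-side factory constant should denote the
// same type; TimeType() defaults to precision 6 (see the TimeType changes
// later in this diff).
assert(TimeType() == TimeType(6))
assert(DataTypes.TimeType == TimeType()) // assumption: constant exists, per the table
```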
8 changes: 8 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -147,6 +147,14 @@ object Encoders {
*/
def INSTANT: Encoder[java.time.Instant] = STRICT_INSTANT_ENCODER

/**
* Creates an encoder that serializes instances of the `java.time.LocalTime` class to the
* internal representation of nullable Catalyst's TimeType.
*
* @since 4.1.0
*/
def LOCALTIME: Encoder[java.time.LocalTime] = LocalTimeEncoder

/**
* An encoder for arrays of bytes.
*
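For illustration, a minimal use of the new encoder (not part of the diff; assumes a SparkSession `spark` with this patch applied):

```scala
import java.time.LocalTime
import org.apache.spark.sql.Encoders

// Pass the encoder explicitly, e.g. from Java code or when the SQL
// implicits are not in scope.
val times = spark.createDataset(Seq(LocalTime.of(9, 0), LocalTime.NOON))(Encoders.LOCALTIME)
```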
@@ -119,6 +119,9 @@ trait EncoderImplicits extends LowPrioritySQLImplicits with Serializable {
/** @since 3.0.0 */
implicit def newInstantEncoder: Encoder[java.time.Instant] = Encoders.INSTANT

/** @since 4.1.0 */
implicit def newLocalTimeEncoder: Encoder[java.time.LocalTime] = Encoders.LOCALTIME

/** @since 3.2.0 */
implicit def newDurationEncoder: Encoder[java.time.Duration] = Encoders.DURATION

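With the implicit in scope, nothing needs to be passed explicitly. A sketch (assumes a SparkSession `spark`):

```scala
import java.time.LocalTime

import spark.implicits._ // brings newLocalTimeEncoder into scope

val ds = Seq(LocalTime.MIDNIGHT, LocalTime.NOON).toDS() // schema: one TIME(6) column
```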
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, STRICT_DATE_ENCODER, STRICT_INSTANT_ENCODER, STRICT_LOCAL_DATE_ENCODER, STRICT_TIMESTAMP_ENCODER, StringEncoder, UDTEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, STRICT_DATE_ENCODER, STRICT_INSTANT_ENCODER, STRICT_LOCAL_DATE_ENCODER, STRICT_TIMESTAMP_ENCODER, StringEncoder, UDTEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.errors.ExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.ArrayImplicits._
@@ -89,6 +89,7 @@ object JavaTypeInference {
case c: Class[_] if c == classOf[java.math.BigDecimal] => DEFAULT_JAVA_DECIMAL_ENCODER
case c: Class[_] if c == classOf[java.math.BigInteger] => JavaBigIntEncoder
case c: Class[_] if c == classOf[java.time.LocalDate] => STRICT_LOCAL_DATE_ENCODER
case c: Class[_] if c == classOf[java.time.LocalTime] => LocalTimeEncoder
case c: Class[_] if c == classOf[java.sql.Date] => STRICT_DATE_ENCODER
case c: Class[_] if c == classOf[java.time.Instant] => STRICT_INSTANT_ENCODER
case c: Class[_] if c == classOf[java.sql.Timestamp] => STRICT_TIMESTAMP_ENCODER
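The new case makes Java bean inference pick up LocalTime properties. A sketch with a hypothetical bean (`Shift` is an illustration, not from the PR); `Encoders.bean` delegates to this inference:

```scala
import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders

// A bean with a no-arg constructor and a LocalTime property.
class Shift {
  @BeanProperty var start: java.time.LocalTime = _
}

val beanEncoder = Encoders.bean(classOf[Shift]) // `start` maps to LocalTimeEncoder
```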
@@ -330,6 +330,7 @@ object ScalaReflection extends ScalaReflection {
case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) => STRICT_TIMESTAMP_ENCODER
case t if isSubtype(t, localTypeOf[java.time.Instant]) => STRICT_INSTANT_ENCODER
case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) => LocalDateTimeEncoder
case t if isSubtype(t, localTypeOf[java.time.LocalTime]) => LocalTimeEncoder
case t if isSubtype(t, localTypeOf[VariantVal]) => VariantEncoder
case t if isSubtype(t, localTypeOf[Row]) => UnboundRowEncoder

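Likewise on the Scala reflection side, product encoders derived for case classes now cover LocalTime fields. A sketch (`Punch` is hypothetical; assumes `import spark.implicits._`):

```scala
import java.time.LocalTime

case class Punch(in: LocalTime, out: LocalTime)

// The reflective encoder handles both LocalTime fields.
val punches = Seq(Punch(LocalTime.of(9, 0), LocalTime.of(17, 30))).toDS()
```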
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.encoders

import java.{sql => jsql}
import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInt}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period}

import scala.reflect.{classTag, ClassTag}

@@ -249,6 +249,7 @@ object AgnosticEncoders {
case class InstantEncoder(override val lenientSerialization: Boolean)
extends LeafEncoder[Instant](TimestampType)
case object LocalDateTimeEncoder extends LeafEncoder[LocalDateTime](TimestampNTZType)
case object LocalTimeEncoder extends LeafEncoder[LocalTime](TimeType())

case class SparkDecimalEncoder(dt: DecimalType) extends LeafEncoder[Decimal](dt)
case class ScalaDecimalEncoder(dt: DecimalType) extends LeafEncoder[BigDecimal](dt)
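Being a leaf encoder, LocalTimeEncoder fixes both sides of the mapping. What the one-line declaration implies:

```scala
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.LocalTimeEncoder
import org.apache.spark.sql.types.TimeType

assert(LocalTimeEncoder.dataType == TimeType())
assert(LocalTimeEncoder.clsTag.runtimeClass == classOf[java.time.LocalTime])
```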
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.reflect.classTag

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.errors.{DataTypeErrorsBase, ExecutionErrors}
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
@@ -49,6 +49,7 @@ import org.apache.spark.util.ArrayImplicits._
* TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true
*
* TimestampNTZType -> java.time.LocalDateTime
* TimeType -> java.time.LocalTime
*
* DayTimeIntervalType -> java.time.Duration
* YearMonthIntervalType -> java.time.Period
@@ -90,6 +91,7 @@ object RowEncoder extends DataTypeErrorsBase {
case TimestampNTZType => LocalDateTimeEncoder
case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient)
case DateType => DateEncoder(lenient)
case _: TimeType => LocalTimeEncoder
case CalendarIntervalType => CalendarIntervalEncoder
case _: DayTimeIntervalType => DayTimeIntervalEncoder
case _: YearMonthIntervalType => YearMonthIntervalEncoder
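For the Row-based API, the new case means a TIME column accepts LocalTime as the external value. A sketch (assumes a SparkSession `spark`):

```scala
import java.time.LocalTime
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, TimeType}

val schema = StructType(Seq(StructField("t", TimeType())))
val df = spark.createDataFrame(
  java.util.Arrays.asList(Row(LocalTime.of(23, 59, 59))), schema)
```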
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util
import java.lang.invoke.{MethodHandles, MethodType}
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZonedDateTime, ZoneId, ZoneOffset}
import java.time.temporal.ChronoField.MICRO_OF_DAY
import java.util.TimeZone
import java.util.concurrent.TimeUnit.{MICROSECONDS, NANOSECONDS}
import java.util.regex.Pattern
@@ -184,6 +185,19 @@ trait SparkDateTimeUtils {
instantToMicros(instant)
}

/**
* Converts the local time to the number of microseconds within the day, from 0 to (24 * 60 * 60
* * 1000000) - 1.
*/
def localTimeToMicros(localTime: LocalTime): Long = localTime.getLong(MICRO_OF_DAY)

/**
* Converts the number of microseconds within the day to the local time.
*/
def microsToLocalTime(micros: Long): LocalTime = {
LocalTime.ofNanoOfDay(Math.multiplyExact(micros, NANOS_PER_MICROS))
}

/**
* Converts a local date at the default JVM time zone to the number of days since 1970-01-01 in
* the hybrid calendar (Julian + Gregorian) by discarding the time part. The resulted days are
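The two helpers are exact inverses for valid micros-of-day values (the conversion to nanos uses multiplyExact, so out-of-range inputs fail fast). Round trip:

```scala
import java.time.LocalTime
import org.apache.spark.sql.catalyst.util.DateTimeUtils

val t = LocalTime.of(12, 34, 56, 789000)        // 789000 ns = 789 µs
val micros = DateTimeUtils.localTimeToMicros(t) // 45296000789L
assert(DateTimeUtils.microsToLocalTime(micros) == t)
```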
@@ -33,7 +33,7 @@ import org.apache.spark.sql.errors.DataTypeErrors
@Unstable
case class TimeType(precision: Int) extends DatetimeType {

if (precision < 0 || precision > 6) {
if (precision < TimeType.MIN_PRECISION || precision > TimeType.MAX_PRECISION) {
throw DataTypeErrors.unsupportedTimePrecisionError(precision)
}

@@ -46,3 +46,11 @@ case class TimeType(precision: Int) extends DatetimeType {

private[spark] override def asNullable: TimeType = this
}

object TimeType {
val MIN_PRECISION: Int = 0
val MICROS_PRECISION: Int = 6
val MAX_PRECISION: Int = MICROS_PRECISION

def apply(): TimeType = new TimeType(MICROS_PRECISION)
}
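The companion object centralizes the precision bounds, and `apply()` gives the microsecond default used throughout this PR:

```scala
import org.apache.spark.sql.types.TimeType

assert(TimeType() == TimeType(TimeType.MICROS_PRECISION))

// Out-of-range precision is rejected at construction.
try { TimeType(TimeType.MAX_PRECISION + 1); assert(false) }
catch { case _: Exception => () } // unsupportedTimePrecisionError
```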
@@ -21,7 +21,7 @@ import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.math.{BigInteger => JavaBigInteger}
import java.sql.{Date, Timestamp}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

@@ -71,6 +71,7 @@ object CatalystTypeConverters {
case _: StringType => StringConverter
case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
case DateType => DateConverter
case _: TimeType => TimeConverter
case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
case TimestampType => TimestampConverter
case TimestampNTZType => TimestampNTZConverter
@@ -372,6 +373,18 @@ object CatalystTypeConverters {
DateTimeUtils.daysToLocalDate(row.getInt(column))
}

private object TimeConverter extends CatalystTypeConverter[LocalTime, LocalTime, Any] {
override def toCatalystImpl(scalaValue: LocalTime): Long = {
DateTimeUtils.localTimeToMicros(scalaValue)
}
override def toScala(catalystValue: Any): LocalTime = {
if (catalystValue == null) null
else DateTimeUtils.microsToLocalTime(catalystValue.asInstanceOf[Long])
}
override def toScalaImpl(row: InternalRow, column: Int): LocalTime =
DateTimeUtils.microsToLocalTime(row.getLong(column))
}

private object TimestampConverter extends CatalystTypeConverter[Any, Timestamp, Any] {
override def toCatalystImpl(scalaValue: Any): Long = scalaValue match {
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
@@ -558,6 +571,7 @@ object CatalystTypeConverters {
case c: Char => StringConverter.toCatalyst(c.toString)
case d: Date => DateConverter.toCatalyst(d)
case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
case t: LocalTime => TimeConverter.toCatalyst(t)
case t: Timestamp => TimestampConverter.toCatalyst(t)
case i: Instant => InstantConverter.toCatalyst(i)
case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
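TimeConverter plugs LocalTime into the generic converter entry points. A quick check through the public API:

```scala
import java.time.LocalTime
import org.apache.spark.sql.catalyst.CatalystTypeConverters

// External -> internal: one second past midnight is 1,000,000 µs.
val internal = CatalystTypeConverters.convertToCatalyst(LocalTime.of(0, 0, 1))
assert(internal == 1000000L)
```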
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.{expressions => exprs}
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder}
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast}
import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption}
@@ -156,6 +156,15 @@ object DeserializerBuildHelper {
returnNullable = false)
}

def createDeserializerForLocalTime(path: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
ObjectType(classOf[java.time.LocalTime]),
"microsToLocalTime",
path :: Nil,
returnNullable = false)
}

def createDeserializerForJavaBigDecimal(
path: Expression,
returnNullable: Boolean): Expression = {
@@ -314,6 +323,8 @@ object DeserializerBuildHelper {
createDeserializerForInstant(path)
case LocalDateTimeEncoder =>
createDeserializerForLocalDateTime(path)
case LocalTimeEncoder =>
createDeserializerForLocalTime(path)
case UDTEncoder(udt, udtClass) =>
val obj = NewInstance(udtClass, Nil, ObjectType(udtClass))
Invoke(obj, "deserialize", ObjectType(udt.userClass), path :: Nil)
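The helper only builds the expression; evaluation happens later in generated code. A sketch of constructing it against a bound TIME column:

```scala
import org.apache.spark.sql.catalyst.DeserializerBuildHelper.createDeserializerForLocalTime
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.TimeType

// Wraps the input column in a StaticInvoke of DateTimeUtils.microsToLocalTime.
val deserializer = createDeserializerForLocalTime(
  BoundReference(0, TimeType(), nullable = false))
```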
@@ -22,7 +22,7 @@ import scala.language.existentials
import org.apache.spark.sql.catalyst.{expressions => exprs}
import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
import org.apache.spark.sql.catalyst.expressions.objects._
@@ -99,6 +99,15 @@ object SerializerBuildHelper {
returnNullable = false)
}

def createSerializerForLocalTime(inputObject: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
TimeType(),
"localTimeToMicros",
inputObject :: Nil,
returnNullable = false)
}

def createSerializerForScalaEnum(inputObject: Expression): Expression = {
createSerializerForString(
Invoke(
@@ -334,6 +343,7 @@ object SerializerBuildHelper {
case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
case InstantEncoder(false) => createSerializerForJavaInstant(input)
case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
case LocalTimeEncoder => createSerializerForLocalTime(input)
case UDTEncoder(udt, udtClass) => createSerializerForUserDefinedType(input, udt, udtClass)
case OptionEncoder(valueEnc) =>
createSerializer(valueEnc, UnwrapOption(externalDataTypeFor(valueEnc), input))
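Serializer and deserializer meet in ExpressionEncoder; a round trip exercises both helpers (a sketch using the standard test pattern):

```scala
import java.time.LocalTime
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.LocalTimeEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val enc = ExpressionEncoder(LocalTimeEncoder)
val toRow = enc.createSerializer()
val fromRow = enc.resolveAndBind().createDeserializer()

val t = LocalTime.of(6, 30)
assert(fromRow(toRow(t)) == t) // internally a Long holding micros-of-day
```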
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, C
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}

/**
@@ -102,6 +102,7 @@ object EncoderUtils {
case _: DecimalType => classOf[Decimal]
case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
case _: TimeType => classOf[PhysicalLongType.InternalType]
case _: StringType => classOf[UTF8String]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
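Mapping TimeType to PhysicalLongType means generated code reads TIME columns through plain Long accessors. A sketch:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// TIME values occupy ordinary Long slots in an InternalRow.
val row = InternalRow(45296000789L) // 12:34:56.000789 as micros-of-day
val t = DateTimeUtils.microsToLocalTime(row.getLong(0))
```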