
Commit d8c2827

MaxGekk authored and anoopj committed
[SPARK-51384][SQL] Support java.time.LocalTime as the external type of TimeType
### What changes were proposed in this pull request?

In this PR, I propose to support `java.time.LocalTime` as the external type of the new data type `TIME` introduced by apache#50103. After the changes, users can create Datasets with `TimeType` columns and collect them back as instances of `java.time.LocalTime`. For example:

```scala
scala> val df = Seq(LocalTime.of(12, 15)).toDF
val df: org.apache.spark.sql.DataFrame = [value: time(6)]

scala> df.printSchema
root
 |-- value: time(6) (nullable = true)

scala> df.first.getAs[LocalTime](0)
val res8: java.time.LocalTime = 12:15
```

By default, the external type is encoded to a `TIME` column with precision = 6 (microseconds).

### Why are the changes needed?

1. To allow creation of TIME columns using the public Scala/Java API; otherwise the new type is useless.
2. To be able to write tests when supporting the new type in other parts of Spark SQL.

### Does this PR introduce _any_ user-facing change?

Yes, in some sense, since the PR allows creating `TimeType` columns using the Scala/Java APIs.

### How was this patch tested?

By running the new tests:
```
$ build/sbt "test:testOnly *DateTimeUtilsSuite"
$ build/sbt "test:testOnly *CatalystTypeConvertersSuite"
$ build/sbt "test:testOnly *DatasetSuite"
```
and the modified one:
```
$ build/sbt "test:testOnly *DataTypeSuite"
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50153 from MaxGekk/time-localtime.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
1 parent d8585b9 commit d8c2827

File tree

19 files changed: +163 -15 lines

docs/sql-ref-datatypes.md

+2

```diff
@@ -168,6 +168,7 @@ You can access them by doing
 |**TimestampType**|java.time.Instant or java.sql.Timestamp|TimestampType|
 |**TimestampNTZType**|java.time.LocalDateTime|TimestampNTZType|
 |**DateType**|java.time.LocalDate or java.sql.Date|DateType|
+|**TimeType**|java.time.LocalTime|TimeType|
 |**YearMonthIntervalType**|java.time.Period|YearMonthIntervalType|
 |**DayTimeIntervalType**|java.time.Duration|DayTimeIntervalType|
 |**ArrayType**|scala.collection.Seq|ArrayType(*elementType*, [*containsNull]*)<br/>**Note:** The default value of *containsNull* is true.|
@@ -201,6 +202,7 @@ please use factory methods provided in
 |**TimestampType**|java.time.Instant or java.sql.Timestamp|DataTypes.TimestampType|
 |**TimestampNTZType**|java.time.LocalDateTime|DataTypes.TimestampNTZType|
 |**DateType**|java.time.LocalDate or java.sql.Date|DataTypes.DateType|
+|**TimeType**|java.time.LocalTime|DataTypes.TimeType|
 |**YearMonthIntervalType**|java.time.Period|DataTypes.YearMonthIntervalType|
 |**DayTimeIntervalType**|java.time.Duration|DataTypes.DayTimeIntervalType|
 |**ArrayType**|java.util.List|DataTypes.createArrayType(*elementType*)<br/>**Note:** The value of *containsNull* will be true.<br/>DataTypes.createArrayType(*elementType*, *containsNull*).|
```

sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala

+8

```diff
@@ -147,6 +147,14 @@ object Encoders {
    */
   def INSTANT: Encoder[java.time.Instant] = STRICT_INSTANT_ENCODER
 
+  /**
+   * Creates an encoder that serializes instances of the `java.time.LocalTime` class to the
+   * internal representation of nullable Catalyst's TimeType.
+   *
+   * @since 4.1.0
+   */
+  def LOCALTIME: Encoder[java.time.LocalTime] = LocalTimeEncoder
+
   /**
    * An encoder for arrays of bytes.
    *
```

sql/api/src/main/scala/org/apache/spark/sql/SQLImplicits.scala

+3

```diff
@@ -119,6 +119,9 @@ trait EncoderImplicits extends LowPrioritySQLImplicits with Serializable {
   /** @since 3.0.0 */
   implicit def newInstantEncoder: Encoder[java.time.Instant] = Encoders.INSTANT
 
+  /** @since 4.1.0 */
+  implicit def newLocalTimeEncoder: Encoder[java.time.LocalTime] = Encoders.LOCALTIME
+
   /** @since 3.2.0 */
   implicit def newDurationEncoder: Encoder[java.time.Duration] = Encoders.DURATION
```

sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala

+2 -1

```diff
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}
 
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, STRICT_DATE_ENCODER, STRICT_INSTANT_ENCODER, STRICT_LOCAL_DATE_ENCODER, STRICT_TIMESTAMP_ENCODER, StringEncoder, UDTEncoder, YearMonthIntervalEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, STRICT_DATE_ENCODER, STRICT_INSTANT_ENCODER, STRICT_LOCAL_DATE_ENCODER, STRICT_TIMESTAMP_ENCODER, StringEncoder, UDTEncoder, YearMonthIntervalEncoder}
 import org.apache.spark.sql.errors.ExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.util.ArrayImplicits._
@@ -89,6 +89,7 @@ object JavaTypeInference {
     case c: Class[_] if c == classOf[java.math.BigDecimal] => DEFAULT_JAVA_DECIMAL_ENCODER
     case c: Class[_] if c == classOf[java.math.BigInteger] => JavaBigIntEncoder
     case c: Class[_] if c == classOf[java.time.LocalDate] => STRICT_LOCAL_DATE_ENCODER
+    case c: Class[_] if c == classOf[java.time.LocalTime] => LocalTimeEncoder
     case c: Class[_] if c == classOf[java.sql.Date] => STRICT_DATE_ENCODER
     case c: Class[_] if c == classOf[java.time.Instant] => STRICT_INSTANT_ENCODER
     case c: Class[_] if c == classOf[java.sql.Timestamp] => STRICT_TIMESTAMP_ENCODER
```

sql/api/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

+1

```diff
@@ -330,6 +330,7 @@ object ScalaReflection extends ScalaReflection {
     case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) => STRICT_TIMESTAMP_ENCODER
     case t if isSubtype(t, localTypeOf[java.time.Instant]) => STRICT_INSTANT_ENCODER
     case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) => LocalDateTimeEncoder
+    case t if isSubtype(t, localTypeOf[java.time.LocalTime]) => LocalTimeEncoder
     case t if isSubtype(t, localTypeOf[VariantVal]) => VariantEncoder
     case t if isSubtype(t, localTypeOf[Row]) => UnboundRowEncoder
```

sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala

+2 -1

```diff
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.encoders
 
 import java.{sql => jsql}
 import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInt}
-import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period}
 
 import scala.reflect.{classTag, ClassTag}
 
@@ -249,6 +249,7 @@ object AgnosticEncoders {
   case class InstantEncoder(override val lenientSerialization: Boolean)
       extends LeafEncoder[Instant](TimestampType)
   case object LocalDateTimeEncoder extends LeafEncoder[LocalDateTime](TimestampNTZType)
+  case object LocalTimeEncoder extends LeafEncoder[LocalTime](TimeType())
 
   case class SparkDecimalEncoder(dt: DecimalType) extends LeafEncoder[Decimal](dt)
   case class ScalaDecimalEncoder(dt: DecimalType) extends LeafEncoder[BigDecimal](dt)
```

sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala

+3 -1

```diff
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import scala.reflect.classTag
 
 import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
 import org.apache.spark.sql.errors.{DataTypeErrorsBase, ExecutionErrors}
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types._
@@ -49,6 +49,7 @@ import org.apache.spark.util.ArrayImplicits._
 * TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true
 *
 * TimestampNTZType -> java.time.LocalDateTime
+* TimeType -> java.time.LocalTime
 *
 * DayTimeIntervalType -> java.time.Duration
 * YearMonthIntervalType -> java.time.Period
@@ -90,6 +91,7 @@ object RowEncoder extends DataTypeErrorsBase {
     case TimestampNTZType => LocalDateTimeEncoder
     case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient)
     case DateType => DateEncoder(lenient)
+    case _: TimeType => LocalTimeEncoder
     case CalendarIntervalType => CalendarIntervalEncoder
     case _: DayTimeIntervalType => DayTimeIntervalEncoder
     case _: YearMonthIntervalType => YearMonthIntervalEncoder
```

sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala

+14

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util
 import java.lang.invoke.{MethodHandles, MethodType}
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZonedDateTime, ZoneId, ZoneOffset}
+import java.time.temporal.ChronoField.MICRO_OF_DAY
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit.{MICROSECONDS, NANOSECONDS}
 import java.util.regex.Pattern
@@ -184,6 +185,19 @@ trait SparkDateTimeUtils {
     instantToMicros(instant)
   }
 
+  /**
+   * Converts the local time to the number of microseconds within the day, from 0 to
+   * (24 * 60 * 60 * 1000000) - 1.
+   */
+  def localTimeToMicros(localTime: LocalTime): Long = localTime.getLong(MICRO_OF_DAY)
+
+  /**
+   * Converts the number of microseconds within the day to the local time.
+   */
+  def microsToLocalTime(micros: Long): LocalTime = {
+    LocalTime.ofNanoOfDay(Math.multiplyExact(micros, NANOS_PER_MICROS))
+  }
+
   /**
    * Converts a local date at the default JVM time zone to the number of days since 1970-01-01 in
    * the hybrid calendar (Julian + Gregorian) by discarding the time part. The resulted days are
```
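The two new helpers are thin wrappers over `java.time`. A standalone Java sketch of the same round trip (method names mirror the Scala ones; the `NANOS_PER_MICROS = 1000` constant is assumed from context, it is not defined in this hunk):

```java
import java.time.LocalTime;
import java.time.temporal.ChronoField;

public class LocalTimeMicros {
    private static final long NANOS_PER_MICROS = 1_000L;

    // LocalTime -> microseconds within the day: 0 .. 24*60*60*1_000_000 - 1.
    // Nanosecond fractions below one microsecond are truncated by MICRO_OF_DAY.
    static long localTimeToMicros(LocalTime t) {
        return t.getLong(ChronoField.MICRO_OF_DAY);
    }

    // microseconds within the day -> LocalTime; multiplyExact guards against
    // overflow for out-of-range inputs.
    static LocalTime microsToLocalTime(long micros) {
        return LocalTime.ofNanoOfDay(Math.multiplyExact(micros, NANOS_PER_MICROS));
    }

    public static void main(String[] args) {
        LocalTime t = LocalTime.of(12, 15, 0, 123_456_000);
        long micros = localTimeToMicros(t);
        System.out.println(micros);                    // 44100123456
        System.out.println(microsToLocalTime(micros)); // 12:15:00.123456
    }
}
```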

sql/api/src/main/scala/org/apache/spark/sql/types/TimeType.scala

+9 -1

```diff
@@ -33,7 +33,7 @@ import org.apache.spark.sql.errors.DataTypeErrors
 @Unstable
 case class TimeType(precision: Int) extends DatetimeType {
 
-  if (precision < 0 || precision > 6) {
+  if (precision < TimeType.MIN_PRECISION || precision > TimeType.MAX_PRECISION) {
     throw DataTypeErrors.unsupportedTimePrecisionError(precision)
   }
 
@@ -46,3 +46,11 @@ case class TimeType(precision: Int) extends DatetimeType {
 
   private[spark] override def asNullable: TimeType = this
 }
+
+object TimeType {
+  val MIN_PRECISION: Int = 0
+  val MICROS_PRECISION: Int = 6
+  val MAX_PRECISION: Int = MICROS_PRECISION
+
+  def apply(): TimeType = new TimeType(MICROS_PRECISION)
+}
```
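The companion object pins the valid precision range that the constructor check now references. A minimal Java sketch of the same bounds check (class and message are illustrative stand-ins, not Spark's API):

```java
// Illustrative mirror of TimeType's precision rule: precision counts
// fractional-second digits, from 0 (whole seconds) to 6 (microseconds);
// the no-arg factory defaults to microsecond precision, like TimeType().
public class TimePrecision {
    static final int MIN_PRECISION = 0;
    static final int MICROS_PRECISION = 6;
    static final int MAX_PRECISION = MICROS_PRECISION;

    final int precision;

    TimePrecision(int precision) {
        if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
            throw new IllegalArgumentException("Unsupported time precision: " + precision);
        }
        this.precision = precision;
    }

    static TimePrecision defaultPrecision() {
        return new TimePrecision(MICROS_PRECISION);
    }

    public static void main(String[] args) {
        System.out.println(defaultPrecision().precision); // 6
    }
}
```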

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala

+15 -1

```diff
@@ -21,7 +21,7 @@ import java.lang.{Iterable => JavaIterable}
 import java.math.{BigDecimal => JavaBigDecimal}
 import java.math.{BigInteger => JavaBigInteger}
 import java.sql.{Date, Timestamp}
-import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period}
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
@@ -71,6 +71,7 @@ object CatalystTypeConverters {
     case _: StringType => StringConverter
     case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
     case DateType => DateConverter
+    case _: TimeType => TimeConverter
     case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
     case TimestampType => TimestampConverter
     case TimestampNTZType => TimestampNTZConverter
@@ -372,6 +373,18 @@ object CatalystTypeConverters {
       DateTimeUtils.daysToLocalDate(row.getInt(column))
   }
 
+  private object TimeConverter extends CatalystTypeConverter[LocalTime, LocalTime, Any] {
+    override def toCatalystImpl(scalaValue: LocalTime): Long = {
+      DateTimeUtils.localTimeToMicros(scalaValue)
+    }
+    override def toScala(catalystValue: Any): LocalTime = {
+      if (catalystValue == null) null
+      else DateTimeUtils.microsToLocalTime(catalystValue.asInstanceOf[Long])
+    }
+    override def toScalaImpl(row: InternalRow, column: Int): LocalTime =
+      DateTimeUtils.microsToLocalTime(row.getLong(column))
+  }
+
   private object TimestampConverter extends CatalystTypeConverter[Any, Timestamp, Any] {
     override def toCatalystImpl(scalaValue: Any): Long = scalaValue match {
       case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
@@ -558,6 +571,7 @@ object CatalystTypeConverters {
     case c: Char => StringConverter.toCatalyst(c.toString)
     case d: Date => DateConverter.toCatalyst(d)
     case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
+    case t: LocalTime => TimeConverter.toCatalyst(t)
     case t: Timestamp => TimestampConverter.toCatalyst(t)
     case i: Instant => InstantConverter.toCatalyst(i)
     case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
```
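The converter stores a TIME value internally as a boxed long of microseconds and must pass `null` through for nullable columns. A self-contained Java approximation of that null handling (the class and method names are stand-ins for illustration, not Spark's API):

```java
import java.time.LocalTime;
import java.time.temporal.ChronoField;

// Stand-in for TimeConverter: Catalyst keeps TIME as microseconds of day
// in a Long; the Scala-facing direction must tolerate null (nullable column).
public class TimeConverterSketch {
    static Long toCatalyst(LocalTime value) {
        return value == null ? null : value.getLong(ChronoField.MICRO_OF_DAY);
    }

    static LocalTime toScala(Object catalystValue) {
        if (catalystValue == null) return null; // null-safe, like toScala above
        return LocalTime.ofNanoOfDay(Math.multiplyExact((Long) catalystValue, 1_000L));
    }

    public static void main(String[] args) {
        System.out.println(toScala(toCatalyst(LocalTime.of(23, 59, 59)))); // 23:59:59
        System.out.println(toScala(null));                                 // null
    }
}
```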

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala

+12 -1

```diff
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.{expressions => exprs}
 import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
 import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder}
 import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast}
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption}
@@ -156,6 +156,15 @@ object DeserializerBuildHelper {
       returnNullable = false)
   }
 
+  def createDeserializerForLocalTime(path: Expression): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      ObjectType(classOf[java.time.LocalTime]),
+      "microsToLocalTime",
+      path :: Nil,
+      returnNullable = false)
+  }
+
   def createDeserializerForJavaBigDecimal(
       path: Expression,
       returnNullable: Boolean): Expression = {
@@ -314,6 +323,8 @@ object DeserializerBuildHelper {
       createDeserializerForInstant(path)
     case LocalDateTimeEncoder =>
       createDeserializerForLocalDateTime(path)
+    case LocalTimeEncoder =>
+      createDeserializerForLocalTime(path)
     case UDTEncoder(udt, udtClass) =>
       val obj = NewInstance(udtClass, Nil, ObjectType(udtClass))
       Invoke(obj, "deserialize", ObjectType(udt.userClass), path :: Nil)
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala

+11 -1

```diff
@@ -22,7 +22,7 @@ import scala.language.existentials
 import org.apache.spark.sql.catalyst.{expressions => exprs}
 import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
 import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.expressions.objects._
@@ -99,6 +99,15 @@ object SerializerBuildHelper {
       returnNullable = false)
   }
 
+  def createSerializerForLocalTime(inputObject: Expression): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      TimeType(),
+      "localTimeToMicros",
+      inputObject :: Nil,
+      returnNullable = false)
+  }
+
   def createSerializerForScalaEnum(inputObject: Expression): Expression = {
     createSerializerForString(
       Invoke(
@@ -334,6 +343,7 @@ object SerializerBuildHelper {
     case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
     case InstantEncoder(false) => createSerializerForJavaInstant(input)
     case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
+    case LocalTimeEncoder => createSerializerForLocalTime(input)
     case UDTEncoder(udt, udtClass) => createSerializerForUserDefinedType(input, udt, udtClass)
     case OptionEncoder(valueEnc) =>
       createSerializer(valueEnc, UnwrapOption(externalDataTypeFor(valueEnc), input))
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala

+2 -1

```diff
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, C
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, UserDefinedType, VariantType, YearMonthIntervalType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
 
 /**
@@ -102,6 +102,7 @@ object EncoderUtils {
     case _: DecimalType => classOf[Decimal]
     case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
     case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
+    case _: TimeType => classOf[PhysicalLongType.InternalType]
     case _: StringType => classOf[UTF8String]
     case _: StructType => classOf[InternalRow]
     case _: ArrayType => classOf[ArrayData]
```
