diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
index 71777906f868e..cf48cce931bc4 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -264,19 +264,29 @@ private object DateTimeFormatterHelper {
     toFormatter(builder, locale)
   }
 
-  lazy val fractionFormatter: DateTimeFormatter = {
-    val builder = createBuilder()
-      .append(DateTimeFormatter.ISO_LOCAL_DATE)
-      .appendLiteral(' ')
+  private def appendTimeBuilder(builder: DateTimeFormatterBuilder) = {
+    builder
       .appendValue(ChronoField.HOUR_OF_DAY, 2)
       .appendLiteral(':')
       .appendValue(ChronoField.MINUTE_OF_HOUR, 2)
       .appendLiteral(':')
       .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
       .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+  }
+
+  lazy val fractionFormatter: DateTimeFormatter = {
+    val dateBuilder = createBuilder()
+      .append(DateTimeFormatter.ISO_LOCAL_DATE)
+      .appendLiteral(' ')
+    val builder = appendTimeBuilder(dateBuilder)
     toFormatter(builder, TimestampFormatter.defaultLocale)
   }
 
+  lazy val fractionTimeFormatter: DateTimeFormatter = {
+    val builder = appendTimeBuilder(createBuilder())
+    toFormatter(builder, TimeFormatter.defaultLocale)
+  }
+
   // SPARK-31892: The week-based date fields are rarely used and really confusing for parsing values
   // to datetime, especially when they are mixed with other non-week-based ones;
   // SPARK-31879: It's also difficult for us to restore the behavior of week-based date fields
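// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the HH:mm:ss[.fraction] chain
// that `appendTimeBuilder` now shares between `fractionFormatter` and the new
// `fractionTimeFormatter`, reproduced with plain java.time. The object name
// `TimeBuilderSketch` is invented for this example.
import java.time.LocalTime
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.ChronoField

object TimeBuilderSketch {
  private val formatter = new DateTimeFormatterBuilder()
    .appendValue(ChronoField.HOUR_OF_DAY, 2)
    .appendLiteral(':')
    .appendValue(ChronoField.MINUTE_OF_HOUR, 2)
    .appendLiteral(':')
    .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
    // Minimum width 0 with decimalPoint = true: the '.' and the fraction are
    // printed only when the fraction is non-zero, and trailing zeros are
    // dropped (up to nanosecond precision on parsing).
    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
    .toFormatter

  def main(args: Array[String]): Unit = {
    println(formatter.format(LocalTime.of(15, 0, 1)))            // 15:00:01
    println(formatter.format(LocalTime.of(15, 0, 1, 123400000))) // 15:00:01.1234
  }
}
// ---------------------------------------------------------------------------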
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
new file mode 100644
index 0000000000000..de08ff7fea780
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time.LocalTime
+import java.time.format.DateTimeFormatter
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._
+
+sealed trait TimeFormatter extends Serializable {
+  def parse(s: String): Long // returns microseconds since midnight
+
+  def format(localTime: LocalTime): String
+  // Converts microseconds since midnight to a time string
+  def format(micros: Long): String
+
+  def validatePatternString(): Unit
+}
+
+/**
+ * The ISO time formatter is capable of formatting and parsing the ISO-8601 extended time format.
+ */
+class Iso8601TimeFormatter(pattern: String, locale: Locale, isParsing: Boolean)
+    extends TimeFormatter
+    with DateTimeFormatterHelper {
+
+  @transient
+  protected lazy val formatter: DateTimeFormatter =
+    getOrCreateFormatter(pattern, locale, isParsing)
+
+  override def parse(s: String): Long = {
+    val localTime = toLocalTime(formatter.parse(s))
+    localTimeToMicros(localTime)
+  }
+
+  override def format(localTime: LocalTime): String = {
+    localTime.format(formatter)
+  }
+
+  override def format(micros: Long): String = {
+    format(microsToLocalTime(micros))
+  }
+
+  override def validatePatternString(): Unit = {
+    try {
+      formatter
+    } catch checkInvalidPattern(pattern)
+    ()
+  }
+}
+
+/**
+ * The formatter parses/formats times according to the pattern `HH:mm:ss.[..fff..]` where
+ * `[..fff..]` is a fraction of second up to microsecond resolution. The formatter does not output
+ * trailing zeros in the fraction. For example, the time `15:00:01.123400` is formatted as the
+ * string `15:00:01.1234`.
+ */
+class FractionTimeFormatter
+    extends Iso8601TimeFormatter(
+      TimeFormatter.defaultPattern,
+      TimeFormatter.defaultLocale,
+      isParsing = false) {
+
+  @transient
+  override protected lazy val formatter: DateTimeFormatter =
+    DateTimeFormatterHelper.fractionTimeFormatter
+}
+
+object TimeFormatter {
+
+  val defaultLocale: Locale = Locale.US
+
+  val defaultPattern: String = "HH:mm:ss"
+
+  def apply(
+      format: String = defaultPattern,
+      locale: Locale = defaultLocale,
+      isParsing: Boolean = false): TimeFormatter = {
+    val formatter = new Iso8601TimeFormatter(format, locale, isParsing)
+    formatter.validatePatternString()
+    formatter
+  }
+}
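// ---------------------------------------------------------------------------
// Usage sketch (assumes this patch is applied): parsing a time string to
// microseconds since midnight and formatting it back, via the new factory
// TimeFormatter.apply. The object name `TimeFormatterUsage` is invented for
// this example; the expected values match the test suite below.
import org.apache.spark.sql.catalyst.util.TimeFormatter

object TimeFormatterUsage {
  def main(args: Array[String]): Unit = {
    val parser = TimeFormatter(format = "HH:mm:ss.SSSSSS", isParsing = true)
    // (12 * 3600 + 34 * 60 + 56) seconds plus 789012 microseconds past midnight
    val micros = parser.parse("12:34:56.789012")
    assert(micros == 45296789012L)

    val formatter = TimeFormatter(format = "HH:mm:ss.SSSSSS")
    assert(formatter.format(micros) == "12:34:56.789012")
  }
}
// ---------------------------------------------------------------------------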
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 4fcb84daf187d..15784e9762e35 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -545,7 +545,8 @@ object TimestampFormatter {
 
   val defaultLocale: Locale = Locale.US
 
-  def defaultPattern(): String = s"${DateFormatter.defaultPattern} HH:mm:ss"
+  def defaultPattern(): String =
+    s"${DateFormatter.defaultPattern} ${TimeFormatter.defaultPattern}"
 
   private def getFormatter(
       format: Option[String],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
index 41c87dd804be1..f281c42bbe715 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.getZoneId
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localTimeToMicros}
 
 /**
  * Helper functions for testing date and time functionality.
@@ -112,4 +112,15 @@ object DateTimeTestUtils {
     result = Math.addExact(result, Math.multiplyExact(seconds, MICROS_PER_SECOND))
     result
   }
+
+  // Returns microseconds since midnight
+  def localTime(
+      hour: Byte = 0,
+      minute: Byte = 0,
+      sec: Byte = 0,
+      micros: Int = 0): Long = {
+    val nanos = TimeUnit.MICROSECONDS.toNanos(micros).toInt
+    val localTime = LocalTime.of(hour, minute, sec, nanos)
+    localTimeToMicros(localTime)
+  }
 }
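// ---------------------------------------------------------------------------
// Sketch of what the new `localTime` test helper computes (assumes the patch):
// it maps a LocalTime to microseconds since midnight, so results can be
// checked against plain arithmetic. A standalone equivalent using only
// java.time; the object name `LocalTimeMicrosSketch` is invented here.
import java.time.LocalTime
import java.util.concurrent.TimeUnit

object LocalTimeMicrosSketch {
  def main(args: Array[String]): Unit = {
    val nanos = TimeUnit.MICROSECONDS.toNanos(500000).toInt
    val lt = LocalTime.of(11, 30, 1, nanos)
    val micros = TimeUnit.NANOSECONDS.toMicros(lt.toNanoOfDay)
    // (11 * 3600 + 30 * 60 + 1) seconds = 41401 s, plus 0.5 s, in microseconds
    assert(micros == (11L * 3600 + 30 * 60 + 1) * 1000000 + 500000)
  }
}
// ---------------------------------------------------------------------------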
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
new file mode 100644
index 0000000000000..ef2f41ab100f0
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time.DateTimeException
+
+import scala.util.Random
+
+import org.apache.spark.{SPARK_DOC_ROOT, SparkFunSuite, SparkRuntimeException}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToLocalTime
+
+class TimeFormatterSuite extends SparkFunSuite with SQLHelper {
+
+  test("time parsing") {
+    Seq(
+      ("12", "HH") -> 12 * 3600 * 1000000L,
+      ("01:02", "HH:mm") -> (1 * 3600 + 2 * 60) * 1000000L,
+      ("10:20", "HH:mm") -> (10 * 3600 + 20 * 60) * 1000000L,
+      ("00:00:00", "HH:mm:ss") -> 0L,
+      ("01:02:03", "HH:mm:ss") -> (1 * 3600 + 2 * 60 + 3) * 1000000L,
+      ("23:59:59", "HH:mm:ss") -> (23 * 3600 + 59 * 60 + 59) * 1000000L,
+      ("00:00:00.000000", "HH:mm:ss.SSSSSS") -> 0L,
+      ("12:34:56.789012", "HH:mm:ss.SSSSSS") -> ((12 * 3600 + 34 * 60 + 56) * 1000000L + 789012),
+      ("23:59:59.000000", "HH:mm:ss.SSSSSS") -> (23 * 3600 + 59 * 60 + 59) * 1000000L,
+      ("23:59:59.999999", "HH:mm:ss.SSSSSS") -> ((23 * 3600 + 59 * 60 + 59) * 1000000L + 999999)
+    ).foreach { case ((inputStr, pattern), expectedMicros) =>
+      val formatter = TimeFormatter(format = pattern, isParsing = true)
+      assert(formatter.parse(inputStr) === expectedMicros)
+    }
+  }
+
+  test("time strings do not match the pattern") {
+    def assertError(str: String, expectedMsg: String): Unit = {
+      val e = intercept[DateTimeException](TimeFormatter().parse(str))
+      assert(e.getMessage.contains(expectedMsg))
+    }
+
+    assertError("11.12.13", "Text '11.12.13' could not be parsed")
+    assertError("25:00:00", "Text '25:00:00' could not be parsed: Invalid value")
+  }
+
+  test("time formatting") {
+    Seq(
+      (12 * 3600 * 1000000L, "HH") -> "12",
+      ((1 * 3600 + 2 * 60) * 1000000L, "HH:mm") -> "01:02",
+      ((10 * 3600 + 20 * 60) * 1000000L, "HH:mm") -> "10:20",
+      (0L, "HH:mm:ss") -> "00:00:00",
+      ((1 * 3600 + 2 * 60 + 3) * 1000000L, "HH:mm:ss") -> "01:02:03",
+      ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss") -> "23:59:59",
+      (0L, "HH:mm:ss.SSSSSS") -> "00:00:00.000000",
+      ((12 * 3600 + 34 * 60 + 56) * 1000000L + 789012, "HH:mm:ss.SSSSSS") -> "12:34:56.789012",
+      ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss.SSSSSS") -> "23:59:59.000000",
+      ((23 * 3600 + 59 * 60 + 59) * 1000000L + 999999, "HH:mm:ss.SSSSSS") -> "23:59:59.999999"
+    ).foreach { case ((micros, pattern), expectedStr) =>
+      val formatter = TimeFormatter(format = pattern)
+      assert(formatter.format(micros) === expectedStr)
+    }
+  }
+
+  test("micros are out of supported range") {
+    def assertError(micros: Long, expectedMsg: String): Unit = {
+      val e = intercept[DateTimeException](TimeFormatter().format(micros))
+      assert(e.getMessage.contains(expectedMsg))
+    }
+
+    assertError(-1, "Invalid value for NanoOfDay (valid values 0 - 86399999999999): -1000")
+    assertError(25L * 3600 * 1000 * 1000,
+      "Invalid value for NanoOfDay (valid values 0 - 86399999999999): 90000000000000")
+  }
+
+  test("invalid pattern") {
+    Seq("hHH:mmm:s", "kkk", "GGGGGG").foreach { invalidPattern =>
+      checkError(
+        exception = intercept[SparkRuntimeException] {
+          TimeFormatter(invalidPattern)
+        },
+        condition = "_LEGACY_ERROR_TEMP_2130",
+        parameters = Map(
+          "pattern" -> s"'$invalidPattern'",
+          "docroot" -> SPARK_DOC_ROOT))
+    }
+  }
+
+  test("round trip: format -> parse") {
+    val data = Seq.tabulate(10) { _ => Random.between(0, 24 * 60 * 60 * 1000000L) }
+    val pattern = "HH:mm:ss.SSSSSS"
+    val (formatter, parser) =
+      (TimeFormatter(pattern), TimeFormatter(pattern, isParsing = true))
+    data.foreach { micros =>
+      val str = formatter.format(micros)
+      assert(parser.parse(str) === micros, s"micros = $micros")
+      assert(formatter.format(microsToLocalTime(micros)) === str)
+    }
+  }
+
+  test("format fraction of second") {
+    val formatter = new FractionTimeFormatter()
+    Seq(
+      0 -> "00:00:00",
+      1 -> "00:00:00.000001",
+      1000 -> "00:00:00.001",
+      900000 -> "00:00:00.9",
+      1000000 -> "00:00:01").foreach { case (micros, tsStr) =>
+      assert(formatter.format(micros) === tsStr)
+      assert(formatter.format(microsToLocalTime(micros)) === tsStr)
+    }
+  }
+
+  test("missing am/pm field") {
+    Seq("HH", "hh", "KK", "kk").foreach { hour =>
+      val formatter = TimeFormatter(format = s"$hour:mm:ss", isParsing = true)
+      val micros = formatter.parse("11:30:01")
+      assert(micros === localTime(11, 30, 1))
+    }
+  }
+
+  test("missing hour field") {
+    val f1 = TimeFormatter(format = "mm:ss a", isParsing = true)
+    val t1 = f1.parse("30:01 PM")
+    assert(t1 === localTime(12, 30, 1))
+    val t2 = f1.parse("30:01 AM")
+    assert(t2 === localTime(0, 30, 1))
+    val f2 = TimeFormatter(format = "mm:ss", isParsing = true)
+    val t3 = f2.parse("30:01")
+    assert(t3 === localTime(0, 30, 1))
+    val f3 = TimeFormatter(format = "a", isParsing = true)
+    val t4 = f3.parse("PM")
+    assert(t4 === localTime(12))
+    val t5 = f3.parse("AM")
+    assert(t5 === localTime())
+  }
+}
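// ---------------------------------------------------------------------------
// Closing sketch (assumes the patch): FractionTimeFormatter trims trailing
// zeros in the fraction, which is why 900000 micros prints as "00:00:00.9" in
// the "format fraction of second" test above. The object name
// `FractionTimeFormatterCheck` is invented for this example.
import org.apache.spark.sql.catalyst.util.FractionTimeFormatter

object FractionTimeFormatterCheck {
  def main(args: Array[String]): Unit = {
    val formatter = new FractionTimeFormatter()
    assert(formatter.format(900000L) == "00:00:00.9") // 0.9 s, zeros trimmed
    assert(formatter.format(1000000L) == "00:00:01")  // whole second, no '.'
  }
}
// ---------------------------------------------------------------------------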