Skip to content

Commit 3c7238f

Browse files
committed
cargo fmt
1 parent 953cbc4 commit 3c7238f

File tree

1 file changed

+82
-151
lines changed

1 file changed

+82
-151
lines changed

datafusion/functions/src/datetime/date_part.rs

Lines changed: 82 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,23 @@ use arrow::datatypes::DataType::{
2727
Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
2828
};
2929
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
30-
use arrow::datatypes::{
31-
ArrowTimestampType, DataType, Field, FieldRef, TimeUnit, TimestampMicrosecondType,
32-
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
33-
};
34-
30+
use arrow::datatypes::{ArrowTimestampType, DataType, Field, FieldRef, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType};
31+
use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
3532
use datafusion_common::cast::as_primitive_array;
3633
use datafusion_common::types::{logical_date, NativeType};
34+
use std::ops::Add;
3735

38-
use super::adjust_to_local_time;
3936
use datafusion_common::{
4037
cast::{
4138
as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array,
4239
as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
4340
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
4441
as_timestamp_nanosecond_array, as_timestamp_second_array,
4542
},
46-
exec_err, internal_err, not_impl_err,
43+
exec_err, internal_datafusion_err, internal_err, not_impl_err,
4744
types::logical_string,
4845
utils::take_function_args,
49-
DataFusionError, Result, ScalarValue,
46+
Result, ScalarValue,
5047
};
5148
use datafusion_expr::{
5249
ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
@@ -131,7 +128,7 @@ impl DatePartFunc {
131128
],
132129
Volatility::Immutable,
133130
),
134-
aliases: vec![String::from("datepart"), String::from("extract")],
131+
aliases: vec![String::from("datepart")],
135132
}
136133
}
137134
}
@@ -202,17 +199,12 @@ impl ScalarUDFImpl for DatePartFunc {
202199
};
203200

204201
let (is_timezone_aware, tz_str_opt) = match array.data_type() {
205-
Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
202+
Timestamp(_, Some(tz_str)) => (true, Some(tz_str.clone())),
206203
_ => (false, None),
207204
};
208205

209-
let part_trim = part_normalization(&part);
210-
let is_epoch = is_epoch(&part);
211-
212-
// Epoch is timezone-independent - it always returns seconds since 1970-01-01 UTC
213-
let array = if is_epoch {
214-
array
215-
} else if is_timezone_aware {
206+
// Adjust timestamps for extraction
207+
let array = if is_timezone_aware {
216208
// For timezone-aware timestamps, extract in their own timezone
217209
let tz_str = tz_str_opt.as_ref().unwrap();
218210
let tz = match tz_str.parse::<Tz>() {
@@ -231,14 +223,16 @@ impl ScalarUDFImpl for DatePartFunc {
231223
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
232224
}
233225
Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
226+
_ => array,
234227
},
235228
_ => array,
236229
}
237230
} else if let Timestamp(time_unit, None) = array.data_type() {
238231
// For naive timestamps, interpret in session timezone
239-
let tz: Tz = config.execution.time_zone.as_str().parse().map_err(|_| {
240-
DataFusionError::Execution("Invalid timezone".to_string())
241-
})?;
232+
let tz = match config.execution.time_zone.parse::<Tz>() {
233+
Ok(tz) => tz,
234+
Err(_) => return exec_err!("Invalid timezone"),
235+
};
242236
match time_unit {
243237
Nanosecond => {
244238
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
@@ -250,15 +244,18 @@ impl ScalarUDFImpl for DatePartFunc {
250244
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
251245
}
252246
Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
247+
_ => array,
253248
}
254249
} else {
255250
array
256251
};
257252

253+
let part_trim = part_normalization(&part);
254+
258255
// using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds")
259256
// and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
260-
let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
261-
let extracted = match interval_unit {
257+
let mut arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
258+
match interval_unit {
262259
IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
263260
IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
264261
IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
@@ -269,39 +266,8 @@ impl ScalarUDFImpl for DatePartFunc {
269266
IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
270267
IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
271268
IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?,
269+
// century and decade are not supported by `DatePart`
272270
_ => return exec_err!("Date part '{part}' not supported"),
273-
};
274-
275-
// For fixed offsets (like +04:00, -05:30), apply the offset to extract values.
276-
// Named timezones (like 'America/New_York') are handled by adjust_to_local_time
277-
// and DST is already applied via chrono.
278-
if is_timezone_aware {
279-
let tz_str = tz_str_opt.as_ref().unwrap().as_ref();
280-
if is_fixed_offset(tz_str) {
281-
if let Some(offset_info) = extract_offset_components(tz_str) {
282-
match interval_unit {
283-
IntervalUnit::Hour => apply_hour_offset(
284-
extracted.as_ref(),
285-
offset_info.hours,
286-
offset_info.minutes,
287-
)?,
288-
IntervalUnit::Minute => apply_minute_offset(
289-
extracted.as_ref(),
290-
offset_info.minutes,
291-
)?,
292-
IntervalUnit::Day => {
293-
apply_day_offset(extracted.as_ref(), offset_info.hours)?
294-
}
295-
_ => extracted,
296-
}
297-
} else {
298-
extracted
299-
}
300-
} else {
301-
extracted
302-
}
303-
} else {
304-
extracted
305271
}
306272
} else {
307273
// special cases that can be extracted (in postgres) but are not interval units
@@ -315,6 +281,8 @@ impl ScalarUDFImpl for DatePartFunc {
315281
}
316282
};
317283

284+
285+
318286
Ok(if is_scalar {
319287
ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
320288
} else {
@@ -331,6 +299,54 @@ impl ScalarUDFImpl for DatePartFunc {
331299
}
332300
}
333301

302+
fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
303+
fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
304+
where
305+
F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
306+
{
307+
match converter(ts) {
308+
MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
309+
"Ambiguous timestamp. Do you mean {:?} or {:?}",
310+
earliest,
311+
latest
312+
),
313+
MappedLocalTime::None => exec_err!(
314+
"The local time does not exist because there is a gap in the local time."
315+
),
316+
MappedLocalTime::Single(date_time) => Ok(date_time),
317+
}
318+
}
319+
320+
let date_time = match T::UNIT {
321+
Nanosecond => Utc.timestamp_nanos(ts),
322+
Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
323+
Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
324+
Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
325+
};
326+
327+
let offset_seconds: i64 = tz
328+
.offset_from_utc_datetime(&date_time.naive_utc())
329+
.fix()
330+
.local_minus_utc() as i64;
331+
332+
let adjusted_date_time = date_time.add(
333+
TimeDelta::try_seconds(offset_seconds)
334+
.ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
335+
);
336+
337+
// convert back to i64
338+
match T::UNIT {
339+
Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(|| {
340+
internal_datafusion_err!(
341+
"Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
342+
)
343+
}),
344+
Microsecond => Ok(adjusted_date_time.timestamp_micros()),
345+
Millisecond => Ok(adjusted_date_time.timestamp_millis()),
346+
Second => Ok(adjusted_date_time.timestamp()),
347+
}
348+
}
349+
334350
fn adjust_timestamp_array<T: ArrowTimestampType>(
335351
array: &ArrayRef,
336352
tz: Tz,
@@ -354,108 +370,18 @@ fn is_epoch(part: &str) -> bool {
354370
matches!(part.to_lowercase().as_str(), "epoch")
355371
}
356372

357-
// Check if a timezone string is a fixed offset
358-
fn is_fixed_offset(tz_str: &str) -> bool {
359-
tz_str.starts_with('+') || tz_str.starts_with('-')
360-
}
361-
362-
// Holds the components of a timezone offset (hours and minutes).
363-
struct OffsetInfo {
364-
hours: i32,
365-
minutes: i32,
366-
}
367-
368-
// Extracts the offset components from a timezone string like "+04:00" or "-05:30".
369-
fn extract_offset_components(tz_str: &str) -> Option<OffsetInfo> {
370-
if tz_str.len() < 6 {
371-
return None;
372-
}
373-
374-
let sign = match &tz_str[0..1] {
375-
"+" => 1,
376-
"-" => -1,
377-
_ => return None,
378-
};
379-
380-
let hours: i32 = tz_str[1..3].parse().ok()?;
381-
let minutes: i32 = tz_str[4..6].parse().ok()?;
382-
383-
Some(OffsetInfo {
384-
hours: sign * hours,
385-
minutes: sign * minutes,
386-
})
387-
}
388-
389-
// Applies the timezone offset to hour values in an array.
390-
fn apply_hour_offset(
391-
array: &dyn Array,
392-
offset_hours: i32,
393-
offset_minutes: i32,
394-
) -> Result<ArrayRef> {
395-
let hour_array = as_int32_array(array)?;
396-
let result: Int32Array = hour_array
397-
.iter()
398-
.map(|hour| {
399-
hour.map(|h| {
400-
let mut adjusted = h + offset_hours;
401-
if offset_minutes.abs() >= 30 {
402-
adjusted += if offset_minutes > 0 { 1 } else { -1 };
403-
}
404-
((adjusted % 24) + 24) % 24
405-
})
406-
})
407-
.collect();
408-
Ok(Arc::new(result))
409-
}
410-
411-
// Applies the timezone offset to minute values in an array.
412-
fn apply_minute_offset(array: &dyn Array, offset_minutes: i32) -> Result<ArrayRef> {
413-
let minute_array = as_int32_array(array)?;
414-
let result: Int32Array = minute_array
415-
.iter()
416-
.map(|minute| {
417-
minute.map(|m| {
418-
let adjusted = m + offset_minutes;
419-
((adjusted % 60) + 60) % 60
420-
})
421-
})
422-
.collect();
423-
Ok(Arc::new(result))
424-
}
425-
426-
// Applies the timezone offset to day values in an array.
427-
fn apply_day_offset(array: &dyn Array, offset_hours: i32) -> Result<ArrayRef> {
428-
let day_array = as_int32_array(array)?;
429-
let result: Int32Array = day_array
430-
.iter()
431-
.map(|day| {
432-
day.map(|d| {
433-
if offset_hours >= 24 {
434-
d + (offset_hours / 24)
435-
} else if offset_hours <= -24 {
436-
d + (offset_hours / 24)
437-
} else if offset_hours > 0 {
438-
d + 1
439-
} else if offset_hours < 0 {
440-
d - 1
441-
} else {
442-
d
443-
}
444-
})
445-
})
446-
.collect();
447-
Ok(Arc::new(result))
448-
}
449-
450-
// Try to remove quotes if they exist. If the quotes are invalid, return original string.
373+
// Try to remove quote if exist, if the quote is invalid, return original string and let the downstream function handle the error
451374
fn part_normalization(part: &str) -> &str {
452375
part.strip_prefix(|c| c == '\'' || c == '\"')
453376
.and_then(|s| s.strip_suffix(|c| c == '\'' || c == '\"'))
454377
.unwrap_or(part)
455378
}
456379

457-
// Converts seconds to i32 with the specified time unit.
380+
/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
381+
/// result to a total number of seconds, milliseconds, microseconds or
382+
/// nanoseconds
458383
fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
384+
// Nanosecond is neither supported in Postgres nor DuckDB, to avoid dealing
459385
// with overflow and precision issue we don't support nanosecond
460386
if unit == Nanosecond {
461387
return not_impl_err!("Date part {unit:?} not supported");
@@ -476,6 +402,7 @@ fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
476402
};
477403

478404
let secs = date_part(array, DatePart::Second)?;
405+
// This assumes array is primitive and not a dictionary
479406
let secs = as_int32_array(secs.as_ref())?;
480407
let subsecs = date_part(array, DatePart::Nanosecond)?;
481408
let subsecs = as_int32_array(subsecs.as_ref())?;
@@ -503,8 +430,11 @@ fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
503430
}
504431
}
505432

506-
// Converts seconds to f64 with the specified time unit.
507-
// Used for Interval and Duration types that need floating-point precision.
433+
/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
434+
/// result to a total number of seconds, milliseconds, microseconds or
435+
/// nanoseconds
436+
///
437+
/// Given epoch return f64, this is a duplicated function to optimize for f64 type
508438
fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
509439
let sf = match unit {
510440
Second => 1_f64,
@@ -513,6 +443,7 @@ fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
513443
Nanosecond => 1_000_000_000_f64,
514444
};
515445
let secs = date_part(array, DatePart::Second)?;
446+
// This assumes array is primitive and not a dictionary
516447
let secs = as_int32_array(secs.as_ref())?;
517448
let subsecs = date_part(array, DatePart::Nanosecond)?;
518449
let subsecs = as_int32_array(subsecs.as_ref())?;

0 commit comments

Comments
 (0)