diff --git a/Cargo.lock b/Cargo.lock index cdfd97abcd166..67f9c1cf47bbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2579,6 +2579,7 @@ dependencies = [ "serde_json", "sha1 0.11.0", "sha2", + "tokio", "twox-hash", "url", ] diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 09347d6d7dc2c..800e33f645e1b 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -28,7 +28,7 @@ use crate::{ }; use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; -use datafusion::config::ConfigFileType; +use datafusion::config::{ConfigFileType, Dialect}; use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::memory_pool::MemoryConsumer; @@ -223,9 +223,8 @@ pub(super) async fn exec_and_print( let dialect = &options.sql_parser.dialect; let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; @@ -613,9 +612,8 @@ mod tests { let dialect = &task_ctx.session_config().options().sql_parser.dialect; let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; for location in locations { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index ab1405054cab1..4025157cef75d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -279,7 +279,7 @@ config_namespace! { pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, - /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. + /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks and Spark. pub dialect: Dialect, default = Dialect::Generic // no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive @@ -342,6 +342,13 @@ pub enum Dialect { Ansi, DuckDB, Databricks, + Spark, +} + +impl Dialect { + /// List of all supported dialect names, for use in error messages. + pub const AVAILABLE: &'static str = "Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ + MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks, Spark"; } impl AsRef for Dialect { @@ -360,6 +367,7 @@ impl AsRef for Dialect { Self::Ansi => "ansi", Self::DuckDB => "duckdb", Self::Databricks => "databricks", + Self::Spark => "spark", } } } @@ -382,11 +390,12 @@ impl FromStr for Dialect { "ansi" => Self::Ansi, "duckdb" => Self::DuckDB, "databricks" => Self::Databricks, + "spark" | "sparksql" => Self::Spark, other => { - let error_message = format!( - "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks" - ); - return Err(DataFusionError::Configuration(error_message)); + return Err(DataFusionError::Configuration(format!( + "Invalid Dialect: {other}. Expected one of: {}", + Self::AVAILABLE + ))); } }; Ok(value) @@ -4161,6 +4170,18 @@ mod tests { assert_eq!(cdc.norm_level, 0); } + #[test] + fn test_dialect_spark_roundtrip() { + use crate::config::Dialect; + use std::str::FromStr; + + assert_eq!(Dialect::from_str("spark").unwrap(), Dialect::Spark); + assert_eq!(Dialect::from_str("sparksql").unwrap(), Dialect::Spark); + assert_eq!(Dialect::from_str("SPARK").unwrap(), Dialect::Spark); + assert_eq!(Dialect::Spark.as_ref(), "spark"); + assert_eq!(Dialect::Spark.to_string(), "spark"); + } + #[test] fn max_row_group_bytes_rejects_zero() { use crate::config::MaxRowGroupBytes; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index ed2ea27cf4aa6..dfd1eea709215 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -439,9 +439,8 @@ impl SessionState { ) -> datafusion_common::Result { let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; @@ -488,9 +487,8 @@ impl SessionState { ) -> datafusion_common::Result { let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 14f9396d7656e..93987b553f2f5 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -71,7 +71,8 @@ url = { workspace = true } arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } # for SessionStateBuilderSpark tests -datafusion = { workspace = true, default-features = false } +datafusion = { workspace = true, default-features = false, features = ["sql"] } +tokio = { workspace = true, features = ["rt"] } [[bench]] harness = false diff --git a/datafusion/spark/src/session_state.rs b/datafusion/spark/src/session_state.rs index e39de3a5888ea..839487772a9b2 100644 --- a/datafusion/spark/src/session_state.rs +++ b/datafusion/spark/src/session_state.rs @@ -88,6 +88,9 @@ impl SessionStateBuilderSpark for SessionStateBuilder { #[cfg(test)] mod tests { use super::*; + use datafusion::common::config::Dialect; + use datafusion::prelude::SessionConfig; + use datafusion::prelude::SessionContext; #[test] fn test_session_state_with_spark_features() { @@ -108,4 +111,37 @@ mod tests { "Apache Spark expr planners should be registered" ); } + + #[tokio::test] + async fn test_spark_dialect_with_spark_functions() { + let query = "SELECT sha2('abc', 256), CAST(1 AS LONG)"; + + let mut config = SessionConfig::new(); + config.options_mut().sql_parser.dialect = Dialect::Spark; + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .with_spark_features() + .build(); + let ctx = SessionContext::new_with_state(state); + + let result = ctx.sql(query).await.unwrap().collect().await.unwrap(); + assert_eq!(result.len(), 1); + assert_eq!(result[0].num_rows(), 1); + + let mut config = SessionConfig::new(); + config.options_mut().sql_parser.dialect = Dialect::Generic; + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .with_spark_features() + .build(); + let ctx = SessionContext::new_with_state(state); + + let err = ctx.sql(query).await.unwrap_err().to_string(); + assert!( + err.contains("Unsupported SQL type LONG"), + "unexpected error: {err}" + ); + } } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index f9b7663108f92..8d437271fee86 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -36,6 +36,7 @@ use arrow::record_batch::RecordBatch; use datafusion::catalog::{ CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, Session, }; +use datafusion::common::config::Dialect; use datafusion::common::{DataFusionError, Result, not_impl_err}; use datafusion::functions::math::abs; use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl}; @@ -151,6 +152,9 @@ impl TestContext { if is_spark_path(relative_path) { state_builder = state_builder.with_spark_features(); + if let Some(config) = state_builder.config() { + config.options_mut().sql_parser.dialect = Dialect::Spark; + } } if matches!( diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 370492c2eb8ce..840bff6ea63ff 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -505,7 +505,7 @@ datafusion.runtime.temp_directory NULL The path to the temporary file directory. datafusion.spark.map_key_dedup_policy EXCEPTION Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: keep the last occurrence of each duplicate key. Values are case-insensitive. datafusion.sql_parser.collect_spans false When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes. datafusion.sql_parser.default_null_ordering nulls_max Specifies the default null ordering for query results. There are 4 options: - `nulls_max`: Nulls appear last in ascending order. - `nulls_min`: Nulls appear first in ascending order. - `nulls_first`: Nulls always be first in any order. - `nulls_last`: Nulls always be last in any order. By default, `nulls_max` is used to follow Postgres's behavior. postgres rule: -datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. +datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks and Spark. datafusion.sql_parser.enable_ident_normalization true When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) datafusion.sql_parser.enable_options_value_normalization false When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. datafusion.sql_parser.enable_subquery_sort_elimination true When set to true, DataFusion may remove `ORDER BY` clauses from subqueries or CTEs during SQL planning when their ordering cannot affect the result, such as when no `LIMIT` or other order-sensitive operator depends on them. Disable this option to preserve explicit subquery ordering in the planned query. diff --git a/datafusion/sqllogictest/test_files/spark/collection/size.slt b/datafusion/sqllogictest/test_files/spark/collection/size.slt index 106760eebfe42..b9c445f4e6805 100644 --- a/datafusion/sqllogictest/test_files/spark/collection/size.slt +++ b/datafusion/sqllogictest/test_files/spark/collection/size.slt @@ -84,7 +84,7 @@ SELECT size(make_array(1, NULL, 3)); # NULL array returns -1 (Spark behavior) query I -SELECT size(NULL::int[]); +SELECT size(CAST(NULL AS ARRAY)); ---- -1 diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index aa90cca10b14d..c71b2ccd6b801 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -932,3 +932,8 @@ match register_function { RegisterFunction::Table(name, table) => {}, } ``` + +### New `Dialect::Spark` variant + +The `Dialect` enum in `datafusion_common::config` now includes a `Spark` variant. +If you match exhaustively on `Dialect`, add a `Dialect::Spark` arm. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 442b72ea9bc08..cc679549de89a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -192,7 +192,7 @@ The following configuration settings are available: | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. | -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. | +| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks and Spark. | | datafusion.sql_parser.support_varchar_with_length | true | If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits. | | datafusion.sql_parser.map_string_types_to_utf8view | true | If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning. If false, they are mapped to `Utf8`. Default is true. | | datafusion.sql_parser.collect_spans | false | When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes. |