diff --git a/Cargo.lock b/Cargo.lock index 706597123168..d30251dcb7c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1952,6 +1952,7 @@ dependencies = [ "datatypes", "geohash", "h3o", + "jsonb", "num", "num-traits", "once_cell", @@ -3167,6 +3168,7 @@ dependencies = [ "datafusion-common", "enum_dispatch", "greptime-proto", + "jsonb", "num", "num-traits", "ordered-float 3.9.2", @@ -3699,6 +3701,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fast-float" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c" + [[package]] name = "fastdivide" version = "0.4.1" @@ -4303,7 +4311,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=157cfdb52709e489cf1f3ce8e3042ed4ee8a524a#157cfdb52709e489cf1f3ce8e3042ed4ee8a524a" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=973f49cde88a582fb65755cc572ebcf6fb93ccf7#973f49cde88a582fb65755cc572ebcf6fb93ccf7" dependencies = [ "prost 0.12.6", "serde", @@ -5410,6 +5418,21 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonb" +version = "0.4.1" +source = "git+https://github.com/CookiePieWw/jsonb.git?rev=d0166c130fce903bf6c58643417a3173a6172d31#d0166c130fce903bf6c58643417a3173a6172d31" +dependencies = [ + "byteorder", + "fast-float", + "itoa", + "nom", + "ordered-float 4.2.0", + "rand", + "ryu", + "serde_json", +] + [[package]] name = "jsonpath-rust" version = "0.5.1" @@ -8063,6 +8086,8 @@ dependencies = [ "chrono", "fallible-iterator", "postgres-protocol", + "serde", + "serde_json", ] [[package]] @@ -10401,6 +10426,7 @@ dependencies = [ "hyper 0.14.29", "influxdb_line_protocol", "itertools 0.10.5", + "jsonb", "lazy_static", "mime_guess", "mysql_async", @@ -10780,6 +10806,7 @@ dependencies = [ "hex", "iso8601", "itertools 0.10.5", + "jsonb", "lazy_static", "regex", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 93ea8db134a3..d412bf7e978e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,10 +120,11 @@ etcd-client = { version = "0.13" } fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "157cfdb52709e489cf1f3ce8e3042ed4ee8a524a" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "973f49cde88a582fb65755cc572ebcf6fb93ccf7" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" +jsonb = { git = "https://github.com/CookiePieWw/jsonb.git", rev = "d0166c130fce903bf6c58643417a3173a6172d31", default-features = false } lazy_static = "1.4" meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" } mockall = "0.11.4" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index d8e9c524d899..101cae880207 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -42,7 +42,8 @@ use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ - ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, QueryRequest, Row, SemanticType, + ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, QueryRequest, + Row, SemanticType, }; 
use paste::paste; use snafu::prelude::*; @@ -103,7 +104,17 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType { ColumnDataType::Uint64 => ConcreteDataType::uint64_datatype(), ColumnDataType::Float32 => ConcreteDataType::float32_datatype(), ColumnDataType::Float64 => ConcreteDataType::float64_datatype(), - ColumnDataType::Binary => ConcreteDataType::binary_datatype(), + ColumnDataType::Binary => { + if let Some(TypeExt::JsonType(_)) = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()) + { + ConcreteDataType::json_datatype() + } else { + ConcreteDataType::binary_datatype() + } + } ColumnDataType::String => ConcreteDataType::string_datatype(), ColumnDataType::Date => ConcreteDataType::date_datatype(), ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(), @@ -236,7 +247,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper { ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, ConcreteDataType::Float32(_) => ColumnDataType::Float32, ConcreteDataType::Float64(_) => ColumnDataType::Float64, - ConcreteDataType::Binary(_) => ColumnDataType::Binary, + ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ColumnDataType::Binary, ConcreteDataType::String(_) => ColumnDataType::String, ConcreteDataType::Date(_) => ColumnDataType::Date, ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, @@ -276,6 +287,16 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper { })), }) } + ColumnDataType::Binary => { + if datatype == ConcreteDataType::json_datatype() { + // JSON is the same as binary in proto. The extension marks that this binary is actually JSON. + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }) + } else { + None + } + } _ => None, }; Ok(Self { @@ -649,7 +670,8 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> VectorRef { ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Duration(_) => { + | ConcreteDataType::Duration(_) + | ConcreteDataType::Json(_) => { unreachable!() } } @@ -813,7 +835,8 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> { ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Duration(_) => { + | ConcreteDataType::Duration(_) + | ConcreteDataType::Json(_) => { unreachable!() } } @@ -831,7 +854,13 @@ pub fn is_column_type_value_eq( expect_type: &ConcreteDataType, ) -> bool { ColumnDataTypeWrapper::try_new(type_value, type_extension) - .map(|wrapper| ConcreteDataType::from(wrapper) == *expect_type) + .map(|wrapper| { + let datatype = ConcreteDataType::from(wrapper); + (datatype == *expect_type) + // The JSON type leverages the binary type in pb, so this is valid. + || (datatype == ConcreteDataType::binary_datatype() + && *expect_type == ConcreteDataType::json_datatype()) + }) .unwrap_or(false) }
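The two impls above make JSON a purely logical overlay on the binary wire type. A minimal sketch of the implied round-trip, assuming `ColumnDataTypeWrapper` is importable as `api::helper::ColumnDataTypeWrapper` (the module path is an assumption here):

```rust
use api::helper::ColumnDataTypeWrapper; // import path assumed
use datatypes::data_type::ConcreteDataType;

fn json_type_roundtrip() {
    // JSON -> wire: ColumnDataType::Binary plus a JsonTypeExtension in datatype_ext.
    let wrapper = ColumnDataTypeWrapper::try_from(ConcreteDataType::json_datatype()).unwrap();
    // wire -> JSON: the From impl above sees the extension and restores the logical
    // JSON type instead of falling back to plain binary.
    assert_eq!(
        ConcreteDataType::from(wrapper),
        ConcreteDataType::json_datatype()
    );
}
```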
diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 2451b2bcbdab..b2e9c5a98beb 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -29,6 +29,7 @@ datafusion.workspace = true datatypes.workspace = true geohash = { version = "0.13", optional = true } h3o = { version = "0.6", optional = true } +jsonb.workspace = true num = "0.4" num-traits = "0.2" once_cell.workspace = true
diff --git a/src/common/function/src/function_registry.rs b/src/common/function/src/function_registry.rs index ed863c16aa75..46af3b761072 100644 --- a/src/common/function/src/function_registry.rs +++ b/src/common/function/src/function_registry.rs @@ -22,6 +22,7 @@ use crate::function::{AsyncFunctionRef, FunctionRef}; use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions}; use crate::scalars::date::DateFunction; use crate::scalars::expression::ExpressionFunction; +use crate::scalars::json::JsonFunction; use crate::scalars::matches::MatchesFunction; use crate::scalars::math::MathFunction; use crate::scalars::numpy::NumpyFunction; @@ -116,6 +117,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| { SystemFunction::register(&function_registry); TableFunction::register(&function_registry); + // Json related functions + JsonFunction::register(&function_registry); + // Geo functions #[cfg(feature = "geo")] crate::scalars::geo::GeoFunctions::register(&function_registry);
diff --git a/src/common/function/src/scalars.rs b/src/common/function/src/scalars.rs index f8dc570d1292..f60cf2b0d98b 100644 --- a/src/common/function/src/scalars.rs +++ b/src/common/function/src/scalars.rs @@ -17,9 +17,11 @@ pub(crate) mod date; pub mod expression; #[cfg(feature = "geo")] pub mod geo; +pub mod json; pub mod matches; pub mod math; pub mod numpy; + #[cfg(test)] pub(crate) mod test; pub(crate) mod timestamp;
diff --git a/src/common/function/src/scalars/json.rs b/src/common/function/src/scalars/json.rs new file mode 100644 index 000000000000..3812b33f235f --- /dev/null +++ b/src/common/function/src/scalars/json.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::sync::Arc; +mod json_to_string; +mod to_json; + +use json_to_string::JsonToStringFunction; +use to_json::ToJsonFunction; + +use crate::function_registry::FunctionRegistry; + +pub(crate) struct JsonFunction; + +impl JsonFunction { + pub fn register(registry: &FunctionRegistry) { + registry.register(Arc::new(JsonToStringFunction)); + registry.register(Arc::new(ToJsonFunction)); + } +} diff --git a/src/common/function/src/scalars/json/json_to_string.rs b/src/common/function/src/scalars/json/json_to_string.rs new file mode 100644 index 000000000000..8a5e569a149c --- /dev/null +++ b/src/common/function/src/scalars/json/json_to_string.rs @@ -0,0 +1,174 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self, Display}; + +use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; +use common_query::prelude::Signature; +use datafusion::logical_expr::Volatility; +use datatypes::data_type::ConcreteDataType; +use datatypes::prelude::VectorRef; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::vectors::{MutableVector, StringVectorBuilder}; +use snafu::ensure; + +use crate::function::{Function, FunctionContext}; + +/// Converts the `JSONB` into `String`. It's useful for displaying JSONB content. 
+#[derive(Clone, Debug, Default)] +pub struct JsonToStringFunction; + +const NAME: &str = "json_to_string"; + +impl Function for JsonToStringFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> { + Ok(ConcreteDataType::string_datatype()) + } + + fn signature(&self) -> Signature { + Signature::exact( + vec![ConcreteDataType::json_datatype()], + Volatility::Immutable, + ) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> { + ensure!( + columns.len() == 1, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly one, have: {}", + columns.len() + ), + } + ); + let jsons = &columns[0]; + + let size = jsons.len(); + let datatype = jsons.data_type(); + let mut results = StringVectorBuilder::with_capacity(size); + + match datatype { + // JSON data type uses binary vector + ConcreteDataType::Binary(_) => { + for i in 0..size { + let json = jsons.get_ref(i); + + let json = json.as_binary(); + let result = match json { + Ok(Some(json)) => match jsonb::from_slice(json) { + Ok(json) => { + let json = json.to_string(); + Some(json) + } + Err(_) => { + return InvalidFuncArgsSnafu { + err_msg: format!("Illegal json binary: {:?}", json), + } + .fail() + } + }, + _ => None, + }; + + results.push(result.as_deref()); + } + } + _ => { + return UnsupportedInputDataTypeSnafu { + function: NAME, + datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(), + } + .fail(); + } + } + + Ok(results.to_vector()) + } +} + +impl Display for JsonToStringFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JSON_TO_STRING") + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::prelude::TypeSignature; + use datatypes::scalars::ScalarVector; + use datatypes::vectors::BinaryVector; + + use super::*; + + #[test] + fn test_json_to_string_function() { + let json_to_string = JsonToStringFunction; + + assert_eq!("json_to_string", json_to_string.name()); + assert_eq!( + ConcreteDataType::string_datatype(), + json_to_string + .return_type(&[ConcreteDataType::json_datatype()]) + .unwrap() + ); + + assert!(matches!(json_to_string.signature(), + Signature { + type_signature: TypeSignature::Exact(valid_types), + volatility: Volatility::Immutable + } if valid_types == vec![ConcreteDataType::json_datatype()] + )); + + let json_strings = [ + r#"{"a": {"b": 2}, "b": 2, "c": 3}"#, + r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, + r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, + ]; + + let jsonbs = json_strings + .iter() + .map(|s| { + let value = jsonb::parse_value(s.as_bytes()).unwrap(); + value.to_vec() + }) + .collect::<Vec<_>>(); + + let json_vector = BinaryVector::from_vec(jsonbs); + let args: Vec<VectorRef> = vec![Arc::new(json_vector)]; + let vector = json_to_string + .eval(FunctionContext::default(), &args) + .unwrap(); + + assert_eq!(3, vector.len()); + for (i, gt) in json_strings.iter().enumerate() { + let result = vector.get_ref(i); + let result = result.as_string().unwrap().unwrap(); + // remove whitespaces + assert_eq!(gt.replace(" ", ""), result); + } + + let invalid_jsonb = vec![b"invalid json"]; + let invalid_json_vector = BinaryVector::from_vec(invalid_jsonb); + let args: Vec<VectorRef> = vec![Arc::new(invalid_json_vector)]; + let vector = json_to_string.eval(FunctionContext::default(), &args); + assert!(vector.is_err()); + } +}
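The conversion core of `json_to_string` (and of `to_json` below) is the `jsonb` crate round-trip pinned in the workspace above. A standalone sketch of exactly the calls this PR uses, namely `parse_value`, `Value::to_vec`, `from_slice`, and `Value::to_string`:

```rust
fn jsonb_roundtrip() {
    // text -> jsonb::Value
    let value = jsonb::parse_value(br#"{"foo": "bar"}"#).unwrap();
    // jsonb::Value -> JSONB binary encoding
    let encoded: Vec<u8> = value.to_vec();
    // JSONB binary -> jsonb::Value -> canonical text; input whitespace is dropped
    let decoded = jsonb::from_slice(&encoded).unwrap();
    assert_eq!(decoded.to_string(), r#"{"foo":"bar"}"#);
}
```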
diff --git a/src/common/function/src/scalars/json/to_json.rs b/src/common/function/src/scalars/json/to_json.rs new file mode 100644 index 000000000000..9c3cc90b66c8 --- /dev/null +++ b/src/common/function/src/scalars/json/to_json.rs @@ -0,0 +1,165 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self, Display}; + +use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; +use common_query::prelude::Signature; +use datafusion::logical_expr::Volatility; +use datatypes::data_type::ConcreteDataType; +use datatypes::prelude::VectorRef; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::vectors::{BinaryVectorBuilder, MutableVector}; +use snafu::ensure; + +use crate::function::{Function, FunctionContext}; + +/// Parses a `String` into `JSONB`. +#[derive(Clone, Debug, Default)] +pub struct ToJsonFunction; + +const NAME: &str = "to_json"; + +impl Function for ToJsonFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> { + Ok(ConcreteDataType::json_datatype()) + } + + fn signature(&self) -> Signature { + Signature::exact( + vec![ConcreteDataType::string_datatype()], + Volatility::Immutable, + ) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> { + ensure!( + columns.len() == 1, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly one, have: {}", + columns.len() + ), + } + ); + let json_strings = &columns[0]; + + let size = json_strings.len(); + let datatype = json_strings.data_type(); + let mut results = BinaryVectorBuilder::with_capacity(size); + + match datatype { + ConcreteDataType::String(_) => { + for i in 0..size { + let json_string = json_strings.get_ref(i); + + let json_string = json_string.as_string(); + let result = match json_string { + Ok(Some(json_string)) => match jsonb::parse_value(json_string.as_bytes()) { + Ok(json) => Some(json.to_vec()), + Err(_) => { + return InvalidFuncArgsSnafu { + err_msg: format!( + "Cannot convert the string to json, have: {}", + json_string + ), + } + .fail() + } + }, + _ => None, + }; + + results.push(result.as_deref()); + } + } + _ => { + return UnsupportedInputDataTypeSnafu { + function: NAME, + datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(), + } + .fail(); + } + } + + Ok(results.to_vector()) + } +} + +impl Display for ToJsonFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "TO_JSON") + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::prelude::TypeSignature; + use datatypes::scalars::ScalarVector; + use datatypes::vectors::StringVector; + + use super::*; + + #[test] + fn test_to_json_function() { + let to_json = ToJsonFunction; + + assert_eq!("to_json", to_json.name()); + assert_eq!( + ConcreteDataType::json_datatype(), + to_json + .return_type(&[ConcreteDataType::string_datatype()]) + .unwrap() + ); + + assert!(matches!(to_json.signature(), + Signature { + type_signature: TypeSignature::Exact(valid_types), + volatility: Volatility::Immutable + } if valid_types == vec![ConcreteDataType::string_datatype()] + )); + + let json_strings = [ + r#"{"a": {"b": 2}, "b": 2, "c": 3}"#, + r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, + r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, + ]; + + let jsonbs = json_strings + .iter() + .map(|s| { + let value = jsonb::parse_value(s.as_bytes()).unwrap(); + value.to_vec() + }) + .collect::<Vec<_>>(); + + let json_string_vector = StringVector::from_vec(json_strings.to_vec()); + let args: Vec<VectorRef> = vec![Arc::new(json_string_vector)]; + let vector = to_json.eval(FunctionContext::default(), &args).unwrap(); + + assert_eq!(3, vector.len()); + for (i, gt) in jsonbs.iter().enumerate() { + let result = vector.get_ref(i); + let result = result.as_binary().unwrap().unwrap(); + // compare the raw JSONB bytes + assert_eq!(gt, result); + } + } +}
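Taken together, `to_json` and `json_to_string` are inverses at the text level, modulo whitespace. A test-style sketch in the spirit of the unit tests above; it assumes crate-internal visibility, since both structs are private to their modules:

```rust
use std::sync::Arc;

use datatypes::prelude::VectorRef;
use datatypes::vectors::StringVector;

use crate::function::{Function, FunctionContext};

#[test]
fn to_json_then_json_to_string_roundtrip() {
    let input: VectorRef = Arc::new(StringVector::from_vec(vec![r#"{ "a" : 1 }"#]));
    // String -> JSONB bytes.
    let bin = ToJsonFunction
        .eval(FunctionContext::default(), &[input])
        .unwrap();
    // JSONB bytes -> canonical text; whitespace from the input is not preserved.
    let text = JsonToStringFunction
        .eval(FunctionContext::default(), &[bin])
        .unwrap();
    assert_eq!(text.get_ref(0).as_string().unwrap().unwrap(), r#"{"a":1}"#);
}
```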
diff --git a/src/common/grpc/src/select.rs index df4131bde6fe..ba13acf3b7e9 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -70,7 +70,7 @@ macro_rules! convert_arrow_array_to_grpc_vals { return Ok(vals); }, )+ - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) => unreachable!("Should not send {:?} in gRPC", $data_type), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => unreachable!("Should not send {:?} in gRPC", $data_type), } }}; }
diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index b10ea682dd47..23eac53a030c 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -25,6 +25,7 @@ common-time.workspace = true datafusion-common.workspace = true enum_dispatch = "0.3" greptime-proto.workspace = true +jsonb.workspace = true num = "0.4" num-traits = "0.2" ordered-float = { version = "3.0", features = ["serde"] }
diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 262110dbf53b..495c47dc5f12 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -33,8 +33,8 @@ use crate::types::{ BinaryType, BooleanType, DateTimeType, DateType, Decimal128Type, DictionaryType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, - IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, ListType, - NullType, StringType, TimeMillisecondType, TimeType, TimestampMicrosecondType, + IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonType, + ListType, NullType, StringType, TimeMillisecondType, TimeType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; @@ -81,6 +81,9 @@ pub enum ConcreteDataType { // Compound types: List(ListType), Dictionary(DictionaryType), + + // JSON type: + Json(JsonType), } impl fmt::Display for ConcreteDataType { @@ -128,6 +131,7 @@ impl fmt::Display for ConcreteDataType { ConcreteDataType::Decimal128(v) => write!(f, "{}", v.name()), ConcreteDataType::List(v) => write!(f, "{}", v.name()), ConcreteDataType::Dictionary(v) => write!(f, "{}", v.name()), + ConcreteDataType::Json(v) => write!(f, "{}", v.name()), } } } @@ -162,6 +166,7 @@ impl ConcreteDataType { | ConcreteDataType::Duration(_) | ConcreteDataType::Decimal128(_) | ConcreteDataType::Binary(_) + | ConcreteDataType::Json(_) ) } @@ -216,6 +221,10 @@ impl ConcreteDataType { matches!(self, ConcreteDataType::Decimal128(_)) } + pub fn is_json(&self) -> bool { + matches!(self, ConcreteDataType::Json(_)) + } + pub fn numerics() -> Vec<ConcreteDataType> { vec![ ConcreteDataType::int8_datatype(), @@ -404,7 +413,7 @@ macro_rules! impl_new_concrete_type_functions { impl_new_concrete_type_functions!( Null, Boolean, UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64, - Binary, Date, DateTime, String + Binary, Date, DateTime, String, Json ); impl ConcreteDataType {
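With `Json` added to the `impl_new_concrete_type_functions!` invocation above, `ConcreteDataType::json_datatype()` exists alongside the other shorthand constructors. A quick sketch of the new surface (the Arrow mapping itself is supplied by the `JsonType` impl later in this diff):

```rust
use datatypes::data_type::ConcreteDataType;

fn json_datatype_basics() {
    let t = ConcreteDataType::json_datatype();
    assert!(t.is_json());
    // Display delegates to JsonType::name(), i.e. JSON_TYPE_NAME.
    assert_eq!(t.to_string(), "Json");
}
```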
diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 3bb35a595f4a..2ca79ff490f4 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -25,6 +25,7 @@ use datafusion_common::DFSchemaRef; use snafu::{ensure, ResultExt}; use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result}; +use crate::prelude::DataType; pub use crate::schema::column_schema::{ ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COMMENT_KEY, FULLTEXT_KEY, TIME_INDEX_KEY, }; @@ -34,6 +35,8 @@ pub use crate::schema::raw::RawSchema; /// Key used to store version number of the schema in metadata. pub const VERSION_KEY: &str = "greptime:version"; +/// Key used to store the actual column type in field metadata. +pub const TYPE_KEY: &str = "greptime:type"; /// A common schema, should be immutable. #[derive(Clone, PartialEq, Eq)] @@ -256,7 +259,13 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> { if column_schema.is_time_index() && timestamp_index.is_none() { timestamp_index = Some(index); } - let field = Field::try_from(column_schema)?; + let mut field = Field::try_from(column_schema)?; + + // A Json column is represented the same as a binary column in Arrow, so we mark it in the field metadata + if column_schema.data_type.is_json() { + let metadata = HashMap::from([(TYPE_KEY.to_string(), column_schema.data_type.name())]); + field = field.with_metadata(metadata); + } fields.push(field); ensure!( name_to_index
diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index c3cd8b345314..861924df3b9c 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -22,6 +22,8 @@ use snafu::{ensure, ResultExt}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Error, Result}; use crate::schema::constraint::ColumnDefaultConstraint; +use crate::schema::TYPE_KEY; +use crate::types::JSON_TYPE_NAME; use crate::value::Value; use crate::vectors::VectorRef; @@ -268,7 +270,14 @@ impl TryFrom<&Field> for ColumnSchema { type Error = Error; fn try_from(field: &Field) -> Result<ColumnSchema> { - let data_type = ConcreteDataType::try_from(field.data_type())?; + let mut data_type = ConcreteDataType::try_from(field.data_type())?; + // Override the data type if it is specified in the metadata.
+ if field.metadata().contains_key(TYPE_KEY) { + data_type = match field.metadata().get(TYPE_KEY).unwrap().as_str() { + JSON_TYPE_NAME => ConcreteDataType::json_datatype(), + _ => data_type, + }; + } let mut metadata = field.metadata().clone(); let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) { Some(json) => { @@ -528,4 +537,32 @@ mod tests { assert_eq!(formatted_int8, "test_column_1 Int8 null"); assert_eq!(formatted_int32, "test_column_2 Int32 not null"); } + + #[test] + fn test_from_field_to_column_schema() { + let field = Field::new("test", ArrowDataType::Int32, true); + let column_schema = ColumnSchema::try_from(&field).unwrap(); + assert_eq!("test", column_schema.name); + assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type); + assert!(column_schema.is_nullable); + assert!(!column_schema.is_time_index); + assert!(column_schema.default_constraint.is_none()); + assert!(column_schema.metadata.is_empty()); + + let field = Field::new("test", ArrowDataType::Binary, true); + let field = field.with_metadata(Metadata::from([( + TYPE_KEY.to_string(), + ConcreteDataType::json_datatype().name(), + )])); + let column_schema = ColumnSchema::try_from(&field).unwrap(); + assert_eq!("test", column_schema.name); + assert_eq!(ConcreteDataType::json_datatype(), column_schema.data_type); + assert!(column_schema.is_nullable); + assert!(!column_schema.is_time_index); + assert!(column_schema.default_constraint.is_none()); + assert_eq!( + column_schema.metadata.get(TYPE_KEY).unwrap(), + &ConcreteDataType::json_datatype().name() + ); + } } diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 29e3065abe58..d7496a54e0a4 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -68,6 +68,8 @@ pub enum LogicalTypeId { List, Dictionary, + + Json, } impl LogicalTypeId { @@ -126,6 +128,7 @@ impl LogicalTypeId { LogicalTypeId::DurationMicrosecond => ConcreteDataType::duration_microsecond_datatype(), LogicalTypeId::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), LogicalTypeId::Decimal128 => ConcreteDataType::decimal128_default_datatype(), + LogicalTypeId::Json => ConcreteDataType::json_datatype(), } } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index 686fd9c49f10..0bedd2965c39 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -21,6 +21,7 @@ mod decimal_type; mod dictionary_type; mod duration_type; mod interval_type; +mod json_type; mod list_type; mod null_type; mod primitive_type; @@ -42,6 +43,7 @@ pub use duration_type::{ pub use interval_type::{ IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, }; +pub use json_type::{JsonType, JSON_TYPE_NAME}; pub use list_type::ListType; pub use null_type::NullType; pub use primitive_type::{ diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs new file mode 100644 index 000000000000..416b59b5c0ef --- /dev/null +++ b/src/datatypes/src/types/json_type.rs @@ -0,0 +1,67 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow::datatypes::DataType as ArrowDataType; +use common_base::bytes::Bytes; +use serde::{Deserialize, Serialize}; + +use crate::data_type::{DataType, DataTypeRef}; +use crate::scalars::ScalarVectorBuilder; +use crate::type_id::LogicalTypeId; +use crate::value::Value; +use crate::vectors::{BinaryVectorBuilder, MutableVector}; + +pub const JSON_TYPE_NAME: &str = "Json"; + +/// JsonType is the data type for JSON values. It is stored as binary data in the JSONB format +/// and reuses the existing binary value and vector implementations. +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct JsonType; + +impl JsonType { + pub fn arc() -> DataTypeRef { + Arc::new(Self) + } +} + +impl DataType for JsonType { + fn name(&self) -> String { + JSON_TYPE_NAME.to_string() + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::Json + } + + fn default_value(&self) -> Value { + Bytes::default().into() + } + + fn as_arrow_type(&self) -> ArrowDataType { + ArrowDataType::Binary + } + + fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> { + Box::new(BinaryVectorBuilder::with_capacity(capacity)) + } + + fn try_cast(&self, from: Value) -> Option<Value> { + match from { + Value::Binary(v) => Some(Value::Binary(v)), + _ => None, + } + } +}
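A short tour of what the `DataType` impl above provides, written as a unit test might be; the imports assume the public paths re-exported by the `datatypes` crate:

```rust
use arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::DataType;
use datatypes::types::JsonType;
use datatypes::value::Value;

fn json_type_surface() {
    let t = JsonType;
    assert_eq!(t.name(), "Json");
    // Physically Binary in Arrow; the logical type travels in field metadata.
    assert_eq!(t.as_arrow_type(), ArrowDataType::Binary);
    // try_cast passes through only values that are already binary, i.e. already JSONB.
    assert_eq!(t.try_cast(Value::Int32(1)), None);
    let jsonb_bytes = jsonb::parse_value(b"{}").unwrap().to_vec();
    assert!(matches!(
        t.try_cast(Value::Binary(jsonb_bytes.into())),
        Some(Value::Binary(_))
    ));
}
```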
diff --git a/src/datatypes/src/value.rs index 6c49154e4058..a8e59da51355 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -342,7 +342,8 @@ impl Value { let value_type_id = self.logical_type_id(); let output_type_id = output_type.logical_type_id(); ensure!( - output_type_id == value_type_id || self.is_null(), + // The Json type leverages Value::Binary for storage. + output_type_id == value_type_id || self.is_null() || (output_type_id == LogicalTypeId::Json && value_type_id == LogicalTypeId::Binary), error::ToScalarValueSnafu { reason: format!( "expect value to return output_type {output_type_id:?}, actual: {value_type_id:?}", @@ -484,7 +485,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValue> { ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None), ConcreteDataType::Float32(_) => ScalarValue::Float32(None), ConcreteDataType::Float64(_) => ScalarValue::Float64(None), - ConcreteDataType::Binary(_) => ScalarValue::Binary(None), + ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ScalarValue::Binary(None), ConcreteDataType::String(_) => ScalarValue::Utf8(None), ConcreteDataType::Date(_) => ScalarValue::Date32(None), ConcreteDataType::DateTime(_) => ScalarValue::Date64(None), @@ -1994,6 +1995,10 @@ mod tests { &ConcreteDataType::duration_nanosecond_datatype(), &Value::Duration(Duration::new_nanosecond(1)), ); + check_type_and_value( + &ConcreteDataType::decimal128_datatype(38, 10), + &Value::Decimal128(Decimal128::new(1, 38, 10)), + ); } #[test] @@ -2178,6 +2183,14 @@ mod tests { ValueRef::List(ListValueRef::Ref { val: &list }), Value::List(list.clone()).as_value_ref() ); + + let jsonb_value = jsonb::parse_value(r#"{"key": "value"}"#.as_bytes()) + .unwrap() + .to_vec(); + assert_eq!( + ValueRef::Binary(jsonb_value.clone().as_slice()), + Value::Binary(jsonb_value.into()).as_value_ref() + ); } #[test] @@ -2391,6 +2404,16 @@ mod tests { .try_to_scalar_value(&ConcreteDataType::binary_datatype()) .unwrap() ); + + let jsonb_value = jsonb::parse_value(r#"{"key": "value"}"#.as_bytes()) + .unwrap() + .to_vec(); + assert_eq!( + ScalarValue::Binary(Some(jsonb_value.clone())), + Value::Binary(jsonb_value.into()) + .try_to_scalar_value(&ConcreteDataType::json_datatype()) + .unwrap() + ); } #[test] @@ -2523,6 +2546,12 @@ mod tests { .try_to_scalar_value(&ConcreteDataType::duration_nanosecond_datatype()) .unwrap() ); + assert_eq!( + ScalarValue::Binary(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::json_datatype()) + .unwrap() + ); } #[test]
diff --git a/src/datatypes/src/vectors/eq.rs index fcf97515ee27..16b0adf6f683 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -80,7 +80,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { match lhs.data_type() { Null(_) => true, Boolean(_) => is_vector_eq!(BooleanVector, lhs, rhs), - Binary(_) => is_vector_eq!(BinaryVector, lhs, rhs), + Binary(_) | Json(_) => is_vector_eq!(BinaryVector, lhs, rhs), String(_) => is_vector_eq!(StringVector, lhs, rhs), Date(_) => is_vector_eq!(DateVector, lhs, rhs), DateTime(_) => is_vector_eq!(DateTimeVector, lhs, rhs),
diff --git a/src/mito2/src/row_converter.rs index ae0ab1177dae..a7f6f1644e64 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -68,7 +68,7 @@ impl SortField { ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, ConcreteDataType::Float32(_) => 5, ConcreteDataType::Float64(_) => 9, - ConcreteDataType::Binary(_) => 11, + ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => 11, ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
ConcreteDataType::Date(_) => 5, ConcreteDataType::DateTime(_) => 9, @@ -146,7 +146,8 @@ impl SortField { Time, time, Interval, interval, Duration, duration, - Decimal128, decimal128 + Decimal128, decimal128, + Json, binary ); Ok(()) @@ -169,7 +170,7 @@ impl SortField { Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?)) } )* - ConcreteDataType::Binary(_) => Ok(Value::from( + ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => Ok(Value::from( Option::<Vec<u8>>::deserialize(deserializer) .context(error::DeserializeFieldSnafu)? .map(Bytes::from), @@ -237,7 +238,7 @@ impl SortField { ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, ConcreteDataType::Float32(_) => 5, ConcreteDataType::Float64(_) => 9, - ConcreteDataType::Binary(_) => { + ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => { // Now the encoder encode binary as a list of bytes so we can't use // skip bytes. let pos_before = deserializer.position();
diff --git a/src/query/src/dist_plan/merge_scan.rs index c8a4ebcc77f3..a3fb8004cfab 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -156,20 +156,22 @@ impl MergeScanExec { query_ctx: QueryContextRef, target_partition: usize, ) -> Result<Self> { - let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema); + // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to + // keep it for #4619 to identify the JSON type in src/datatypes/src/schema/column_schema.rs. + // Reconsider whether it's possible to remove it. + let arrow_schema = Arc::new(arrow_schema.clone()); let properties = PlanProperties::new( - EquivalenceProperties::new(arrow_schema_without_metadata.clone()), + EquivalenceProperties::new(arrow_schema.clone()), Partitioning::UnknownPartitioning(target_partition), ExecutionMode::Bounded, ); - let schema_without_metadata = - Self::arrow_schema_to_schema(arrow_schema_without_metadata.clone())?; + let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?; Ok(Self { table, regions, plan, - schema: schema_without_metadata, - arrow_schema: arrow_schema_without_metadata, + schema, + arrow_schema, region_query_handler, metric: ExecutionPlanMetricsSet::new(), sub_stage_metrics: Arc::default(), @@ -288,20 +290,6 @@ impl MergeScanExec { })) } - fn arrow_schema_without_metadata(arrow_schema: &ArrowSchema) -> ArrowSchemaRef { - Arc::new(ArrowSchema::new( - arrow_schema - .fields() - .iter() - .map(|field| { - let field = field.as_ref().clone(); - let field_without_metadata = field.with_metadata(Default::default()); - Arc::new(field_without_metadata) - }) - .collect::<Vec<_>>(), - )) - } - fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> { let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?; Ok(Arc::new(schema))
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 5abd1466bb76..626fdaa404c2 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -58,6 +58,7 @@ humantime-serde.workspace = true hyper = { version = "0.14", features = ["full"] } influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" } itertools.workspace = true +jsonb.workspace = true lazy_static.workspace = true mime_guess = "2.0" notify.workspace = true @@ -70,7 +71,7 @@ parking_lot = "0.12" pgwire = "0.20" pin-project = "1.0" pipeline.workspace = true
["with-chrono-0_4"] } +postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] } pprof = { version = "0.13", features = [ "flamegraph", "prost-codec", diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index bf4d967aa5a6..d957edaa559c 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -168,6 +168,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { &mut row_writer, &record_batch, query_context.clone(), + &column_def, ) .await? } @@ -191,9 +192,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { row_writer: &mut RowWriter<'_, W>, recordbatch: &RecordBatch, query_context: QueryContextRef, + column_def: &[Column], ) -> Result<()> { for row in recordbatch.rows() { - for value in row.into_iter() { + for (value, column) in row.into_iter().zip(column_def.iter()) { match value { Value::Null => row_writer.write_col(None::)?, Value::Boolean(v) => row_writer.write_col(v as i8)?, @@ -208,7 +210,14 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Float32(v) => row_writer.write_col(v.0)?, Value::Float64(v) => row_writer.write_col(v.0)?, Value::String(v) => row_writer.write_col(v.as_utf8())?, - Value::Binary(v) => row_writer.write_col(v.deref())?, + Value::Binary(v) => match column.coltype { + ColumnType::MYSQL_TYPE_JSON => { + row_writer.write_col(jsonb::to_string(&v))?; + } + _ => { + row_writer.write_col(v.deref())?; + } + }, Value::Date(v) => row_writer.write_col(v.to_chrono_date())?, // convert datetime and timestamp to timezone of current connection Value::DateTime(v) => row_writer.write_col( @@ -281,6 +290,7 @@ pub(crate) fn create_mysql_column( ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR), ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME), ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL), + ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON), _ => error::UnsupportedDataTypeSnafu { data_type, reason: "not implemented", diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index 5d0c041cf2d7..190684ed34fc 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -150,8 +150,8 @@ where .map(move |row| { row.and_then(|row| { let mut encoder = DataRowEncoder::new(pg_schema_ref.clone()); - for value in row.iter() { - encode_value(&query_ctx, value, &mut encoder)?; + for (value, column) in row.iter().zip(schema.column_schemas()) { + encode_value(&query_ctx, value, &mut encoder, &column.data_type)?; } encoder.finish() }) diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index 6d47f65183a5..2bec6c2999f5 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -62,6 +62,7 @@ pub(super) fn encode_value( query_ctx: &QueryContextRef, value: &Value, builder: &mut DataRowEncoder, + datatype: &ConcreteDataType, ) -> PgWireResult<()> { match value { Value::Null => builder.encode_field(&None::<&i8>), @@ -77,13 +78,18 @@ pub(super) fn encode_value( Value::Float32(v) => builder.encode_field(&v.0), Value::Float64(v) => builder.encode_field(&v.0), Value::String(v) => builder.encode_field(&v.as_utf8()), - Value::Binary(v) => { - let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output(); - match *bytea_output { - PGByteaOutputValue::ESCAPE => builder.encode_field(&EscapeOutputBytea(v.deref())), - PGByteaOutputValue::HEX => 
diff --git a/src/servers/src/postgres/types.rs index 6d47f65183a5..2bec6c2999f5 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -62,6 +62,7 @@ pub(super) fn encode_value( query_ctx: &QueryContextRef, value: &Value, builder: &mut DataRowEncoder, + datatype: &ConcreteDataType, ) -> PgWireResult<()> { match value { Value::Null => builder.encode_field(&None::<&i8>), @@ -77,13 +78,18 @@ pub(super) fn encode_value( Value::Float32(v) => builder.encode_field(&v.0), Value::Float64(v) => builder.encode_field(&v.0), Value::String(v) => builder.encode_field(&v.as_utf8()), - Value::Binary(v) => { - let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output(); - match *bytea_output { - PGByteaOutputValue::ESCAPE => builder.encode_field(&EscapeOutputBytea(v.deref())), - PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())), + Value::Binary(v) => match datatype { + ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)), + _ => { + let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output(); + match *bytea_output { + PGByteaOutputValue::ESCAPE => { + builder.encode_field(&EscapeOutputBytea(v.deref())) + } + PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())), + } } - } + }, Value::Date(v) => { if let Some(date) = v.to_chrono_date() { let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style(); @@ -154,6 +160,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> { &ConcreteDataType::Time(_) => Ok(Type::TIME), &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL), &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC), + &ConcreteDataType::Json(_) => Ok(Type::JSON), &ConcreteDataType::Duration(_) | &ConcreteDataType::List(_) | &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu { @@ -549,6 +556,23 @@ pub(super) fn parameters_to_scalar_values( } } } + &Type::JSONB => { + let data = portal.parameter::<serde_json::Value>(idx, &client_type)?; + match server_type { + ConcreteDataType::Binary(_) => { + ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec())) + } + _ => { + return Err(invalid_parameter_error( + "invalid_parameter_type", + Some(&format!( + "Expected: {}, found: {}", + server_type, client_type + )), + )); + } + } + } _ => Err(invalid_parameter_error( "unsupported_parameter_value", Some(&format!("Found type: {}", client_type)), )), @@ -581,6 +605,8 @@ pub(super) fn param_types_to_pg_types( mod test { use std::sync::Arc; + use common_time::interval::IntervalUnit; + use common_time::timestamp::TimeUnit; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::value::ListValue; use pgwire::api::results::{FieldFormat, FieldInfo}; @@ -778,6 +804,35 @@ mod test { ), ]; + let datatypes = vec![ + ConcreteDataType::null_datatype(), + ConcreteDataType::boolean_datatype(), + ConcreteDataType::uint8_datatype(), + ConcreteDataType::uint16_datatype(), + ConcreteDataType::uint32_datatype(), + ConcreteDataType::uint64_datatype(), + ConcreteDataType::int8_datatype(), + ConcreteDataType::int8_datatype(), + ConcreteDataType::int16_datatype(), + ConcreteDataType::int16_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::float32_datatype(), + ConcreteDataType::float32_datatype(), + ConcreteDataType::float32_datatype(), + ConcreteDataType::float64_datatype(), + ConcreteDataType::float64_datatype(), + ConcreteDataType::float64_datatype(), + ConcreteDataType::string_datatype(), + ConcreteDataType::binary_datatype(), + ConcreteDataType::date_datatype(), + ConcreteDataType::time_datatype(TimeUnit::Second), + ConcreteDataType::datetime_datatype(), + ConcreteDataType::timestamp_datatype(TimeUnit::Second), + ConcreteDataType::interval_datatype(IntervalUnit::YearMonth), + ]; let values = vec![ Value::Null, Value::Boolean(true), @@ -812,14 +867,15 @@ mod test { .build() .into(); let mut builder = DataRowEncoder::new(Arc::new(schema)); - for i in values.iter() { - encode_value(&query_context, i, &mut builder).unwrap(); + for (value, datatype) in values.iter().zip(datatypes) { + encode_value(&query_context, value, &mut builder, &datatype).unwrap(); } let err = encode_value( &query_context, &Value::List(ListValue::new(vec![], ConcreteDataType::int16_datatype())), &mut builder, + &ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype()), ) .unwrap_err(); match err {
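The new `&Type::JSONB` arm re-encodes a decoded `serde_json::Value` as JSONB bytes so the parameter matches the column's storage format. The conversion in isolation, relying on the same `jsonb::Value::from` impl the arm above uses:

```rust
fn json_param_to_scalar() -> datafusion_common::ScalarValue {
    // The client sends JSON; postgres-types (with-serde_json-1) decodes it to serde_json::Value.
    let client_side = serde_json::json!({"success": true});
    // Re-encode as JSONB so the bound parameter matches the column's binary storage.
    let jsonb_bytes = jsonb::Value::from(client_side).to_vec();
    datafusion_common::ScalarValue::Binary(Some(jsonb_bytes))
}
```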
diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index a9ed77e8ea89..e459c1d01559 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -26,6 +26,7 @@ datatypes.workspace = true hex = "0.4" iso8601 = "0.6.1" itertools.workspace = true +jsonb.workspace = true lazy_static.workspace = true regex.workspace = true serde_json.workspace = true
diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index a042473503b8..30af7ae5171f 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -124,6 +124,16 @@ fn parse_string_to_value( } } ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())), + ConcreteDataType::Json(_) => { + if let Ok(json) = jsonb::parse_value(s.as_bytes()) { + Ok(Value::Binary(json.to_vec().into())) + } else { + ParseSqlValueSnafu { + msg: format!("Failed to parse {s} to Json value"), + } + .fail() + } + } _ => { unreachable!() } @@ -250,7 +260,19 @@ pub fn sql_value_to_value( SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => { parse_string_to_value(column_name, s.clone(), data_type, timezone)? } - SqlValue::HexStringLiteral(s) => parse_hex_string(s)?, + SqlValue::HexStringLiteral(s) => { + // Binary values must not be written directly into a Json column + ensure!( + !matches!(data_type, ConcreteDataType::Json(_)), + ColumnTypeMismatchSnafu { + column_name, + expect: ConcreteDataType::binary_datatype(), + actual: ConcreteDataType::json_datatype(), + } + ); + + parse_hex_string(s)? + } SqlValue::Placeholder(s) => return InvalidSqlValueSnafu { value: s }.fail(), // TODO(dennis): supports binary string @@ -571,6 +593,7 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<ConcreteDataType> { + SqlDataType::JSON => Ok(ConcreteDataType::json_datatype()), _ => error::SqlTypeNotSupportedSnafu { t: data_type.clone(), } @@ -607,6 +630,7 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Result<SqlDataType> { ConcreteDataType::Decimal128(d) => Ok(SqlDataType::Decimal( ExactNumberInfo::PrecisionAndScale(d.precision() as u64, d.scale() as u64), )), + ConcreteDataType::Json(_) => Ok(SqlDataType::JSON), ConcreteDataType::Duration(_) | ConcreteDataType::Null(_) | ConcreteDataType::List(_) @@ -872,6 +896,35 @@ mod tests { ); assert!(v.is_err()); assert!(format!("{v:?}").contains("invalid character"), "v is {v:?}",); + + let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string()); + let v = sql_value_to_value( + "a", + &ConcreteDataType::json_datatype(), + &sql_val, + None, + None, + ); + assert!(v.is_err()); + + let sql_val = SqlValue::DoubleQuotedString(r#"{"a":"b"}"#.to_string()); + let v = sql_value_to_value( + "a", + &ConcreteDataType::json_datatype(), + &sql_val, + None, + None, + ) + .unwrap(); + assert_eq!( + Value::Binary(Bytes::from( + jsonb::parse_value(r#"{"a":"b"}"#.as_bytes()) + .unwrap() + .to_vec() + .as_slice() + )), + v + ); } #[test] @@ -1037,6 +1090,36 @@ mod tests { } } + #[test] + fn test_parse_json_to_jsonb() { + match parse_string_to_value( + "json_col", + r#"{"a": "b"}"#.to_string(), + &ConcreteDataType::json_datatype(), + None, + ) { + Ok(Value::Binary(b)) => { + assert_eq!( + b, + jsonb::parse_value(r#"{"a": "b"}"#.as_bytes()) + .unwrap() + .to_vec() + ); + } + _ => { + unreachable!() + } + } + + assert!(parse_string_to_value( + "json_col", + r#"Nicola Kovac is the best rifler in the world"#.to_string(), + &ConcreteDataType::json_datatype(), + None, + ) + .is_err()) + } + #[test] pub fn test_parse_column_default_constraint() { let bool_value = sqlparser::ast::Value::Boolean(true);
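The hex-literal guard above exists because nothing downstream re-validates raw bytes as JSONB, so a value that bypassed `jsonb::parse_value` would only fail later at read time. The failure mode it prevents, mirroring the invalid-input unit test earlier in this PR:

```rust
fn invalid_jsonb_fails_to_decode() {
    // Arbitrary bytes are not valid JSONB, so decoding them errors out.
    assert!(jsonb::from_slice(b"invalid json").is_err());
}
```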
diff --git a/tests-integration/tests/sql.rs index a41968496b05..1e87c54e5f31 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -383,26 +383,42 @@ pub async fn test_postgres_crud(store_type: StorageType) { .await .unwrap(); - sqlx::query("create table demo(i bigint, ts timestamp time index, d date, dt datetime)") - .execute(&pool) - .await - .unwrap(); + sqlx::query( + "create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob, j json)", + ) + .execute(&pool) + .await + .unwrap(); for i in 0..10 { let d = NaiveDate::from_yo_opt(2015, 100).unwrap(); let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis(); + let bytes = "hello".as_bytes(); + let json = serde_json::json!({ + "code": 200, + "success": true, + "payload": { + "features": [ + "serde", + "json" + ], + "homepage": null + } + }); - sqlx::query("insert into demo values($1, $2, $3, $4)") + sqlx::query("insert into demo values($1, $2, $3, $4, $5, $6)") .bind(i) .bind(i) .bind(d) .bind(dt) + .bind(bytes) + .bind(json) .execute(&pool) .await .unwrap(); } - let rows = sqlx::query("select i,d,dt from demo") + let rows = sqlx::query("select i,d,dt,b,j from demo") .fetch_all(&pool) .await .unwrap(); @@ -412,6 +428,8 @@ pub async fn test_postgres_crud(store_type: StorageType) { let ret: i64 = row.get("i"); let d: NaiveDate = row.get("d"); let dt: NaiveDateTime = row.get("dt"); + let bytes: Vec<u8> = row.get("b"); + let json: serde_json::Value = row.get("j"); assert_eq!(ret, i as i64); @@ -422,6 +440,20 @@ pub async fn test_postgres_crud(store_type: StorageType) { .and_then(|d| d.and_hms_opt(0, 0, 0)) .unwrap(); assert_eq!(expected_dt, dt); + assert_eq!("hello".as_bytes(), bytes); + + let expected_j = serde_json::json!({ + "code": 200, + "success": true, + "payload": { + "features": [ + "serde", + "json" + ], + "homepage": null + } + }); + assert_eq!(json.to_string(), expected_j.to_string()); } let rows = sqlx::query("select i from demo where i=$1")
diff --git a/tests/cases/standalone/common/types/json/json.result b/tests/cases/standalone/common/types/json/json.result new file mode 100644 index 000000000000..710d0b230ac5 --- /dev/null +++ b/tests/cases/standalone/common/types/json/json.result @@ -0,0 +1,156 @@ +CREATE TABLE jsons (j JSON, t timestamp time index); + +Affected Rows: 0 + +--Insert valid json strings-- +INSERT INTO jsons VALUES('[null]', 0), +('[true]', 1), +('[false]', 2), +('[0]', 3), +('["foo"]', 4), +('[]', 5), +('{}', 6), +('[0,1]', 7), +('{"foo":"bar"}', 8), +('{"a":null,"foo":"bar"}', 9), +('[-1]', 10), +('{"entities": { + "description": { + "urls": [ + { + "url": "http://t.co/QMLJeFmfMT", + "expanded_url": "http://www.pixiv.net/member.php?id=4776", + "display_url": "pixiv.net/member.php?id=…", + "indices": [ + 58, + 80 + ] + }, + { + "url": "http://t.co/LU8T7vmU3h", + "expanded_url": "http://ask.fm/KATANA77", + "display_url": "ask.fm/KATANA77", + "indices": [ + 95, + 117 + ] + } + ] + } +}}', 11); + +Affected Rows: 12 + +INSERT INTO jsons VALUES(to_json('[null]'), 12), +(to_json('[true]'), 13), +(to_json('[false]'), 14), +(to_json('[0]'), 15), +(to_json('["foo"]'), 16), +(to_json('[]'), 17), +(to_json('{}'), 18), +(to_json('[0,1]'), 19), +(to_json('{"foo":"bar"}'), 20), +(to_json('{"a":null,"foo":"bar"}'), 21), +(to_json('[-1]'), 22), +(to_json('[-2147483648]'), 23), +(to_json('{"entities": {
"description": { + "urls": [ + { + "url": "http://t.co/QMLJeFmfMT", + "expanded_url": "http://www.pixiv.net/member.php?id=4776", + "display_url": "pixiv.net/member.php?id=…", + "indices": [ + 58, + 80 + ] + }, + { + "url": "http://t.co/LU8T7vmU3h", + "expanded_url": "http://ask.fm/KATANA77", + "display_url": "ask.fm/KATANA77", + "indices": [ + 95, + 117 + ] + } + ] + } + }}'), 24); + +Affected Rows: 13 + +SELECT json_to_string(j), t FROM jsons; + ++---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+ +| json_to_string(jsons.j) | t | ++---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+ +| [null] | 1970-01-01T00:00:00 | +| [true] | 1970-01-01T00:00:00.001 | +| [false] | 1970-01-01T00:00:00.002 | +| [0] | 1970-01-01T00:00:00.003 | +| ["foo"] | 1970-01-01T00:00:00.004 | +| [] | 1970-01-01T00:00:00.005 | +| {} | 1970-01-01T00:00:00.006 | +| [0,1] | 1970-01-01T00:00:00.007 | +| {"foo":"bar"} | 1970-01-01T00:00:00.008 | +| {"a":null,"foo":"bar"} | 1970-01-01T00:00:00.009 | +| [-1] | 1970-01-01T00:00:00.010 | +| {"entities":{"description":{"urls":[{"display_url":"pixiv.net/member.php?id=…","expanded_url":"http://www.pixiv.net/member.php?id=4776","indices":[58,80],"url":"http://t.co/QMLJeFmfMT"},{"display_url":"ask.fm/KATANA77","expanded_url":"http://ask.fm/KATANA77","indices":[95,117],"url":"http://t.co/LU8T7vmU3h"}]}}} | 1970-01-01T00:00:00.011 | +| [null] | 1970-01-01T00:00:00.012 | +| [true] | 1970-01-01T00:00:00.013 | +| [false] | 1970-01-01T00:00:00.014 | +| [0] | 1970-01-01T00:00:00.015 | +| ["foo"] | 1970-01-01T00:00:00.016 | +| [] | 1970-01-01T00:00:00.017 | +| {} | 1970-01-01T00:00:00.018 | +| [0,1] | 1970-01-01T00:00:00.019 | +| {"foo":"bar"} | 1970-01-01T00:00:00.020 | +| {"a":null,"foo":"bar"} | 1970-01-01T00:00:00.021 | +| [-1] | 1970-01-01T00:00:00.022 | +| [-2147483648] | 1970-01-01T00:00:00.023 | +| {"entities":{"description":{"urls":[{"display_url":"pixiv.net/member.php?id=…","expanded_url":"http://www.pixiv.net/member.php?id=4776","indices":[58,80],"url":"http://t.co/QMLJeFmfMT"},{"display_url":"ask.fm/KATANA77","expanded_url":"http://ask.fm/KATANA77","indices":[95,117],"url":"http://t.co/LU8T7vmU3h"}]}}} | 1970-01-01T00:00:00.024 | ++---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+ + +--Insert invalid json strings-- +DELETE FROM jsons; + +Affected Rows: 25 + +INSERT INTO jsons VALUES(to_json('{"a":1, "b":2, "c":3'), 4); + +Error: 3001(EngineExecuteQuery), DataFusion error: Invalid function args: Cannot convert the string to json, have: {"a":1, "b":2, "c":3 + +INSERT INTO jsons VALUES(to_json('Morning my friends, have a nice day :)'), 5); + +Error: 3001(EngineExecuteQuery), DataFusion error: Invalid function args: Cannot 
convert the string to json, have: Morning my friends, have a nice day :) + +SELECT json_to_string(j), t FROM jsons; + +++ +++ + +CREATE TABLE json_empty (j JSON, t timestamp time index); + +Affected Rows: 0 + +INSERT INTO json_empty VALUES(NULL, 2); + +Affected Rows: 1 + +SELECT json_to_string(j), t FROM json_empty; + ++------------------------------+-------------------------+ +| json_to_string(json_empty.j) | t | ++------------------------------+-------------------------+ +| | 1970-01-01T00:00:00.002 | ++------------------------------+-------------------------+ + +drop table jsons; + +Affected Rows: 0 + +drop table json_empty; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/types/json/json.sql b/tests/cases/standalone/common/types/json/json.sql new file mode 100644 index 000000000000..57fce9a8eabe --- /dev/null +++ b/tests/cases/standalone/common/types/json/json.sql @@ -0,0 +1,96 @@ +CREATE TABLE jsons (j JSON, t timestamp time index); + +--Insert valid json strings-- +INSERT INTO jsons VALUES('[null]', 0), +('[true]', 1), +('[false]', 2), +('[0]', 3), +('["foo"]', 4), +('[]', 5), +('{}', 6), +('[0,1]', 7), +('{"foo":"bar"}', 8), +('{"a":null,"foo":"bar"}', 9), +('[-1]', 10), +('{"entities": { + "description": { + "urls": [ + { + "url": "http://t.co/QMLJeFmfMT", + "expanded_url": "http://www.pixiv.net/member.php?id=4776", + "display_url": "pixiv.net/member.php?id=…", + "indices": [ + 58, + 80 + ] + }, + { + "url": "http://t.co/LU8T7vmU3h", + "expanded_url": "http://ask.fm/KATANA77", + "display_url": "ask.fm/KATANA77", + "indices": [ + 95, + 117 + ] + } + ] + } +}}', 11); + +INSERT INTO jsons VALUES(to_json('[null]'), 12), +(to_json('[true]'), 13), +(to_json('[false]'), 14), +(to_json('[0]'), 15), +(to_json('["foo"]'), 16), +(to_json('[]'), 17), +(to_json('{}'), 18), +(to_json('[0,1]'), 19), +(to_json('{"foo":"bar"}'), 20), +(to_json('{"a":null,"foo":"bar"}'), 21), +(to_json('[-1]'), 22), +(to_json('[-2147483648]'), 23), +(to_json('{"entities": { + "description": { + "urls": [ + { + "url": "http://t.co/QMLJeFmfMT", + "expanded_url": "http://www.pixiv.net/member.php?id=4776", + "display_url": "pixiv.net/member.php?id=…", + "indices": [ + 58, + 80 + ] + }, + { + "url": "http://t.co/LU8T7vmU3h", + "expanded_url": "http://ask.fm/KATANA77", + "display_url": "ask.fm/KATANA77", + "indices": [ + 95, + 117 + ] + } + ] + } + }}'), 24); + +SELECT json_to_string(j), t FROM jsons; + +--Insert invalid json strings-- +DELETE FROM jsons; + +INSERT INTO jsons VALUES(to_json('{"a":1, "b":2, "c":3'), 4); + +INSERT INTO jsons VALUES(to_json('Morning my friends, have a nice day :)'), 5); + +SELECT json_to_string(j), t FROM jsons; + +CREATE TABLE json_empty (j JSON, t timestamp time index); + +INSERT INTO json_empty VALUES(NULL, 2); + +SELECT json_to_string(j), t FROM json_empty; + +drop table jsons; + +drop table json_empty;