fix: change data type to float64 #1148

Draft · wants to merge 32 commits into base: main

Commits (32), all by nikhilsinhaparseable:
1940b9f  change for static schema (Jan 30, 2025)
3a5474a  remove println (Jan 30, 2025)
d7ffaaf  changed compression to SNAPPY (Feb 3, 2025)
ca52993  converted to Int64 (Feb 3, 2025)
d7c85a9  reverted compression to lz4raw (Feb 3, 2025)
c10d4fb  url encode (Feb 3, 2025)
4087877  change statistics to page (Feb 3, 2025)
799959b  page size to 20mb, add data type for Date (Feb 4, 2025)
dd0ef2b  reverted Utf8View change (Feb 4, 2025)
568ff0a  removed some options from query config (Feb 4, 2025)
0207338  Utf8View changes (Feb 4, 2025)
e471746  register parquets in tables (Feb 6, 2025)
737236f  added configs to improve perf (Feb 6, 2025)
87c1c51  config updated (Feb 6, 2025)
8c30e30  add runtime config and logical plan (Feb 12, 2025)
ba5f4ad  remove print (Feb 12, 2025)
ba7112f  create external table (Feb 13, 2025)
b8ee321  execute plan (Feb 13, 2025)
3565bac  simplify (Feb 14, 2025)
84ae3e8  added coalesceBatchesExec, FilterExec, RepartitionExec (Feb 14, 2025)
e7593a9  CoalesceBatchesExec and RepartitionExec (Feb 16, 2025)
8a6e886  optimised (Feb 17, 2025)
b8d257e  optimised (Feb 19, 2025)
b59b470  datafusion-cli style (Feb 26, 2025)
f70a0d1  deleted unused (Feb 26, 2025)
b06149b  working (Feb 26, 2025)
74676b9  register parquet (Feb 26, 2025)
f77d139  object store registered with session context (Feb 27, 2025)
1f5897a  removed unused (Feb 27, 2025)
92e42ba  removed unused (Feb 27, 2025)
4d2b7ec  multi thread in parseable query (Mar 2, 2025)
d537307  drop cache (Mar 2, 2025)
2,771 changes: 1,802 additions & 969 deletions Cargo.lock

Large diffs are not rendered by default.

45 changes: 35 additions & 10 deletions Cargo.toml
@@ -9,15 +9,31 @@ build = "build.rs"

[dependencies]
# Arrow and DataFusion ecosystem
arrow-array = { version = "53.0.0" }
arrow-flight = { version = "53.0.0", features = ["tls"] }
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
arrow-json = "53.0.0"
arrow-schema = { version = "53.0.0", features = ["serde"] }
arrow-select = "53.0.0"
datafusion = "44.0.0"
object_store = { version = "0.11.2", features = ["cloud", "aws", "azure"] }
parquet = "53.0.0"
arrow = { version = "54.1.0", features = [
"prettyprint",
"chrono-tz",
] }
arrow-array = { version = "54.1.0" }
arrow-json = "54.1.0"
arrow-select = "54.1.0"
arrow-buffer = { version = "54.1.0", default-features = false }
arrow-flight = { version = "54.1.0", features = [
"flight-sql-experimental",
"tls"
] }
arrow-ipc = { version = "54.1.0", default-features = false, features = [
"zstd",
] }
arrow-ord = { version = "54.1.0", default-features = false }
arrow-schema = { version = "54.1.0", features = ["serde"] }
datafusion = {git = "https://github.com/apache/datafusion", branch = "main" }
object_store = { version = "0.11.2", features = ["cloud", "aws", "azure", "gcp", "http"] }
parking_lot = "0.12"
parquet = { version = "54.1.0", default-features = false, features = [
"arrow",
"async",
"object_store",
] }

# Web server and HTTP-related
actix-cors = "0.7.0"
@@ -56,6 +72,7 @@ tokio = { version = "1.28", default-features = false, features = [
"sync",
"macros",
"fs",
"rt-multi-thread"
] }
tokio-stream = { version = "0.1", features = ["fs"] }

@@ -116,6 +133,14 @@ static-files = "0.2"
thiserror = "2.0.0"
ulid = { version = "1.0", features = ["serde"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
+structopt-derive = "0.4.18"
+dirs = "6.0.0"
+aws-config = "1.5.16"
+aws-credential-types = "1.2.1"
+sqlparser = "0.54.0"
+nix = {version = "0.29.0", features = ["fs", "mman"]}
+libc = "0.2.169"
+rustyline = "15.0.0"

[build-dependencies]
cargo_toml = "0.20.1"
@@ -128,7 +153,7 @@ zip = { version = "2.2.0", default-features = false, features = ["deflate"] }

[dev-dependencies]
rstest = "0.23.0"
arrow = "53.0.0"


[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.18/build.zip"
16 changes: 8 additions & 8 deletions src/catalog/column.rs
@@ -41,7 +41,7 @@ pub struct Int64Type {
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct Utf8Type {
+pub struct Utf8ViewType {
pub min: String,
pub max: String,
}
@@ -54,7 +54,7 @@ pub enum TypedStatistics {
Bool(BoolType),
Int(Int64Type),
Float(Float64Type),
-String(Utf8Type),
+String(Utf8ViewType),
}

impl TypedStatistics {
@@ -79,7 +79,7 @@ impl TypedStatistics {
})
}
(TypedStatistics::String(this), TypedStatistics::String(other)) => {
-TypedStatistics::String(Utf8Type {
+TypedStatistics::String(Utf8ViewType {
min: min(this.min, other.min),
max: max(this.max, other.max),
})
@@ -110,9 +110,9 @@ impl TypedStatistics {
ScalarValue::Float64(Some(stats.min)),
ScalarValue::Float64(Some(stats.max)),
),
-(TypedStatistics::String(stats), DataType::Utf8) => (
-ScalarValue::Utf8(Some(stats.min)),
-ScalarValue::Utf8(Some(stats.max)),
+(TypedStatistics::String(stats), DataType::Utf8View) => (
+ScalarValue::Utf8View(Some(stats.min)),
+ScalarValue::Utf8View(Some(stats.max)),
),
_ => {
return None;
@@ -167,7 +167,7 @@ impl TryFrom<&Statistics> for TypedStatistics {
min: *stats.min_opt().expect("Float64 stats min not set"),
max: *stats.max_opt().expect("Float64 stats max not set"),
}),
-Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
+Statistics::ByteArray(stats) => TypedStatistics::String(Utf8ViewType {
min: stats
.min_opt()
.expect("Utf8 stats min not set")
@@ -179,7 +179,7 @@ impl TryFrom<&Statistics> for TypedStatistics {
.as_utf8()?
.to_owned(),
}),
-Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
+Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8ViewType {
min: stats
.min_opt()
.expect("Utf8 stats min not set")
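The rename in this file is mechanical: `Utf8Type` becomes `Utf8ViewType`, and string statistics now map to `ScalarValue::Utf8View` instead of `ScalarValue::Utf8`. The merge logic itself is unchanged. A minimal standalone sketch of what the `TypedStatistics::String` merge does (names mirrored from the diff, not the crate's actual API surface):

```rust
use std::cmp::{max, min};

// Standalone mirror of the Utf8ViewType min/max merge above.
#[derive(Debug, Clone, PartialEq)]
struct Utf8ViewType {
    min: String,
    max: String,
}

// Two column ranges combine by lexicographic min/max, so the merged
// range still bounds every value seen in either source.
fn merge(this: Utf8ViewType, other: Utf8ViewType) -> Utf8ViewType {
    Utf8ViewType {
        min: min(this.min, other.min),
        max: max(this.max, other.max),
    }
}

fn main() {
    let a = Utf8ViewType { min: "apple".into(), max: "mango".into() };
    let b = Utf8ViewType { min: "banana".into(), max: "zebra".into() };
    assert_eq!(merge(a, b), Utf8ViewType { min: "apple".into(), max: "zebra".into() });
}
```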
76 changes: 68 additions & 8 deletions src/event/format/json.rs
@@ -20,7 +20,7 @@
#![allow(deprecated)]

use anyhow::anyhow;
-use arrow_array::RecordBatch;
+use arrow_array::{RecordBatch, StringArray, StringViewArray};
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
use arrow_schema::{DataType, Field, Fields, Schema};
use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
@@ -107,15 +107,61 @@ impl EventFormat for Event {

// Convert the Data type (defined above) to arrow record batch
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, anyhow::Error> {
+// First create a schema with Utf8 instead of Utf8View
+let temp_schema = Schema::new(
+schema
+.fields()
+.iter()
+.map(|field| {
+if matches!(field.data_type(), DataType::Utf8View) {
+Arc::new(Field::new(
+field.name(),
+DataType::Utf8,
+field.is_nullable(),
+))
+} else {
+field.clone()
+}
+})
+.collect::<Vec<_>>(),
+);

let array_capacity = round_upto_multiple_of_64(data.len());
-let mut reader = ReaderBuilder::new(schema)
+let mut reader = ReaderBuilder::new(Arc::new(temp_schema))
.with_batch_size(array_capacity)
.with_coerce_primitive(false)
.with_strict_mode(false)
.build_decoder()?;

reader.serialize(&data)?;

match reader.flush() {
-Ok(Some(recordbatch)) => Ok(recordbatch),
+Ok(Some(temp_batch)) => {
+// Convert Utf8 arrays to Utf8View arrays where needed
+let new_columns: Vec<Arc<dyn arrow_array::Array>> = temp_batch
+.columns()
+.iter()
+.zip(schema.fields())
+.map(|(col, field)| {
+if matches!(field.data_type(), DataType::Utf8View) {
+let string_array = col
+.as_any()
+.downcast_ref::<StringArray>()
+.expect("Expected StringArray");
+Arc::new(StringViewArray::from(
+string_array
+.iter()
+.map(|s| s.map(|s| s.to_string()))
+.collect::<Vec<_>>(),
+))
+} else {
+col.clone()
+}
+})
+.collect();
+
+Ok(RecordBatch::try_new(schema, new_columns)?)
+}
Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)),
Ok(None) => unreachable!("all records are added to one rb"),
}
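The hunk above works around the fact that, as this draft assumes for arrow-json 54.x, the JSON decoder does not produce `Utf8View` output directly: `decode` swaps `Utf8View` fields to `Utf8` for decoding, then rebuilds the string columns as `StringViewArray`. A condensed sketch of that round-trip, relying on the same `From<Vec<Option<String>>>` impl the diff uses (unwraps kept for brevity):

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, RecordBatch, StringArray, StringViewArray};
use arrow_schema::{DataType, Schema};

// Rebuild a batch decoded with Utf8 columns against a target schema that
// declares those columns as Utf8View.
fn utf8_to_utf8view(batch: &RecordBatch, target: Arc<Schema>) -> RecordBatch {
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .zip(target.fields())
        .map(|(col, field)| {
            if matches!(field.data_type(), DataType::Utf8View) {
                let strings = col
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .expect("decoder produced a non-string column");
                // StringViewArray keeps short strings inline in the view
                // struct, which is why the PR prefers it for log payloads.
                Arc::new(StringViewArray::from(
                    strings
                        .iter()
                        .map(|s| s.map(str::to_string))
                        .collect::<Vec<_>>(),
                )) as ArrayRef
            } else {
                col.clone()
            }
        })
        .collect();
    RecordBatch::try_new(target, columns).expect("schema/column mismatch")
}
```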
@@ -174,13 +220,12 @@ fn fields_mismatch(schema: &[Arc<Field>], body: &Value, schema_version: SchemaVersion)
fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool {
match data_type {
DataType::Boolean => value.is_boolean(),
-DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
+DataType::Int8 | DataType::Int16 | DataType::Int32 => value.is_i64(),
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
-DataType::Float16 | DataType::Float32 => value.is_f64(),
+DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
// All numbers can be cast as Float64 from schema version v1
-DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(),
-DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(),
-DataType::Utf8 => value.is_string(),
+DataType::Int64 => value.is_i64() || is_parsable_as_number(value),
+DataType::Utf8View => value.is_string(),
DataType::List(field) => {
let data_type = field.data_type();
if let Value::Array(arr) = value {
@@ -218,10 +263,25 @@
false
}
}
+DataType::Date32 => value.is_string() && is_parsable_as_date(value),
+DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
_ => {
error!("Unsupported datatype {:?}, value {:?}", data_type, value);
unreachable!()
}
}
}

+pub fn is_parsable_as_number(value: &Value) -> bool {
+let Value::String(s) = value else {
+return false;
+};
+s.parse::<i64>().is_ok()
+}
+
+pub fn is_parsable_as_date(value: &Value) -> bool {
+let Value::String(s) = value else {
+return false;
+};
+chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok()
+}
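A quick check of what the two helpers above accept, assuming they are in scope along with `serde_json` (a hypothetical test, not part of the PR):

```rust
#[cfg(test)]
mod tests {
    use super::{is_parsable_as_date, is_parsable_as_number};
    use serde_json::json;

    #[test]
    fn string_encoded_values() {
        // "42" can be ingested into an Int64 column...
        assert!(is_parsable_as_number(&json!("42")));
        // ...but "4.2" cannot: the helper only tries i64, not f64.
        assert!(!is_parsable_as_number(&json!("4.2")));
        // Bare numbers are not Value::String, so the helper rejects them.
        assert!(!is_parsable_as_number(&json!(42)));

        // Dates must match %Y-%m-%d exactly.
        assert!(is_parsable_as_date(&json!("2025-02-04")));
        assert!(!is_parsable_as_date(&json!("04-02-2025")));
    }
}
```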
47 changes: 39 additions & 8 deletions src/event/format/mod.rs
@@ -26,7 +26,7 @@ use std::{
use anyhow::{anyhow, Error as AnyError};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::DateTime;
+use chrono::{DateTime, NaiveDate};
use serde::{Deserialize, Serialize};
use serde_json::Value;

@@ -242,8 +242,12 @@ pub fn update_field_type_in_schema(

if let Some(log_records) = log_records {
for log_record in log_records {
-updated_schema =
-override_data_type(updated_schema.clone(), log_record.clone(), schema_version);
+updated_schema = override_data_type(
+updated_schema.clone(),
+log_record.clone(),
+schema_version,
+existing_schema,
+);
}
}

@@ -258,7 +262,7 @@
// time_partition field not present in existing schema with string type data as timestamp
if field.name() == time_partition
&& !existing_field_names.contains(field.name())
&& field.data_type() == &DataType::Utf8
&& field.data_type() == &DataType::Utf8View
{
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
Field::new(field.name(), new_data_type, true)
@@ -276,6 +280,7 @@ pub fn override_data_type(
inferred_schema: Arc<Schema>,
log_record: Value,
schema_version: SchemaVersion,
existing_schema: Option<&HashMap<String, Arc<Field>>>,
) -> Arc<Schema> {
let Value::Object(map) = log_record else {
return inferred_schema;
@@ -293,7 +298,7 @@
if TIME_FIELD_NAME_PARTS
.iter()
.any(|part| field_name.to_lowercase().contains(part))
&& field.data_type() == &DataType::Utf8
&& field.data_type() == &DataType::Utf8View
&& (DateTime::parse_from_rfc3339(s).is_ok()
|| DateTime::parse_from_rfc2822(s).is_ok()) =>
{
Expand All @@ -304,16 +309,42 @@ pub fn override_data_type(
true,
)
}
+(SchemaVersion::V1, Some(Value::String(s)))
+if existing_schema.is_none()
+|| (existing_schema.is_some()
+&& existing_schema.unwrap().get(field_name).is_some()
+&& existing_schema
+.unwrap()
+.get(field_name)
+.unwrap()
+.data_type()
+== &DataType::Int64
+&& field.data_type() == &DataType::Utf8View
+&& s.parse::<i64>().is_ok()) =>
+{
+// Update the field's data type to Int64
+Field::new(field_name, DataType::Int64, true)
+}
+(SchemaVersion::V1, Some(Value::String(s)))
+if TIME_FIELD_NAME_PARTS
+.iter()
+.any(|part| field_name.to_lowercase().contains(part))
+&& field.data_type() == &DataType::Utf8View
+&& NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok() =>
+{
+// Update the field's data type to Date32
+Field::new(field_name, DataType::Date32, true)
+}

// in V1, new fields in json with an inferred integer type are cast as Int64.
-(SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_numeric() => {
+(SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_integer() => {
// Update the field's data type to Int64
-Field::new(field_name, DataType::Float64, true)
+Field::new(field_name, DataType::Int64, true)
}
// Return the original field if no update is needed
_ => Field::new(field_name, field.data_type().clone(), true),
}
})
.collect();

Arc::new(Schema::new(updated_schema))
}
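Condensing the V1 string-override arms above into one standalone function makes the intended precedence easier to see. This is a hypothetical mirror, not the code under review: the real `override_data_type` also consults `existing_schema` and RFC 3339/2822 timestamps, the date check is ordered first here for clarity, and `TIME_FIELD_NAME_PARTS` is assumed to contain fragments like `"time"` and `"date"`:

```rust
use arrow_schema::DataType;
use chrono::NaiveDate;

// Assumed stand-in for TIME_FIELD_NAME_PARTS.
const TIME_PARTS: [&str; 2] = ["time", "date"];

// Given a field inferred as Utf8View and one sample value, pick the
// overridden type the way the V1 arms above intend.
fn v1_string_override(field_name: &str, sample: &str) -> DataType {
    let time_like = TIME_PARTS
        .iter()
        .any(|part| field_name.to_lowercase().contains(part));
    if time_like && NaiveDate::parse_from_str(sample, "%Y-%m-%d").is_ok() {
        DataType::Date32
    } else if sample.parse::<i64>().is_ok() {
        DataType::Int64
    } else {
        DataType::Utf8View
    }
}

fn main() {
    assert_eq!(v1_string_override("status", "200"), DataType::Int64);
    assert_eq!(v1_string_override("event_date", "2025-02-04"), DataType::Date32);
    assert_eq!(v1_string_override("message", "hello"), DataType::Utf8View);
}
```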