Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 148 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ rust-version = "1.77.1"
anyhow = "1.0.72"
apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "54.1.0" }
arrow-array = { version = "54.1.0" }
arrow-buffer = { version = "54.1.0" }
arrow-cast = { version = "54.1.0" }
arrow-ord = { version = "54.1.0" }
arrow-schema = { version = "54.1.0" }
arrow-select = { version = "54.1.0" }
arrow-string = { version = "54.1.0" }
arrow-arith = { version = "54.2.1" }
arrow-array = { version = "54.2.1" }
arrow-buffer = { version = "54.2.1" }
arrow-cast = { version = "54.2.1" }
arrow-ord = { version = "54.2.1" }
arrow-schema = { version = "54.2.1" }
arrow-select = { version = "54.2.1" }
arrow-string = { version = "54.2.1" }
async-stream = "0.3.5"
async-trait = "0.1.86"
async-std = "1.12"
aws-config = "1"
aws-sdk-glue = "1.39"
bimap = "0.6"
bitvec = "1.0.1"
bytes = "1.6"
bytes = "1.10.1"
chrono = "0.4.38"
ctor = "0.2.8"
datafusion = "45"
datafusion = "46.0.0"
derive_builder = "0.20"
either = "1"
env_logger = "0.11.0"
Expand All @@ -78,7 +78,7 @@ num-bigint = "0.4.6"
once_cell = "1.20"
opendal = "0.51.2"
ordered-float = "4"
parquet = "54.1.0"
parquet = "54.2.1"
paste = "1.0.15"
pilota = "0.11.2"
pretty_assertions = "1.4"
Expand All @@ -94,12 +94,12 @@ serde_json = "1.0.138"
serde_repr = "0.1.16"
serde_with = "3.4"
tempfile = "3.18"
thrift = "0.17.0"
tokio = { version = "1.36", default-features = false }
thrift = "0.17.0"
tokio = { version = "1.43.0", default-features = false }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, it's better to avoid updating unrelated crates' versions unless we need to use their new APIs.

Our Cargo.toml maintains the minimal version we are using.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to run `cargo generate-lockfile` with the minimal-version check enabled, so we should either upgrade the tokio version or remove the minimal-version check.

typed-builder = "0.20"
url = "2.5.4"
urlencoding = "2"
uuid = { version = "1.13.1", features = ["v7"] }
uuid = { version = "1.15.1", features = ["v7"] }
volo-thrift = "0.10"
hive_metastore = "0.1"
tera = "1"
Expand Down
8 changes: 5 additions & 3 deletions crates/catalog/rest/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@ impl HttpClient {
cfg.token()
.or_else(|| self.token.into_inner().ok().flatten()),
),
token_endpoint: (!cfg.get_token_endpoint().is_empty())
.then(|| cfg.get_token_endpoint())
.unwrap_or(self.token_endpoint),
token_endpoint: if !cfg.get_token_endpoint().is_empty() {
cfg.get_token_endpoint()
} else {
self.token_endpoint
},
credential: cfg.credential().or(self.credential),
extra_headers,
extra_oauth_params: (!cfg.extra_oauth_params().is_empty())
Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/s3tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "iceberg-catalog-s3tables"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
rust-version = "1.81.0"
rust-version = "1.82.0"

categories = ["database"]
description = "Apache Iceberg Rust S3Tables Catalog"
Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl S3TablesCatalog {
)
})?;

let file_io = FileIO::from_path(&format!("s3://{}", bucket_name))?
let file_io = FileIO::from_path(format!("s3://{}", bucket_name))?
.with_props(&config.properties)
.build()?;

Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl ArrowReader {
f.metadata()
.get(PARQUET_FIELD_ID_META_KEY)
.and_then(|field_id| i32::from_str(field_id).ok())
.map_or(false, |field_id| {
.is_some_and(|field_id| {
projected_fields.insert((*f).clone(), field_id);
leaf_field_ids.contains(&field_id)
})
Expand Down Expand Up @@ -808,7 +808,7 @@ fn project_column(
type PredicateResult =
dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;

impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
impl BoundPredicateVisitor for PredicateConverter<'_> {
type T = Box<PredicateResult>;

fn always_true(&mut self) -> Result<Box<PredicateResult>> {
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ pub enum TableRequirement {
}

/// TableUpdate represents an update to a table in the catalog.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
pub enum TableUpdate {
Expand Down Expand Up @@ -748,6 +749,7 @@ pub struct ViewCreation {
}

/// ViewUpdate represents an update to a view in the catalog.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
pub enum ViewUpdate {
Expand Down
1 change: 0 additions & 1 deletion crates/iceberg/src/delete_file_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ impl PopulatedDeleteFileIndex {
/// 2. If the partition is empty and the delete file is not a positional delete,
/// it is added to the `global_delees` vector
/// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.

fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/inspect/metadata_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<'a> MetadataTable<'a> {
ManifestsTable::new(self.0)
}
}

#[allow(missing_docs)]
#[cfg(test)]
pub mod tests {
use expect_test::Expect;
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/io/storage_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access";
/// S3 Server Side Encryption Type.
pub const S3_SSE_TYPE: &str = "s3.sse.type";
/// S3 Server Side Encryption Key.
///
/// If S3 encryption type is kms, input is a KMS Key ID.
/// In case this property is not set, default key "aws/s3" is used.
/// If encryption type is custom, input is a custom base-64 AES256 symmetric key.
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/puffin/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ pub(crate) fn empty_footer_payload() -> FileMetadata {
}

pub(crate) fn empty_footer_payload_bytes() -> Vec<u8> {
return serde_json::to_string::<FileMetadata>(&empty_footer_payload())
serde_json::to_string::<FileMetadata>(&empty_footer_payload())
.unwrap()
.as_bytes()
.to_vec();
.to_vec()
}

pub(crate) fn empty_footer_payload_bytes_length_bytes() -> [u8; 4] {
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ impl TableScan {
}
});

return Ok(file_scan_task_rx.boxed());
Ok(file_scan_task_rx.boxed())
}

/// Returns an [`ArrowRecordBatchStream`].
Expand Down Expand Up @@ -1115,6 +1115,7 @@ impl FileScanTask {
}
}

#[allow(missing_docs)]
#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
Expand Down
7 changes: 5 additions & 2 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ impl fmt::Display for StructType {
#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
#[serde(from = "SerdeNestedField", into = "SerdeNestedField")]
/// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema.
///
/// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type.
/// Fields may have an optional comment or doc string. Fields can have default values.
pub struct NestedField {
Expand Down Expand Up @@ -699,6 +700,7 @@ impl fmt::Display for NestedField {

#[derive(Debug, PartialEq, Eq, Clone)]
/// A list is a collection of values with some element type. The element field has an integer id that is unique in the table schema.
///
/// Elements can be either optional or required. Element types may be any type.
pub struct ListType {
/// Element field of list type.
Expand Down Expand Up @@ -736,7 +738,7 @@ pub(super) mod _serde {
},
Struct {
r#type: String,
fields: Cow<'a, Vec<NestedFieldRef>>,
fields: Cow<'a, [NestedFieldRef]>,
},
#[serde(rename_all = "kebab-case")]
Map {
Expand All @@ -750,7 +752,7 @@ pub(super) mod _serde {
Primitive(PrimitiveType),
}

impl<'a> From<SerdeType<'a>> for Type {
impl From<SerdeType<'_>> for Type {
fn from(value: SerdeType) -> Self {
match value {
SerdeType::List {
Expand Down Expand Up @@ -819,6 +821,7 @@ pub(super) mod _serde {

#[derive(Debug, PartialEq, Eq, Clone)]
/// A map is a collection of key-value pairs with a key type and a value type.
///
/// Both the key field and value field each have an integer id that is unique in the table schema.
/// Map keys are required and map values can be either optional or required.
/// Both map keys and map values may be any type, including nested types.
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ pub struct UnboundPartitionField {
}

/// Unbound partition spec can be built without a schema and later bound to a schema.
///
/// They are used to transport schema information as part of the REST specification.
/// The main difference to [`PartitionSpec`] is that the field ids are optional.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/spec/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl SortOrderBuilder {
/// Creates a new unbound sort order.
pub fn build_unbound(&self) -> Result<SortOrder> {
let fields = self.fields.clone().unwrap_or_default();
return match (self.order_id, fields.as_slice()) {
match (self.order_id, fields.as_slice()) {
(Some(SortOrder::UNSORTED_ORDER_ID) | None, []) => Ok(SortOrder::unsorted_order()),
(_, []) => Err(Error::new(
ErrorKind::Unexpected,
Expand All @@ -164,7 +164,7 @@ impl SortOrderBuilder {
order_id: maybe_order_id.unwrap_or(1),
fields: fields.to_vec(),
}),
};
}
}

/// Creates a new bound sort order.
Expand Down
6 changes: 4 additions & 2 deletions crates/iceberg/src/spec/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,9 @@ impl Datum {
}

fn string_to_i128<S: AsRef<str>>(s: S) -> Result<i128> {
return s.as_ref().parse::<i128>().map_err(|e| {
s.as_ref().parse::<i128>().map_err(|e| {
Error::new(ErrorKind::DataInvalid, "Can't parse string to i128.").with_source(e)
});
})
}

/// Convert the datum to `target_type`.
Expand Down Expand Up @@ -1225,6 +1225,7 @@ impl Datum {
}

/// Map is a collection of key-value pairs with a key type and a value type.
///
/// It used in Literal::Map, to make it hashable, the order of key-value pairs is stored in a separate vector
/// so that we can hash the map in a deterministic way. But it also means that the order of key-value pairs is matter
/// for the hash value.
Expand Down Expand Up @@ -1721,6 +1722,7 @@ impl Literal {
}

/// The partition struct stores the tuple of partition values for each file.
///
/// Its type is derived from the partition fields of the partition spec used to write the manifest file.
/// In v2, the partition struct’s field ids must match the ids from the partition spec.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down
6 changes: 4 additions & 2 deletions crates/iceberg/src/writer/file_writer/location_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ pub trait FileNameGenerator: Clone + Send + 'static {
fn generate_file_name(&self) -> String;
}

/// `DefaultFileNameGenerator` used to generate file name for data file. The file name can be
/// passed to `LocationGenerator` to generate the location of the file.
/// `DefaultFileNameGenerator` used to generate file name for data file.
///
/// The file name can be passed to `LocationGenerator`
/// to generate the location of the file.
/// The file name format is "{prefix}-{file_count}[-{suffix}].{file_format}".
#[derive(Clone, Debug)]
pub struct DefaultFileNameGenerator {
Expand Down
3 changes: 1 addition & 2 deletions crates/iceberg/src/writer/file_writer/track_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ impl TrackWriter {
impl FileWrite for TrackWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len();
self.inner.write(bs).await.map(|v| {
self.inner.write(bs).await.inspect(|_v| {
self.written_size
.fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed);
v
})
}

Expand Down
2 changes: 1 addition & 1 deletion crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ homepage = { workspace = true }
# kept the same as DataFusion's MSRV
# https://github.com/apache/datafusion?tab=readme-ov-file#rust-version-compatibility-policy
# https://github.com/apache/datafusion/blob/main/Cargo.toml#L68
rust-version = "1.80.1"
rust-version = "1.82.0"

categories = ["database"]
description = "Apache Iceberg DataFusion Integration"
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# under the License.

[toolchain]
channel = "nightly-2024-06-10"
channel = "nightly-2024-10-17"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe those changes are not related. DataFusion is only used in iceberg-datafusion.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, but there is a Python-binding CI job that runs clippy, so we should either upgrade the toolchain version or skip the clippy check for the DataFusion integration.

components = ["rustfmt", "clippy"]
Loading