diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs index 09465263..f7bd09a5 100644 --- a/bindings/python/src/lookup.rs +++ b/bindings/python/src/lookup.rs @@ -74,7 +74,7 @@ impl Lookuper { // Convert to Python with GIL Python::attach(|py| match row_opt { - Some(compacted_row) => internal_row_to_dict(py, &compacted_row, &table_info), + Some(row) => internal_row_to_dict(py, &row, &table_info), None => Ok(py.None()), }) }) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 7f1f64ef..0828b83b 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -19,13 +19,13 @@ use crate::client::metadata::Metadata; use crate::cluster::ServerNode; use crate::metadata::{ DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec, - PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath, + PhysicalTablePath, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, }; use crate::rpc::message::{ CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest, - GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, ListPartitionInfosRequest, - ListTablesRequest, TableExistsRequest, + GetLatestLakeSnapshotRequest, GetTableRequest, GetTableSchemaRequestMsg, ListDatabasesRequest, + ListPartitionInfosRequest, ListTablesRequest, TableExistsRequest, }; use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; use crate::rpc::{RpcClient, ServerConnection}; @@ -111,6 +111,27 @@ impl FlussAdmin { Ok(()) } + /// Fetch the schema for `table_path` at the given `schema_id`. Pass + /// `None` to request the latest. + pub async fn get_table_schema( + &self, + table_path: &TablePath, + schema_id: Option, + ) -> Result { + let response = self + .admin_gateway() + .await? 
+ .request(GetTableSchemaRequestMsg::new(table_path, schema_id)) + .await?; + + let schema_node: serde_json::Value = serde_json::from_slice(&response.schema_json) + .map_err(|e| Error::JsonSerdeError { + message: format!("Failed to parse schema_json: {e}"), + })?; + let schema = Schema::deserialize_json(&schema_node)?; + Ok(SchemaInfo::new(schema, response.schema_id)) + } + pub async fn get_table_info(&self, table_path: &TablePath) -> Result { let response = self .admin_gateway() diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs index 77c30203..f8027948 100644 --- a/crates/fluss/src/client/mod.rs +++ b/crates/fluss/src/client/mod.rs @@ -20,6 +20,7 @@ mod connection; mod credentials; pub mod lookup; mod metadata; +mod schema_getter; mod table; mod write; @@ -28,5 +29,6 @@ pub use connection::*; pub use credentials::*; pub use lookup::LookupClient; pub use metadata::*; +pub(crate) use schema_getter::ClientSchemaGetter; pub use table::*; pub use write::*; diff --git a/crates/fluss/src/client/schema_getter.rs b/crates/fluss/src/client/schema_getter.rs new file mode 100644 index 00000000..4b643c0b --- /dev/null +++ b/crates/fluss/src/client/schema_getter.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-table schema cache that lazily fetches missing schema versions +//! from the coordinator. Used by the lookup path to decode rows that +//! predate the table's current schema. + +use crate::client::admin::FlussAdmin; +use crate::error::{Error, Result}; +use crate::metadata::{Schema, SchemaInfo, TablePath}; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; + +pub(crate) struct ClientSchemaGetter { + table_path: TablePath, + admin: Arc, + /// Pre-seeded with the table's current schema so the dominant case + /// (every row written under the latest schema) needs zero RPCs. + cache: RwLock>>, +} + +impl ClientSchemaGetter { + pub fn new(table_path: TablePath, admin: Arc, latest: SchemaInfo) -> Self { + let mut map = HashMap::new(); + let (schema, schema_id) = latest.into_parts(); + map.insert(schema_id, Arc::new(schema)); + Self { + table_path, + admin, + cache: RwLock::new(map), + } + } + + /// Concurrent fetches for the same id are not deduplicated; we + /// accept one redundant RPC in exchange for staying off + /// `tokio::sync` machinery. Schemas are immutable per id, so + /// last-write-wins on the cache insert is correct. 
+ pub async fn get_schema(&self, schema_id: i32) -> Result> { + if let Some(schema) = self.cache.read().get(&schema_id).cloned() { + return Ok(schema); + } + + let info = self + .admin + .get_table_schema(&self.table_path, Some(schema_id)) + .await?; + let (schema, fetched_id) = info.into_parts(); + if fetched_id != schema_id { + return Err(Error::UnexpectedError { + message: format!( + "Requested schema id {schema_id}, but server returned schema id {fetched_id}" + ), + source: None, + }); + } + let schema = Arc::new(schema); + + self.cache.write().insert(schema_id, Arc::clone(&schema)); + Ok(schema) + } +} diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs index 2adb53e4..51a0a071 100644 --- a/crates/fluss/src/client/table/lookup.rs +++ b/crates/fluss/src/client/table/lookup.rs @@ -16,73 +16,126 @@ // under the License. use crate::bucketing::BucketingFunction; +use crate::client::ClientSchemaGetter; use crate::client::lookup::LookupClient; use crate::client::metadata::Metadata; use crate::client::table::partition_getter::PartitionGetter; use crate::error::{Error, Result}; -use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath}; +use crate::metadata::{ + KvFormat, PhysicalTablePath, RowType, Schema, TableBucket, TableInfo, TablePath, +}; use crate::record::RowAppendRecordBatchBuilder; use crate::record::kv::SCHEMA_ID_LENGTH; -use crate::row::InternalRow; -use crate::row::compacted::CompactedRow; use crate::row::encode::{KeyEncoder, KeyEncoderFactory}; +use crate::row::{FixedSchemaDecoder, InternalRow, LookupRow}; use arrow::array::RecordBatch; +use byteorder::{ByteOrder, LittleEndian}; +use futures::future::try_join_all; +use parking_lot::RwLock; +use std::collections::HashMap; use std::sync::Arc; -/// The result of a lookup operation. -/// -/// Contains the rows returned from a lookup. For primary-key lookups, -/// this will contain at most one row. 
For prefix-key lookups, it may -/// contain multiple rows. +/// Per-Lookuper decoder cache. The target-schema decoder is held +/// directly so the dominant decode path is a single field access; older +/// schemas are populated lazily on first observation. +struct DecoderCache { + target_id: i16, + target_decoder: Arc, + others: RwLock>>, +} + +impl DecoderCache { + fn new(target_id: i16, target_decoder: Arc) -> Self { + Self { + target_id, + target_decoder, + others: RwLock::new(HashMap::new()), + } + } + + fn decode<'a>(&self, schema_id: i16, bytes: &'a [u8]) -> Result> { + if schema_id == self.target_id { + return self.target_decoder.decode(bytes); + } + let decoder = + self.others + .read() + .get(&schema_id) + .cloned() + .ok_or_else(|| Error::RowConvertError { + message: format!("No decoder available for schema id {schema_id}"), + })?; + decoder.decode(bytes) + } + + fn contains(&self, schema_id: i16) -> bool { + schema_id == self.target_id || self.others.read().contains_key(&schema_id) + } + + fn insert(&self, schema_id: i16, decoder: Arc) { + self.others.write().insert(schema_id, decoder); + } + + #[cfg(test)] + fn get(&self, schema_id: i16) -> Option> { + if schema_id == self.target_id { + return Some(Arc::clone(&self.target_decoder)); + } + self.others.read().get(&schema_id).cloned() + } +} + +/// Rows returned from a lookup. Primary-key lookups produce at most one +/// row; prefix-key lookups may produce many. Rows written under older +/// schemas are decoded with their original schema and projected to the +/// schema captured when the `Lookuper` was created — schema evolutions +/// that land after that point are not picked up by an existing +/// `Lookuper`; create a new one to see them. pub struct LookupResult { rows: Vec>, - row_type: Arc, + target_row_type: Arc, + decoders: Arc, } impl LookupResult { - /// Creates a new LookupResult from a list of row bytes. 
- fn new(rows: Vec>, row_type: Arc) -> Self { - Self { rows, row_type } - } - - /// Creates an empty LookupResult. - fn empty(row_type: Arc) -> Self { + fn new(rows: Vec>, target_row_type: Arc, decoders: Arc) -> Self { Self { - rows: Vec::new(), - row_type, + rows, + target_row_type, + decoders, } } - /// Extracts the row payload by stripping the schema id prefix. - fn extract_payload(bytes: &[u8]) -> Result<&[u8]> { - bytes - .get(SCHEMA_ID_LENGTH..) - .ok_or_else(|| Error::RowConvertError { + fn read_schema_id(bytes: &[u8]) -> Result { + if bytes.len() < SCHEMA_ID_LENGTH { + return Err(Error::RowConvertError { message: format!( "Row payload too short: {} bytes, need at least {} for schema id", bytes.len(), SCHEMA_ID_LENGTH ), - }) + }); + } + let schema_id = LittleEndian::read_i16(&bytes[..SCHEMA_ID_LENGTH]); + if schema_id < 0 { + return Err(Error::RowConvertError { + message: format!("Invalid negative schema id {schema_id}; row prefix is corrupt"), + }); + } + Ok(schema_id) } - /// Returns the only row in the result set as a [`CompactedRow`]. - /// - /// This method provides a zero-copy view of the row data, which means the returned - /// `CompactedRow` borrows from this result set and cannot outlive it. - /// - /// # Returns - /// - `Ok(Some(row))`: If exactly one row exists. - /// - `Ok(None)`: If the result set is empty. - /// - `Err(Error::UnexpectedError)`: If the result set contains more than one row. - /// - `Err(Error)`: If the row payload is too short to contain a schema id. - pub fn get_single_row(&self) -> Result>> { + fn decode<'a>(&self, bytes: &'a [u8]) -> Result> { + let schema_id = Self::read_schema_id(bytes)?; + self.decoders.decode(schema_id, bytes) + } + + /// Returns the single row when exactly one is present, `None` for + /// empty, or an error if the result holds more than one row. 
+ pub fn get_single_row(&self) -> Result>> { match self.rows.len() { 0 => Ok(None), - 1 => { - let payload = Self::extract_payload(&self.rows[0])?; - Ok(Some(CompactedRow::from_bytes(&self.row_type, payload))) - } + 1 => Ok(Some(self.decode(&self.rows[0])?)), _ => Err(Error::UnexpectedError { message: "LookupResult contains multiple rows, use get_rows() instead".to_string(), source: None, @@ -90,44 +143,105 @@ impl LookupResult { } } - /// Returns all rows in the result set as [`CompactedRow`]s. - /// - /// # Returns - /// - `Ok(rows)` - All rows in the result set. - /// - `Err(Error)` - If any row payload is too short to contain a schema id. - pub fn get_rows(&self) -> Result>> { - self.rows - .iter() - // TODO Add schema id check and fetch when implementing prefix lookup - .map(|bytes| { - let payload = Self::extract_payload(bytes)?; - Ok(CompactedRow::from_bytes(&self.row_type, payload)) - }) - .collect() + pub fn get_rows(&self) -> Result>> { + self.rows.iter().map(|bytes| self.decode(bytes)).collect() } - /// Converts all rows in this result into an Arrow [`RecordBatch`]. - /// - /// This is useful for integration with DataFusion or other Arrow-based tools. - /// - /// # Returns - /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty - /// batch (with the correct schema) if the result set is empty. - /// - `Err(Error)` - If the conversion fails. 
pub fn to_record_batch(&self) -> Result { - let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?; - + let mut builder = RowAppendRecordBatchBuilder::new(&self.target_row_type)?; for bytes in &self.rows { - let payload = Self::extract_payload(bytes)?; - - let row = CompactedRow::from_bytes(&self.row_type, payload); + let row = self.decode(bytes)?; builder.append(&row)?; } - builder.build_arrow_record_batch().map(Arc::unwrap_or_clone) } } +struct LookupSchemaCtx { + target_schema: Arc, + target_row_type: Arc, + kv_format: KvFormat, + schema_getter: Arc, + decoders: Arc, +} + +impl LookupSchemaCtx { + fn new(table_info: &TableInfo, schema_getter: Arc) -> Result { + let target_schema_i32 = table_info.get_schema_id(); + if !(0..=i16::MAX as i32).contains(&target_schema_i32) { + return Err(Error::UnexpectedError { + message: format!( + "Schema id {target_schema_i32} does not fit in 16 bits — wire format violated" + ), + source: None, + }); + } + let target_schema = Arc::new(table_info.get_schema().clone()); + let target_row_type = Arc::new(table_info.row_type().clone()); + let kv_format = table_info.get_table_config().get_kv_format()?; + let target_decoder = Arc::new(FixedSchemaDecoder::new_no_projection( + kv_format, + target_schema.as_ref(), + )?); + let decoders = Arc::new(DecoderCache::new(target_schema_i32 as i16, target_decoder)); + Ok(Self { + target_schema, + target_row_type, + kv_format, + schema_getter, + decoders, + }) + } + + async fn ensure_decoders(&self, rows: &[Vec]) -> Result<()> { + let mut missing: Vec = Vec::new(); + for bytes in rows { + let schema_id = LookupResult::read_schema_id(bytes)?; + if !self.decoders.contains(schema_id) && !missing.contains(&schema_id) { + missing.push(schema_id); + } + } + if missing.is_empty() { + return Ok(()); + } + + let fetches = missing.into_iter().map(|schema_id| { + let cache = Arc::clone(&self.decoders); + let schema_getter = Arc::clone(&self.schema_getter); + let target_schema = 
Arc::clone(&self.target_schema); + let kv_format = self.kv_format; + async move { + let source = schema_getter.get_schema(schema_id as i32).await?; + let decoder = + FixedSchemaDecoder::new(kv_format, source.as_ref(), target_schema.as_ref())?; + cache.insert(schema_id, Arc::new(decoder)); + Ok::<_, Error>(()) + } + }); + try_join_all(fetches).await?; + Ok(()) + } + + async fn build_result(&self, rows: Vec>) -> Result { + if !rows.is_empty() { + self.ensure_decoders(&rows).await?; + } + Ok(LookupResult::new( + rows, + Arc::clone(&self.target_row_type), + Arc::clone(&self.decoders), + )) + } + + fn empty_result(&self) -> LookupResult { + LookupResult::new( + Vec::new(), + Arc::clone(&self.target_row_type), + Arc::clone(&self.decoders), + ) + } +} + /// Builder for lookup operations. `create_lookuper()` builds a primary-key /// `Lookuper`; `lookup_by(columns).create_lookuper()` builds a /// `PrefixKeyLookuper` for prefix scans. @@ -136,6 +250,7 @@ pub struct TableLookup { lookup_client: Arc, table_info: TableInfo, metadata: Arc, + schema_getter: Arc, } impl TableLookup { @@ -143,11 +258,13 @@ impl TableLookup { lookup_client: Arc, table_info: TableInfo, metadata: Arc, + schema_getter: Arc, ) -> Self { Self { lookup_client, table_info, metadata, + schema_getter, } } @@ -161,6 +278,7 @@ impl TableLookup { lookup_client: self.lookup_client, table_info: self.table_info, metadata: self.metadata, + schema_getter: self.schema_getter, lookup_column_names, } } @@ -208,11 +326,10 @@ impl TableLookup { None }; - let row_type = Arc::new(self.table_info.row_type().clone()); + let schema_ctx = LookupSchemaCtx::new(&self.table_info, self.schema_getter)?; Ok(Lookuper { table_path: Arc::new(self.table_info.table_path.clone()), - row_type, table_info: self.table_info, metadata: self.metadata, lookup_client: self.lookup_client, @@ -221,6 +338,7 @@ impl TableLookup { bucket_key_encoder, partition_getter, num_buckets, + schema_ctx, }) } } @@ -239,7 +357,6 @@ impl TableLookup { pub struct 
Lookuper { table_path: Arc, table_info: TableInfo, - row_type: Arc, metadata: Arc, lookup_client: Arc, bucketing_function: Box, @@ -247,6 +364,7 @@ pub struct Lookuper { bucket_key_encoder: Option>, partition_getter: Option, num_buckets: i32, + schema_ctx: LookupSchemaCtx, } impl Lookuper { @@ -281,7 +399,7 @@ impl Lookuper { .await? { Some(id) => Some(id), - None => return Ok(LookupResult::empty(Arc::clone(&self.row_type))), + None => return Ok(self.schema_ctx.empty_result()), } } else { None @@ -300,13 +418,11 @@ impl Lookuper { .lookup(self.table_path.as_ref().clone(), table_bucket, pk_bytes) .await?; - match result { - Some(value_bytes) => Ok(LookupResult::new( - vec![value_bytes], - Arc::clone(&self.row_type), - )), - None => Ok(LookupResult::empty(Arc::clone(&self.row_type))), - } + let rows = match result { + Some(value_bytes) => vec![value_bytes], + None => Vec::new(), + }; + self.schema_ctx.build_result(rows).await } /// Returns a reference to the table info. @@ -319,6 +435,7 @@ pub struct TablePrefixLookup { lookup_client: Arc, table_info: TableInfo, metadata: Arc, + schema_getter: Arc, lookup_column_names: Vec, } @@ -346,11 +463,10 @@ impl TablePrefixLookup { None }; - let full_row_type = Arc::new(self.table_info.row_type().clone()); + let schema_ctx = LookupSchemaCtx::new(&self.table_info, self.schema_getter)?; Ok(PrefixKeyLookuper { table_path: Arc::new(self.table_info.table_path.clone()), - row_type: full_row_type, table_info: self.table_info, metadata: self.metadata, lookup_client: self.lookup_client, @@ -358,6 +474,7 @@ impl TablePrefixLookup { prefix_key_encoder, partition_getter, num_buckets, + schema_ctx, }) } } @@ -454,13 +571,13 @@ fn validate_prefix_lookup(table_info: &TableInfo, lookup_columns: &[String]) -> pub struct PrefixKeyLookuper { table_path: Arc, table_info: TableInfo, - row_type: Arc, metadata: Arc, lookup_client: Arc, bucketing_function: Box, prefix_key_encoder: Box, partition_getter: Option, num_buckets: i32, + schema_ctx: 
LookupSchemaCtx, } impl PrefixKeyLookuper { @@ -479,7 +596,7 @@ impl PrefixKeyLookuper { .await? { Some(id) => Some(id), - None => return Ok(LookupResult::empty(Arc::clone(&self.row_type))), + None => return Ok(self.schema_ctx.empty_result()), } } else { None @@ -497,7 +614,7 @@ impl PrefixKeyLookuper { .prefix_lookup(self.table_path.as_ref().clone(), table_bucket, prefix_bytes) .await?; - Ok(LookupResult::new(rows, Arc::clone(&self.row_type))) + self.schema_ctx.build_result(rows).await } pub fn table_info(&self) -> &TableInfo { @@ -508,7 +625,7 @@ impl PrefixKeyLookuper { #[cfg(test)] mod tests { use super::*; - use crate::metadata::{DataField, DataTypes}; + use crate::metadata::{Column, DataTypes, Schema}; use crate::row::binary::BinaryWriter; use crate::row::compacted::CompactedRowWriter; use arrow::array::Int32Array; @@ -520,34 +637,56 @@ mod tests { bytes } + fn schema_with_ids(columns: &[(i32, &str, crate::metadata::DataType)]) -> Schema { + let cols: Vec = columns + .iter() + .map(|(id, name, dt)| Column::new(*name, dt.clone()).with_id(*id)) + .collect(); + Schema::builder().with_columns(cols).build().unwrap() + } + + fn cache_with( + target_id: i16, + target_decoder: FixedSchemaDecoder, + others: Vec<(i16, FixedSchemaDecoder)>, + ) -> Arc { + let cache = DecoderCache::new(target_id, Arc::new(target_decoder)); + for (id, decoder) in others { + cache.insert(id, Arc::new(decoder)); + } + Arc::new(cache) + } + + fn lookup_result_from( + rows: Vec>, + target_schema: &Schema, + decoders: Arc, + ) -> LookupResult { + LookupResult::new(rows, Arc::new(target_schema.row_type().clone()), decoders) + } + #[test] fn test_to_record_batch_empty() { - let row_type = Arc::new(RowType::new(vec![DataField::new( - "id", - DataTypes::int(), - None, - )])); - let result = LookupResult::empty(row_type); + let target = schema_with_ids(&[(0, "id", DataTypes::int())]); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap(); + let result = 
lookup_result_from(Vec::new(), &target, cache_with(0, decoder, vec![])); let batch = result.to_record_batch().unwrap(); assert_eq!(batch.num_rows(), 0); assert_eq!(batch.num_columns(), 1); } #[test] - fn test_to_record_batch_with_row() { - let row_type = Arc::new(RowType::new(vec![DataField::new( - "id", - DataTypes::int(), - None, - )])); + fn test_to_record_batch_with_row_at_target_schema() { + let target = schema_with_ids(&[(0, "id", DataTypes::int())]); let mut writer = CompactedRowWriter::new(1); writer.write_int(42); let row_bytes = make_row_bytes(0, writer.buffer()); - let result = LookupResult::new(vec![row_bytes], Arc::clone(&row_type)); - let batch = result.to_record_batch().unwrap(); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap(); + let result = lookup_result_from(vec![row_bytes], &target, cache_with(0, decoder, vec![])); + let batch = result.to_record_batch().unwrap(); assert_eq!(batch.num_rows(), 1); let col = batch .column(0) @@ -557,15 +696,79 @@ mod tests { assert_eq!(col.value(0), 42); } + #[test] + fn test_get_rows_decodes_per_row_schema_id_with_projection() { + let source = schema_with_ids(&[(0, "a", DataTypes::int())]); + let target = schema_with_ids(&[(0, "a", DataTypes::int()), (1, "b", DataTypes::string())]); + + let mut w = CompactedRowWriter::new(1); + w.write_int(7); + let old_row = make_row_bytes(3, w.buffer()); + + let mut w = CompactedRowWriter::new(2); + w.write_int(8); + w.write_string("eight"); + let new_row = make_row_bytes(7, w.buffer()); + + let target_decoder = + FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap(); + let projection_decoder = + FixedSchemaDecoder::new(KvFormat::COMPACTED, &source, &target).unwrap(); + let cache = cache_with(7, target_decoder, vec![(3, projection_decoder)]); + let result = lookup_result_from(vec![old_row, new_row], &target, cache); + + let rows = result.get_rows().unwrap(); + assert_eq!(rows.len(), 2); + 
assert_eq!(rows[0].get_int(0).unwrap(), 7); + assert!(rows[0].is_null_at(1).unwrap()); + assert_eq!(rows[1].get_int(0).unwrap(), 8); + assert_eq!(rows[1].get_string(1).unwrap(), "eight"); + } + #[test] fn test_to_record_batch_payload_too_short() { - let row_type = Arc::new(RowType::new(vec![DataField::new( - "id", - DataTypes::int(), - None, - )])); - // Only 1 byte — shorter than SCHEMA_ID_LENGTH (2) - let result = LookupResult::new(vec![vec![0u8]], Arc::clone(&row_type)); + let target = schema_with_ids(&[(0, "id", DataTypes::int())]); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap(); + let result = lookup_result_from(vec![vec![0u8]], &target, cache_with(0, decoder, vec![])); assert!(result.to_record_batch().is_err()); } + + #[test] + fn test_get_rows_errors_when_no_decoder_for_schema_id() { + let target = schema_with_ids(&[(0, "id", DataTypes::int())]); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap(); + let mut w = CompactedRowWriter::new(1); + w.write_int(1); + let row = make_row_bytes(99, w.buffer()); + let result = lookup_result_from(vec![row], &target, cache_with(0, decoder, vec![])); + + let err = result + .get_rows() + .map(|_| ()) + .map_err(|e| e.to_string()) + .unwrap_err(); + assert!(err.contains("schema id 99"), "{err}"); + } + + #[test] + fn test_read_schema_id_rejects_negative() { + let bytes = [0xFFu8, 0xFFu8, 0u8]; + let err = LookupResult::read_schema_id(&bytes).unwrap_err(); + assert!( + err.to_string().contains("Invalid negative schema id"), + "{err}" + ); + } + + #[test] + fn test_decoder_cache_target_lookup_skips_lock() { + let target = schema_with_ids(&[(0, "a", DataTypes::int())]); + let target_decoder = + Arc::new(FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap()); + let cache = DecoderCache::new(7, Arc::clone(&target_decoder)); + + let returned = cache.get(7).expect("target id must hit the cache"); + 
assert!(Arc::ptr_eq(&returned, &target_decoder)); + assert!(cache.get(99).is_none()); + } } diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 2d3d0171..ba1edd2f 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -17,8 +17,9 @@ use crate::client::connection::FlussConnection; use crate::client::metadata::Metadata; +use crate::client::schema_getter::ClientSchemaGetter; use crate::error::{Error, Result}; -use crate::metadata::{TableInfo, TablePath}; +use crate::metadata::{SchemaInfo, TableInfo, TablePath}; use std::sync::Arc; pub const EARLIEST_OFFSET: i64 = -2; @@ -121,10 +122,22 @@ impl<'a> FlussTable<'a> { }); } let lookup_client = self.conn.get_or_create_lookup_client()?; + // Pre-seed the schema getter with the table's current schema — + // rows written under it (the dominant case) never trigger an RPC. + let latest = SchemaInfo::new( + self.table_info.get_schema().clone(), + self.table_info.get_schema_id(), + ); + let schema_getter = Arc::new(ClientSchemaGetter::new( + self.table_path.clone(), + self.conn.get_admin()?, + latest, + )); Ok(TableLookup::new( lookup_client, self.table_info.clone(), self.metadata.clone(), + schema_getter, )) } diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs index d9eb56b5..4103aa3d 100644 --- a/crates/fluss/src/metadata/datatype.rs +++ b/crates/fluss/src/metadata/datatype.rs @@ -93,6 +93,60 @@ impl DataType { DataType::Bytes(v) => DataType::Bytes(v.as_non_nullable()), } } + + /// Structural equality ignoring the outermost nullability flag at + /// every level. Equivalent to comparing `as_non_nullable()` on both + /// sides but without the recursive clone. 
+ pub(crate) fn eq_ignore_nullable(&self, other: &DataType) -> bool { + match self { + DataType::Boolean(_) => matches!(other, DataType::Boolean(_)), + DataType::TinyInt(_) => matches!(other, DataType::TinyInt(_)), + DataType::SmallInt(_) => matches!(other, DataType::SmallInt(_)), + DataType::Int(_) => matches!(other, DataType::Int(_)), + DataType::BigInt(_) => matches!(other, DataType::BigInt(_)), + DataType::Float(_) => matches!(other, DataType::Float(_)), + DataType::Double(_) => matches!(other, DataType::Double(_)), + DataType::Date(_) => matches!(other, DataType::Date(_)), + DataType::String(_) => matches!(other, DataType::String(_)), + DataType::Bytes(_) => matches!(other, DataType::Bytes(_)), + DataType::Char(a) => { + matches!(other, DataType::Char(b) if a.length() == b.length()) + } + DataType::Binary(a) => { + matches!(other, DataType::Binary(b) if a.length() == b.length()) + } + DataType::Decimal(a) => matches!( + other, + DataType::Decimal(b) if a.precision() == b.precision() && a.scale() == b.scale() + ), + DataType::Time(a) => { + matches!(other, DataType::Time(b) if a.precision() == b.precision()) + } + DataType::Timestamp(a) => { + matches!(other, DataType::Timestamp(b) if a.precision() == b.precision()) + } + DataType::TimestampLTz(a) => { + matches!(other, DataType::TimestampLTz(b) if a.precision() == b.precision()) + } + DataType::Array(a) => matches!( + other, + DataType::Array(b) if a.get_element_type().eq_ignore_nullable(b.get_element_type()) + ), + DataType::Map(a) => matches!( + other, + DataType::Map(b) + if a.key_type().eq_ignore_nullable(b.key_type()) + && a.value_type().eq_ignore_nullable(b.value_type()) + ), + DataType::Row(a) => matches!( + other, + DataType::Row(b) if a.fields().len() == b.fields().len() + && a.fields().iter().zip(b.fields().iter()).all(|(x, y)| { + x.name() == y.name() && x.data_type().eq_ignore_nullable(y.data_type()) + }) + ), + } + } } impl Display for DataType { @@ -1695,3 +1749,106 @@ fn 
test_row_type_project_duplicate_indices() { assert_eq!(projected.fields()[1].name, "id"); assert_eq!(projected.fields()[2].name, "name"); } + +#[cfg(test)] +mod eq_ignore_nullable_tests { + use super::*; + + #[test] + fn ignores_nullability_at_top_level() { + let nullable = DataType::Int(IntType::new()); + let non_nullable = DataType::Int(IntType::with_nullable(false)); + assert_ne!(nullable, non_nullable, "PartialEq still distinguishes"); + assert!(nullable.eq_ignore_nullable(&non_nullable)); + assert!(non_nullable.eq_ignore_nullable(&nullable)); + } + + #[test] + fn rejects_different_kinds() { + assert!( + !DataType::Int(IntType::new()).eq_ignore_nullable(&DataType::BigInt(BigIntType::new())) + ); + } + + #[test] + fn compares_parameterized_types() { + // Char length must match. + assert!( + DataType::Char(CharType::with_nullable(10, true)) + .eq_ignore_nullable(&DataType::Char(CharType::with_nullable(10, false))) + ); + assert!( + !DataType::Char(CharType::with_nullable(10, true)) + .eq_ignore_nullable(&DataType::Char(CharType::with_nullable(11, true))) + ); + + // Decimal precision + scale must match. + let a = DataType::Decimal(DecimalType::with_nullable(true, 10, 2).unwrap()); + let b = DataType::Decimal(DecimalType::with_nullable(false, 10, 2).unwrap()); + let c = DataType::Decimal(DecimalType::with_nullable(true, 10, 3).unwrap()); + assert!(a.eq_ignore_nullable(&b)); + assert!(!a.eq_ignore_nullable(&c)); + } + + #[test] + fn recurses_into_array_and_map() { + // Array ~ Array + let a = DataType::Array(ArrayType::with_nullable( + true, + DataType::Int(IntType::new()), + )); + let b = DataType::Array(ArrayType::with_nullable( + false, + DataType::Int(IntType::with_nullable(false)), + )); + assert!(a.eq_ignore_nullable(&b)); + + // Map on both sides, mixed nullability. 
+ let m1 = DataType::Map(MapType::with_nullable( + true, + DataType::String(StringType::new()), + DataType::Int(IntType::new()), + )); + let m2 = DataType::Map(MapType::with_nullable( + false, + DataType::String(StringType::with_nullable(false)), + DataType::Int(IntType::with_nullable(false)), + )); + assert!(m1.eq_ignore_nullable(&m2)); + + // Map element-type mismatch is still caught. + let m3 = DataType::Map(MapType::with_nullable( + true, + DataType::String(StringType::new()), + DataType::BigInt(BigIntType::new()), + )); + assert!(!m1.eq_ignore_nullable(&m3)); + } + + #[test] + fn recurses_into_row_fields() { + let r1 = DataType::Row(RowType::new(vec![ + DataField::new("a", DataType::Int(IntType::new()), None), + DataField::new("b", DataType::String(StringType::new()), None), + ])); + let r2 = DataType::Row(RowType::with_nullable( + false, + vec![ + DataField::new("a", DataType::Int(IntType::with_nullable(false)), None), + DataField::new( + "b", + DataType::String(StringType::with_nullable(false)), + None, + ), + ], + )); + assert!(r1.eq_ignore_nullable(&r2)); + + // Field name mismatch must fail. + let r3 = DataType::Row(RowType::new(vec![ + DataField::new("renamed_a", DataType::Int(IntType::new()), None), + DataField::new("b", DataType::String(StringType::new()), None), + ])); + assert!(!r1.eq_ignore_nullable(&r3)); + } +} diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs index 20f5d097..efbce84d 100644 --- a/crates/fluss/src/metadata/json_serde.rs +++ b/crates/fluss/src/metadata/json_serde.rs @@ -377,6 +377,7 @@ impl Column { const NAME: &'static str = "name"; const DATA_TYPE: &'static str = "data_type"; const COMMENT: &'static str = "comment"; + const ID: &'static str = "id"; } impl JsonSerde for Column { @@ -394,6 +395,9 @@ impl JsonSerde for Column { obj.insert(Self::COMMENT.to_string(), json!(comment)); } + // The Java client requires `id` on input. 
+ obj.insert(Self::ID.to_string(), json!(self.id())); + Ok(Value::Object(obj)) } @@ -419,6 +423,15 @@ impl JsonSerde for Column { column = column.with_comment(comment); } + // Pre-id JSON is treated as unassigned; SchemaBuilder will + // auto-assign on build. + if let Some(id) = node.get(Self::ID).and_then(|v| v.as_i64()) { + let id = i32::try_from(id).map_err(|_| Error::JsonSerdeError { + message: format!("Column id {id} does not fit in i32"), + })?; + column = column.with_id(id); + } + Ok(column) } } @@ -671,6 +684,95 @@ mod tests { use super::*; use crate::metadata::DataTypes; + #[test] + fn column_id_round_trip_through_json() { + use crate::metadata::Column; + + let col = Column::new("a", DataTypes::int()) + .with_id(7) + .with_comment("desc"); + let json = col.serialize_json().unwrap(); + assert_eq!(json.get("id").and_then(|v| v.as_i64()), Some(7)); + let round_tripped = Column::deserialize_json(&json).unwrap(); + assert_eq!(round_tripped, col); + } + + #[test] + fn schema_assigns_ids_when_absent_and_preserves_when_present() { + use crate::metadata::{Column, Schema}; + + let auto = Schema::builder() + .column("a", DataTypes::int()) + .column("b", DataTypes::string()) + .build() + .unwrap(); + let ids: Vec = auto.columns().iter().map(|c| c.id()).collect(); + assert_eq!(ids, vec![0, 1]); + + let preserved = Schema::builder() + .with_columns(vec![ + Column::new("a", DataTypes::int()).with_id(3), + Column::new("b", DataTypes::string()).with_id(7), + ]) + .build() + .unwrap(); + let ids: Vec = preserved.columns().iter().map(|c| c.id()).collect(); + assert_eq!(ids, vec![3, 7]); + } + + #[test] + fn schema_rejects_duplicate_ids() { + use crate::metadata::Column; + let err = Schema::builder() + .with_columns(vec![ + Column::new("a", DataTypes::int()).with_id(7), + Column::new("b", DataTypes::string()).with_id(7), + ]) + .build() + .unwrap_err(); + assert!(err.to_string().contains("Duplicate column id 7"), "{err}"); + } + + #[test] + fn 
schema_rejects_negative_non_sentinel_ids() { + use crate::metadata::Column; + let err = Schema::builder() + .with_columns(vec![Column::new("a", DataTypes::int()).with_id(-7)]) + .build() + .unwrap_err(); + assert!(err.to_string().contains("invalid id -7"), "{err}"); + } + + #[test] + fn column_json_id_overflow_errors() { + use crate::metadata::Column; + let json = serde_json::json!({ + "name": "a", + "data_type": Column::new("a", DataTypes::int()).serialize_json().unwrap() + .get("data_type").unwrap(), + "id": (i32::MAX as i64) + 1, + }); + let err = Column::deserialize_json(&json).unwrap_err(); + assert!(err.to_string().contains("does not fit in i32"), "{err}"); + } + + #[test] + fn schema_rejects_partially_assigned_ids() { + use crate::metadata::Column; + + let err = Schema::builder() + .with_columns(vec![ + Column::new("a", DataTypes::int()).with_id(0), + Column::new("b", DataTypes::string()), + ]) + .build() + .unwrap_err(); + assert!( + err.to_string().contains("All columns must have an id"), + "{err}" + ); + } + #[test] fn test_datatype_json_serde() { let data_types = vec![ diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 0ca654a6..c1d1b72c 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -20,6 +20,7 @@ mod database; mod datatype; mod json_serde; mod partition; +mod schema_util; mod table; pub use data_lake_format::*; @@ -27,4 +28,5 @@ pub use database::*; pub use datatype::*; pub use json_serde::*; pub use partition::*; +pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use table::*; diff --git a/crates/fluss/src/metadata/schema_util.rs b/crates/fluss/src/metadata/schema_util.rs new file mode 100644 index 00000000..498a526e --- /dev/null +++ b/crates/fluss/src/metadata/schema_util.rs @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::metadata::{Schema, UNKNOWN_COLUMN_ID}; +use std::collections::{HashMap, HashSet}; + +/// Sentinel for an expected column that does not exist in the origin +/// schema. Used by [`index_mapping`] and [`crate::row::ProjectedRow`]. +pub(crate) const UNEXIST_MAPPING: i32 = -1; + +/// For each column in `expected_schema`, return the index of the column +/// with the same id in `origin_schema`, or [`UNEXIST_MAPPING`] if absent. +/// Matching by id keeps mappings stable across `ALTER TABLE … RENAME`. 
+pub(crate) fn index_mapping(origin_schema: &Schema, expected_schema: &Schema) -> Result> { + let origin_columns = origin_schema.columns(); + let mut origin_id_to_index: HashMap = HashMap::with_capacity(origin_columns.len()); + for (i, col) in origin_columns.iter().enumerate() { + if col.id() == UNKNOWN_COLUMN_ID { + return Err(Error::RowConvertError { + message: format!( + "origin schema column '{}' has no assigned id; cannot build index mapping", + col.name() + ), + }); + } + if origin_id_to_index.insert(col.id(), i).is_some() { + return Err(Error::RowConvertError { + message: format!("duplicate column id {} in origin schema", col.id()), + }); + } + } + + let expected_columns = expected_schema.columns(); + let mut mapping = Vec::with_capacity(expected_columns.len()); + let mut expected_seen: HashSet = HashSet::with_capacity(expected_columns.len()); + + for expected in expected_columns { + if expected.id() == UNKNOWN_COLUMN_ID { + return Err(Error::RowConvertError { + message: format!( + "expected schema column '{}' has no assigned id; cannot build index mapping", + expected.name() + ), + }); + } + if !expected_seen.insert(expected.id()) { + return Err(Error::RowConvertError { + message: format!("duplicate column id {} in expected schema", expected.id()), + }); + } + match origin_id_to_index.get(&expected.id()) { + None => mapping.push(UNEXIST_MAPPING), + Some(&idx) => { + let origin = &origin_columns[idx]; + if !origin.data_type().eq_ignore_nullable(expected.data_type()) { + return Err(Error::RowConvertError { + message: format!( + "Expected datatype of column(id={},name={}) is [{}], while the actual datatype is [{}]", + expected.id(), + expected.name(), + expected.data_type(), + origin.data_type() + ), + }); + } + mapping.push(idx as i32); + } + } + } + + Ok(mapping) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{Column, DataType, DataTypes}; + + fn schema_auto(columns: &[(&str, DataType)]) -> Schema { + let mut b = Schema::builder(); + 
for (name, dt) in columns { + b = b.column(*name, dt.clone()); + } + b.build().expect("schema build") + } + + fn schema_with_ids(columns: &[(i32, &str, DataType)]) -> Schema { + let cols: Vec = columns + .iter() + .map(|(id, name, dt)| Column::new(*name, dt.clone()).with_id(*id)) + .collect(); + Schema::builder() + .with_columns(cols) + .build() + .expect("schema build") + } + + #[test] + fn identity_mapping_when_schemas_equal() { + let s = schema_auto(&[ + ("a", DataTypes::bigint()), + ("b", DataTypes::string()), + ("c", DataTypes::int()), + ]); + assert_eq!(index_mapping(&s, &s).unwrap(), vec![0, 1, 2]); + } + + #[test] + fn projection_subset_in_order() { + let origin = schema_auto(&[ + ("a", DataTypes::bigint()), + ("b", DataTypes::string()), + ("c", DataTypes::int()), + ]); + let expected = + schema_with_ids(&[(0, "a", DataTypes::bigint()), (2, "c", DataTypes::int())]); + assert_eq!(index_mapping(&origin, &expected).unwrap(), vec![0, 2]); + } + + #[test] + fn reorder_mapping() { + let origin = schema_auto(&[ + ("a", DataTypes::bigint()), + ("b", DataTypes::string()), + ("c", DataTypes::int()), + ]); + let expected = schema_with_ids(&[ + (2, "c", DataTypes::int()), + (0, "a", DataTypes::bigint()), + (1, "b", DataTypes::string()), + ]); + assert_eq!(index_mapping(&origin, &expected).unwrap(), vec![2, 0, 1]); + } + + #[test] + fn missing_column_returns_sentinel() { + let origin = schema_auto(&[("a", DataTypes::bigint())]); + let expected = schema_with_ids(&[ + (0, "a", DataTypes::bigint()), + (1, "new_col", DataTypes::string()), + ]); + assert_eq!( + index_mapping(&origin, &expected).unwrap(), + vec![0, UNEXIST_MAPPING] + ); + } + + #[test] + fn rename_preserves_mapping_when_id_matches() { + let origin = schema_with_ids(&[(0, "old_name", DataTypes::int())]); + let expected = schema_with_ids(&[(0, "new_name", DataTypes::int())]); + assert_eq!(index_mapping(&origin, &expected).unwrap(), vec![0]); + } + + #[test] + fn drop_then_add_with_same_name_does_not_alias() { + 
let origin = schema_with_ids(&[(0, "a", DataTypes::int())]); + let expected = schema_with_ids(&[(5, "a", DataTypes::int())]); + assert_eq!( + index_mapping(&origin, &expected).unwrap(), + vec![UNEXIST_MAPPING] + ); + } + + #[test] + fn datatype_mismatch_returns_error() { + let origin = schema_auto(&[("a", DataTypes::bigint())]); + let expected = schema_with_ids(&[(0, "a", DataTypes::int())]); + let err = index_mapping(&origin, &expected).unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("id=0"), "{msg}"); + assert!(msg.contains("name=a"), "{msg}"); + assert!(msg.contains("INT"), "{msg}"); + assert!(msg.contains("BIGINT"), "{msg}"); + } + + #[test] + fn nullability_difference_does_not_error() { + // Primary-key normalization makes the origin non-nullable while + // the expected is nullable. + let origin = Schema::builder() + .column("a", DataTypes::int()) + .primary_key(["a"]) + .build() + .unwrap(); + let expected = schema_with_ids(&[(0, "a", DataTypes::int())]); + assert_eq!(index_mapping(&origin, &expected).unwrap(), vec![0]); + } +} diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index dba9a523..ebaf085e 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -28,11 +28,15 @@ use std::fmt::{Display, Formatter}; use std::sync::Arc; use strum_macros::EnumString; +/// Sentinel for a column whose stable id has not yet been assigned. +pub const UNKNOWN_COLUMN_ID: i32 = -1; + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Column { name: String, data_type: DataType, comment: Option, + id: i32, } impl Column { @@ -41,6 +45,7 @@ impl Column { name: name.into(), data_type, comment: None, + id: UNKNOWN_COLUMN_ID, } } @@ -54,9 +59,15 @@ impl Column { name: self.name.clone(), data_type: data_type.clone(), comment: self.comment.clone(), + id: self.id, } } + pub fn with_id(mut self, id: i32) -> Self { + self.id = id; + self + } + // Getters... 
pub fn name(&self) -> &str { &self.name @@ -69,6 +80,12 @@ impl Column { pub fn comment(&self) -> Option<&str> { self.comment.as_deref() } + + /// Returns the stable column id, or [`UNKNOWN_COLUMN_ID`] when the + /// id has not yet been assigned by a [`SchemaBuilder`]. + pub fn id(&self) -> i32 { + self.id + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -152,6 +169,31 @@ impl Schema { } } +/// A schema together with its server-assigned version id. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SchemaInfo { + schema: Schema, + schema_id: i32, +} + +impl SchemaInfo { + pub fn new(schema: Schema, schema_id: i32) -> Self { + Self { schema, schema_id } + } + + pub fn schema(&self) -> &Schema { + &self.schema + } + + pub fn schema_id(&self) -> i32 { + self.schema_id + } + + pub fn into_parts(self) -> (Schema, i32) { + (self.schema, self.schema_id) + } +} + #[derive(Debug, Default)] pub struct SchemaBuilder { columns: Vec, @@ -236,6 +278,7 @@ impl SchemaBuilder { pub fn build(&self) -> Result { let columns = Self::normalize_columns(&self.columns, self.primary_key.as_ref())?; + let columns = Self::assign_column_ids(columns)?; let column_names: HashSet<_> = columns.iter().map(|c| &c.name).collect(); for auto_inc_col in &self.auto_increment_col_names { @@ -265,6 +308,42 @@ impl SchemaBuilder { }) } + /// All-or-none: preserve ids if every column has one, auto-assign + /// 0..N-1 if none do, error on mixed input. When preserving ids, + /// also reject duplicates and negative-but-not-sentinel values. 
+    fn assign_column_ids(columns: Vec<Column>) -> Result<Vec<Column>> {
+        let with_id = columns.iter().filter(|c| c.id != UNKNOWN_COLUMN_ID).count();
+        if with_id == 0 {
+            // No column carries an id: number them by position. The index is
+            // converted with a checked `try_from`, because an `as i32` cast
+            // would wrap silently on overflow.
+            return columns
+                .into_iter()
+                .enumerate()
+                .map(|(i, c)| {
+                    i32::try_from(i)
+                        .map(|id| c.with_id(id))
+                        .map_err(|_| IllegalArgument {
+                            message: format!("Column index {i} does not fit in an i32 column id"),
+                        })
+                })
+                .collect();
+        }
+        if with_id != columns.len() {
+            return Err(IllegalArgument {
+                message: "All columns must have an id assigned, or none of them must.".to_string(),
+            });
+        }
+        // Every column carries an explicit id: validate and keep them as-is.
+        let mut seen: HashSet<i32> = HashSet::with_capacity(columns.len());
+        for col in &columns {
+            if col.id < 0 {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "Column '{}' has invalid id {}; ids must be non-negative",
+                        col.name, col.id
+                    ),
+                });
+            }
+            if !seen.insert(col.id) {
+                return Err(IllegalArgument {
+                    message: format!("Duplicate column id {} in schema", col.id),
+                });
+            }
+        }
+        Ok(columns)
+    }
+
     fn normalize_columns(
         columns: &[Column],
         primary_key: Option<&PrimaryKey>,
@@ -662,7 +741,7 @@ impl LogFormat {
     }
 }

-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, EnumString)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, EnumString)]
 pub enum KvFormat {
     INDEXED,
     COMPACTED,
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index f32d6b24..a544906b 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -184,6 +184,18 @@ message GetTableInfoResponse {
   required int64 modified_time = 5;
 }

+// get table schema request and response. Mirrors the Java RPC at api key 1011.
+// Omitting `schema_id` requests the latest schema.
+message GetTableSchemaRequest {
+  // Path of the table whose schema is requested.
+  required PbTablePath table_path = 1;
+  // Schema version to fetch; leave unset to request the latest schema.
+  optional int32 schema_id = 2;
+}
+
+message GetTableSchemaResponse {
+  // Version id of the schema carried in `schema_json`.
+  required int32 schema_id = 1;
+  // JSON-serialized schema; the Rust client parses these bytes as JSON.
+  required bytes schema_json = 2;
+}
+
 message ListTablesRequest {
   required string database_name = 1;
 }
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs b/crates/fluss/src/record/kv/kv_record_read_context.rs
index 77cdbcb1..4200e044 100644
--- a/crates/fluss/src/record/kv/kv_record_read_context.rs
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -88,7 +88,7 @@ impl ReadContext for KvRecordReadContext {
         let row_type = schema.row_type().clone();

         // Create decoder outside lock
-        let decoder = RowDecoderFactory::create(self.kv_format.clone(), row_type)?;
+        let decoder = RowDecoderFactory::create(self.kv_format, row_type)?;

         // Second check: insert only if another thread didn't beat us to it
         {
diff --git a/crates/fluss/src/record/kv/test_util.rs b/crates/fluss/src/record/kv/test_util.rs
index 50ab911d..54eaac8f 100644
--- a/crates/fluss/src/record/kv/test_util.rs
+++ b/crates/fluss/src/record/kv/test_util.rs
@@ -45,6 +45,6 @@ impl ReadContext for TestReadContext {
     fn get_row_decoder(&self, _schema_id: i16) -> Result> {
         // Directly create decoder from data types - no Schema needed!
         let row_type = RowType::with_data_types(self.data_types.clone());
-        RowDecoderFactory::create(self.kv_format.clone(), row_type)
+        RowDecoderFactory::create(self.kv_format, row_type)
     }
 }
diff --git a/crates/fluss/src/row/fixed_schema_decoder.rs b/crates/fluss/src/row/fixed_schema_decoder.rs
new file mode 100644
index 00000000..eec83f3d
--- /dev/null
+++ b/crates/fluss/src/row/fixed_schema_decoder.rs
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Decode a `[schema_id (2 bytes) | row]` value into an [`InternalRow`] +//! conforming to a fixed target schema, projecting across schema +//! versions when needed. + +use crate::error::{Error, Result}; +use crate::metadata::{KvFormat, Schema, index_mapping}; +use crate::record::kv::SCHEMA_ID_LENGTH; +use crate::row::{LookupRow, ProjectedRow, RowDecoder, RowDecoderFactory}; +use std::sync::Arc; + +pub(crate) struct FixedSchemaDecoder { + row_decoder: Arc, + index_mapping: Option>, +} + +impl FixedSchemaDecoder { + pub fn new_no_projection(kv_format: KvFormat, schema: &Schema) -> Result { + let row_decoder = RowDecoderFactory::create(kv_format, schema.row_type().clone())?; + Ok(Self { + row_decoder, + index_mapping: None, + }) + } + + pub fn new( + kv_format: KvFormat, + source_schema: &Schema, + target_schema: &Schema, + ) -> Result { + let mapping = index_mapping(source_schema, target_schema)?; + let row_decoder = RowDecoderFactory::create(kv_format, source_schema.row_type().clone())?; + Ok(Self { + row_decoder, + index_mapping: Some(Arc::from(mapping.into_boxed_slice())), + }) + } + + pub fn decode<'a>(&self, value_bytes: &'a [u8]) -> Result> { + let payload = + value_bytes + .get(SCHEMA_ID_LENGTH..) 
+ .ok_or_else(|| Error::RowConvertError { + message: format!( + "Row payload too short: {} bytes, need at least {} for schema id", + value_bytes.len(), + SCHEMA_ID_LENGTH + ), + })?; + let row = self.row_decoder.decode(payload); + match &self.index_mapping { + None => Ok(LookupRow::raw(row)), + Some(mapping) => Ok(LookupRow::projected(ProjectedRow::new( + row, + Arc::clone(mapping), + ))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{Column, DataTypes, Schema}; + use crate::record::kv::SCHEMA_ID_LENGTH; + use crate::row::InternalRow; + use crate::row::binary::BinaryWriter; + use crate::row::compacted::CompactedRowWriter; + + fn schema_with_ids(columns: &[(i32, &str, crate::metadata::DataType)]) -> Schema { + let cols: Vec = columns + .iter() + .map(|(id, name, dt)| Column::new(*name, dt.clone()).with_id(*id)) + .collect(); + Schema::builder().with_columns(cols).build().unwrap() + } + + fn write_value(schema_id: i16, writer: CompactedRowWriter) -> Vec { + let row_bytes = writer.to_bytes(); + let mut out = Vec::with_capacity(SCHEMA_ID_LENGTH + row_bytes.len()); + out.extend_from_slice(&schema_id.to_le_bytes()); + out.extend_from_slice(row_bytes.as_ref()); + out + } + + #[test] + fn decode_no_projection_strips_schema_id_and_returns_row() { + let schema = schema_with_ids(&[(0, "a", DataTypes::int()), (1, "b", DataTypes::string())]); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + + let mut writer = CompactedRowWriter::new(2); + writer.write_int(42); + writer.write_string("hi"); + let value = write_value(7, writer); + + let row = decoder.decode(&value).unwrap(); + assert_eq!(row.get_field_count(), 2); + assert_eq!(row.get_int(0).unwrap(), 42); + assert_eq!(row.get_string(1).unwrap(), "hi"); + } + + #[test] + fn decode_with_projection_pads_missing_field_with_null() { + // Source schema (older): [a:int, b:string] + let source = schema_with_ids(&[(0, "a", DataTypes::int()), (1, "b", 
DataTypes::string())]); + // Target schema (newer): added column c at id=2 + let target = schema_with_ids(&[ + (0, "a", DataTypes::int()), + (1, "b", DataTypes::string()), + (2, "c", DataTypes::bigint()), + ]); + let decoder = FixedSchemaDecoder::new(KvFormat::COMPACTED, &source, &target).unwrap(); + + let mut writer = CompactedRowWriter::new(2); + writer.write_int(7); + writer.write_string("seven"); + let value = write_value(0, writer); + + let row = decoder.decode(&value).unwrap(); + assert_eq!(row.get_field_count(), 3); + assert_eq!(row.get_int(0).unwrap(), 7); + assert_eq!(row.get_string(1).unwrap(), "seven"); + assert!( + row.is_null_at(2).unwrap(), + "added-but-missing column must read as null" + ); + } + + #[test] + fn decode_with_projection_drops_removed_field() { + // Source schema (older): [a, b, c] + let source = schema_with_ids(&[ + (0, "a", DataTypes::int()), + (1, "b", DataTypes::string()), + (2, "c", DataTypes::bigint()), + ]); + // Target schema (newer): dropped b + let target = schema_with_ids(&[(0, "a", DataTypes::int()), (2, "c", DataTypes::bigint())]); + let decoder = FixedSchemaDecoder::new(KvFormat::COMPACTED, &source, &target).unwrap(); + + let mut writer = CompactedRowWriter::new(3); + writer.write_int(1); + writer.write_string("dropped"); + writer.write_long(99); + let value = write_value(0, writer); + + let row = decoder.decode(&value).unwrap(); + assert_eq!(row.get_field_count(), 2); + assert_eq!(row.get_int(0).unwrap(), 1); + assert_eq!(row.get_long(1).unwrap(), 99); + } + + #[test] + fn decode_with_projection_reorders_fields() { + let source = schema_with_ids(&[(0, "a", DataTypes::int()), (1, "b", DataTypes::string())]); + // Target reorders: b first, then a. 
+ let target = schema_with_ids(&[(1, "b", DataTypes::string()), (0, "a", DataTypes::int())]); + let decoder = FixedSchemaDecoder::new(KvFormat::COMPACTED, &source, &target).unwrap(); + + let mut writer = CompactedRowWriter::new(2); + writer.write_int(123); + writer.write_string("xyz"); + let value = write_value(0, writer); + + let row = decoder.decode(&value).unwrap(); + assert_eq!(row.get_string(0).unwrap(), "xyz"); + assert_eq!(row.get_int(1).unwrap(), 123); + } + + #[test] + fn decode_payload_too_short_errors() { + let schema = schema_with_ids(&[(0, "a", DataTypes::int())]); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + // Only 1 byte — short of the schema id. + match decoder.decode(&[0u8]) { + Ok(_) => panic!("expected error for short payload"), + Err(e) => assert!(e.to_string().contains("too short"), "got: {e}"), + } + } +} diff --git a/crates/fluss/src/row/lookup_row.rs b/crates/fluss/src/row/lookup_row.rs new file mode 100644 index 00000000..c002369d --- /dev/null +++ b/crates/fluss/src/row/lookup_row.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Return type of [`crate::client::table::LookupResult`] getters: a row +//! 
decoded under the table's current schema, possibly via projection +//! over an older schema's bytes. + +use crate::client::WriteFormat; +use crate::error::Result; +use crate::row::compacted::CompactedRow; +use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; +use crate::row::projected_row::ProjectedRow; +use crate::row::{Decimal, FlussArray, InternalRow}; + +pub struct LookupRow<'a> { + inner: Inner<'a>, +} + +enum Inner<'a> { + Raw(CompactedRow<'a>), + Projected(ProjectedRow>), +} + +impl<'a> LookupRow<'a> { + pub(crate) fn raw(row: CompactedRow<'a>) -> Self { + Self { + inner: Inner::Raw(row), + } + } + + pub(crate) fn projected(row: ProjectedRow>) -> Self { + Self { + inner: Inner::Projected(row), + } + } +} + +macro_rules! delegate { + ($self:ident, $method:ident $(, $arg:expr)*) => { + match &$self.inner { + Inner::Raw(r) => r.$method($($arg),*), + Inner::Projected(r) => r.$method($($arg),*), + } + }; +} + +impl<'a> InternalRow for LookupRow<'a> { + fn get_field_count(&self) -> usize { + delegate!(self, get_field_count) + } + fn is_null_at(&self, pos: usize) -> Result { + delegate!(self, is_null_at, pos) + } + fn get_boolean(&self, pos: usize) -> Result { + delegate!(self, get_boolean, pos) + } + fn get_byte(&self, pos: usize) -> Result { + delegate!(self, get_byte, pos) + } + fn get_short(&self, pos: usize) -> Result { + delegate!(self, get_short, pos) + } + fn get_int(&self, pos: usize) -> Result { + delegate!(self, get_int, pos) + } + fn get_long(&self, pos: usize) -> Result { + delegate!(self, get_long, pos) + } + fn get_float(&self, pos: usize) -> Result { + delegate!(self, get_float, pos) + } + fn get_double(&self, pos: usize) -> Result { + delegate!(self, get_double, pos) + } + fn get_char(&self, pos: usize, length: usize) -> Result<&str> { + delegate!(self, get_char, pos, length) + } + fn get_string(&self, pos: usize) -> Result<&str> { + delegate!(self, get_string, pos) + } + fn get_decimal(&self, pos: usize, precision: usize, scale: usize) 
-> Result { + delegate!(self, get_decimal, pos, precision, scale) + } + fn get_date(&self, pos: usize) -> Result { + delegate!(self, get_date, pos) + } + fn get_time(&self, pos: usize) -> Result