Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> {
}));
Ok(conn)
}
Err(e) => Err(format!("Failed to connect: {}", e)),
Err(e) => Err(format!("Failed to connect: {e}")),
}
}

Expand All @@ -264,7 +264,7 @@ impl Connection {
let admin = Box::into_raw(Box::new(Admin { inner: admin }));
Ok(admin)
}
Err(e) => Err(format!("Failed to get admin: {}", e)),
Err(e) => Err(format!("Failed to get admin: {e}")),
}
}

Expand All @@ -287,7 +287,7 @@ impl Connection {
}));
Ok(table)
}
Err(e) => Err(format!("Failed to get table: {}", e)),
Err(e) => Err(format!("Failed to get table: {e}")),
}
}
}
Expand Down Expand Up @@ -398,7 +398,7 @@ impl Table {

let table_append = match fluss_table.new_append() {
Ok(a) => a,
Err(e) => return Err(format!("Failed to create append: {}", e)),
Err(e) => return Err(format!("Failed to create append: {e}")),
};

let writer = table_append.create_writer();
Expand All @@ -413,7 +413,10 @@ impl Table {
self.table_info.clone(),
);

let scanner = fluss_table.new_scan().create_log_scanner();
let scanner = match fluss_table.new_scan().create_log_scanner() {
Ok(a) => a,
Err(e) => return Err(format!("Failed to create log scanner: {e}")),
};
let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
Ok(scanner)
}
Expand All @@ -431,9 +434,12 @@ impl Table {
let scan = fluss_table.new_scan();
let scan = match scan.project(&column_indices) {
Ok(s) => s,
Err(e) => return Err(format!("Failed to project columns: {}", e)),
Err(e) => return Err(format!("Failed to project columns: {e}")),
};
let scanner = match scan.create_log_scanner() {
Ok(a) => a,
Err(e) => return Err(format!("Failed to create log scanner: {e}")),
};
let scanner = scan.create_log_scanner();
let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
Ok(scanner)
}
Expand Down
15 changes: 4 additions & 11 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()),
_ => Err(anyhow!("Unknown data type: {}", dt)),
_ => Err(anyhow!("Unknown data type: {dt}")),
}
}

Expand Down Expand Up @@ -423,10 +423,7 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> {
datum.i32_val = array.value(row_id);
datum
}
_ => panic!(
"Will never come here. Unsupported Time32 unit for column {}",
i
),
_ => panic!("Will never come here. Unsupported Time32 unit for column {i}"),
},
ArrowDataType::Time64(unit) => match unit {
TimeUnit::Microsecond => {
Expand All @@ -449,14 +446,10 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> {
datum.i64_val = array.value(row_id);
datum
}
_ => panic!(
"Will never come here. Unsupported Time64 unit for column {}",
i
),
_ => panic!("Will never come here. Unsupported Time64 unit for column {i}"),
},
other => panic!(
"Will never come here. Unsupported Arrow data type for column {}: {:?}",
i, other
"Will never come here. Unsupported Arrow data type for column {i}: {other:?}"
),
};

Expand Down
6 changes: 5 additions & 1 deletion bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ impl FlussTable {

let table_scan = fluss_table.new_scan();

let rust_scanner = table_scan.create_log_scanner();
let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to create log scanner: {e:?}"
))
})?;

let admin = conn
.get_admin()
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn main() -> Result<()> {
try_join!(f1, f2, append_writer.flush())?;

// scan rows
let log_scanner = table.new_scan().create_log_scanner();
let log_scanner = table.new_scan().create_log_scanner()?;
log_scanner.subscribe(0, 0).await?;

loop {
Expand Down
17 changes: 13 additions & 4 deletions crates/fluss/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ version = { workspace = true }
name = "fluss"
build = "src/build.rs"

[features]
default = ["storage-memory", "storage-fs"]
storage-all = ["storage-memory", "storage-fs"]

storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
integration_tests = []

[dependencies]
arrow = { workspace = true }
arrow-schema = "57.0.0"
Expand All @@ -45,16 +53,17 @@ ordered-float = { version = "4", features = ["serde"] }
parse-display = "0.10"
ref-cast = "1.0"
chrono = { workspace = true }
oneshot = "0.1.11"
opendal = "0.53.3"
url = "2.5.7"
async-trait = "0.1.89"
uuid = { version = "1.10", features = ["v4"] }
tempfile= "3.23.0"

[dev-dependencies]
testcontainers = "0.25.0"
once_cell = "1.19"
test-env-helpers = "0.2.2"

[features]
integration_tests = []


[build-dependencies]
prost-build = { version = "0.13.5" }
1 change: 1 addition & 0 deletions crates/fluss/src/client/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;

mod append;

mod remote_log;
mod scanner;
mod writer;

Expand Down
Loading
Loading