Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,43 @@ class TableUpsert:
def __repr__(self) -> str: ...

class TableLookup:
"""Builder for creating a Lookuper.
"""Builder for creating a Lookuper or PrefixLookuper.

Obtain via `FlussTable.new_lookup()`, then call `create_lookuper()`.
Obtain via `FlussTable.new_lookup()`, then call `create_lookuper()`
for primary key lookup, or `lookup_by(columns).create_lookuper()`
for prefix key lookup.

Example:
lookuper = table.new_lookup().create_lookuper()
prefix_lookuper = table.new_lookup().lookup_by(["a", "b"]).create_lookuper()
"""

def create_lookuper(self) -> Lookuper: ...
def lookup_by(self, column_names: List[str]) -> "TablePrefixLookup":
"""Switch to prefix-scan mode for the given lookup columns.

The columns must be the table's partition keys (if any) plus the
bucket keys, in that order.

Args:
column_names: List of column names forming the prefix key.

Returns:
TablePrefixLookup builder. Call `create_lookuper()` to get a PrefixLookuper.
"""
...
def __repr__(self) -> str: ...

class TablePrefixLookup:
"""Builder for creating a PrefixLookuper.

Obtain via `TableLookup.lookup_by(columns)`, then call `create_lookuper()`.

Example:
prefix_lookuper = table.new_lookup().lookup_by(["a", "b"]).create_lookuper()
"""

def create_lookuper(self) -> "PrefixLookuper": ...
def __repr__(self) -> str: ...

class AppendWriter:
Expand Down Expand Up @@ -721,6 +749,29 @@ class Lookuper:
...
def __repr__(self) -> str: ...

class PrefixLookuper:
"""Lookuper for performing prefix key lookups on a Fluss table.

Returns all rows whose primary key starts with the given prefix.
Create via `table.new_lookup().lookup_by(columns).create_lookuper()`.
"""

async def lookup(self, prefix: dict | list | tuple) -> List[Dict[str, object]]:
"""Lookup all rows matching a prefix key.

Args:
prefix: A dict, list, or tuple containing only the prefix key values
(the columns specified in lookup_by()).
For dict: keys are prefix column names.
For list/tuple: values in prefix column order.

Returns:
A list of dicts, each containing the full row data.
Empty list if no matches.
"""
...
def __repr__(self) -> str: ...

class LogScanner:
"""Scanner for reading log data from a Fluss table.

Expand Down
2 changes: 2 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<TableAppend>()?;
m.add_class::<TableUpsert>()?;
m.add_class::<TableLookup>()?;
m.add_class::<TablePrefixLookup>()?;
m.add_class::<AppendWriter>()?;
m.add_class::<UpsertWriter>()?;
m.add_class::<Lookuper>()?;
m.add_class::<PrefixLookuper>()?;
m.add_class::<Schema>()?;
m.add_class::<LogScanner>()?;
m.add_class::<LakeSnapshot>()?;
Expand Down
99 changes: 99 additions & 0 deletions bindings/python/src/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,102 @@ impl Lookuper {
})
}
}

/// Lookuper for performing prefix key lookups on a Fluss table.
///
/// Returns all rows whose primary key starts with the given prefix.
/// Create once via `table.new_lookup().lookup_by(columns).create_lookuper()`
/// and reuse for multiple lookups.
#[pyclass]
pub struct PrefixLookuper {
inner: Arc<Mutex<fcore::client::PrefixKeyLookuper>>,
table_info: Arc<fcore::metadata::TableInfo>,
lookup_column_indices: Vec<usize>,
}

#[pymethods]
impl PrefixLookuper {
/// Lookup all rows matching a prefix key.
///
/// Args:
/// prefix: A dict, list, or tuple containing only the prefix key values
/// (the columns specified in lookup_by()).
/// For dict: keys are prefix column names.
/// For list/tuple: values in prefix column order.
///
/// Returns:
/// A list of dicts, each containing the full row data. Empty list if no matches.
pub fn lookup<'py>(
&self,
py: Python<'py>,
prefix: &Bound<'_, PyAny>,
) -> PyResult<Bound<'py, PyAny>> {
let generic_row =
python_to_dense_generic_row(prefix, &self.table_info, &self.lookup_column_indices)?;
let inner = self.inner.clone();
let table_info = self.table_info.clone();

future_into_py(py, async move {
let result = {
let mut lookuper = inner.lock().await;
lookuper
.lookup(&generic_row)
.await
.map_err(|e| FlussError::from_core_error(&e))?
};

let rows = result
.get_rows()
.map_err(|e| FlussError::from_core_error(&e))?;

Python::attach(|py| {
let py_rows: Vec<Py<PyAny>> = rows
.iter()
.map(|row| internal_row_to_dict(py, row, &table_info))
.collect::<PyResult<_>>()?;
Ok(py_rows)
})
})
}

fn __repr__(&self) -> String {
"PrefixLookuper()".to_string()
}
}

impl PrefixLookuper {
pub fn new(
connection: &Arc<fcore::client::FlussConnection>,
metadata: Arc<fcore::client::Metadata>,
table_info: fcore::metadata::TableInfo,
lookup_column_names: Vec<String>,
) -> PyResult<Self> {
let row_type = table_info.row_type();
let lookup_column_indices: Vec<usize> = lookup_column_names
.iter()
.map(|name| {
row_type.get_field_index(name).ok_or_else(|| {
FlussError::new_err(format!("Unknown column name '{name}' for prefix lookup"))
})
})
.collect::<PyResult<_>>()?;

let lookuper = TOKIO_RUNTIME.block_on(async {
let fluss_table =
fcore::client::FlussTable::new(connection, metadata, table_info.clone());
let table_lookup = fluss_table
.new_lookup()
.map_err(|e| FlussError::from_core_error(&e))?;
table_lookup
.lookup_by(lookup_column_names)
.create_lookuper()
.map_err(|e| FlussError::from_core_error(&e))
})?;

Ok(Self {
inner: Arc::new(Mutex::new(lookuper)),
table_info: Arc::new(table_info),
lookup_column_indices,
})
}
}
47 changes: 47 additions & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,11 +888,58 @@ impl TableLookup {
)
}

/// Switch to prefix-scan mode for the given lookup columns.
///
/// The columns must be the table's partition keys (if any) plus the
/// bucket keys, in that order.
///
/// Args:
/// column_names: List of column names forming the prefix key.
///
/// Returns:
/// TablePrefixLookup builder. Call `create_lookuper()` to get a PrefixLookuper.
pub fn lookup_by(&self, column_names: Vec<String>) -> TablePrefixLookup {
TablePrefixLookup {
connection: self.connection.clone(),
metadata: self.metadata.clone(),
table_info: self.table_info.clone(),
lookup_column_names: column_names,
}
}

fn __repr__(&self) -> String {
"TableLookup()".to_string()
}
}

/// Builder for creating a PrefixLookuper.
///
/// Obtain via `TableLookup.lookup_by(columns)`, then call `create_lookuper()`.
#[pyclass]
pub struct TablePrefixLookup {
connection: Arc<fcore::client::FlussConnection>,
metadata: Arc<fcore::client::Metadata>,
table_info: fcore::metadata::TableInfo,
lookup_column_names: Vec<String>,
}

#[pymethods]
impl TablePrefixLookup {
/// Create a PrefixLookuper from this builder.
pub fn create_lookuper(&self) -> PyResult<crate::PrefixLookuper> {
crate::PrefixLookuper::new(
&self.connection,
self.metadata.clone(),
self.table_info.clone(),
self.lookup_column_names.clone(),
)
}

fn __repr__(&self) -> String {
"TablePrefixLookup()".to_string()
}
}

/// Writer for appending data to a Fluss table
#[pyclass]
pub struct AppendWriter {
Expand Down
Loading
Loading