Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,7 @@ fn python_to_generic_row(
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
FlussError::new_err(format!(
"Row must be a dict, list, or tuple; got {}",
type_name
"Row must be a dict, list, or tuple; got {type_name}"
))
})?;
let schema = table_info.row_type();
Expand All @@ -357,7 +356,7 @@ fn python_to_generic_row(
.name()
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
FlussError::new_err(format!("Row dict keys must be strings; got {}", key_type))
FlussError::new_err(format!("Row dict keys must be strings; got {key_type}"))
})?;

if fields.iter().all(|f| f.name() != key_str) {
Expand All @@ -367,8 +366,7 @@ fn python_to_generic_row(
.collect::<Vec<_>>()
.join(", ");
return Err(FlussError::new_err(format!(
"Unknown field '{}'. Expected fields: {}",
key_str, expected
"Unknown field '{key_str}'. Expected fields: {expected}"
)));
}
}
Expand Down Expand Up @@ -476,8 +474,7 @@ fn python_value_to_datum(
}
}
_ => Err(FlussError::new_err(format!(
"Unsupported data type for row-level operations: {:?}",
data_type
"Unsupported data type for row-level operations: {data_type}"
))),
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/fluss/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use std::io::Result;

fn main() -> Result<()> {
let mut config = prost_build::Config::new();
config.bytes([".proto.PbProduceLogReqForBucket.records"]);
config.bytes([
".proto.PbProduceLogReqForBucket.records",
".proto.PbPutKvReqForBucket.records",
]);
config.compile_protos(&["src/proto/fluss_api.proto"], &["src/proto"])?;
Ok(())
}
11 changes: 9 additions & 2 deletions crates/fluss/src/client/table/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,32 @@ impl TableAppend {
AppendWriter {
table_path: Arc::new(self.table_path.clone()),
writer_client: self.writer_client.clone(),
table_info: Arc::new(self.table_info.clone()),
}
}
}

pub struct AppendWriter {
table_path: Arc<TablePath>,
writer_client: Arc<WriterClient>,
table_info: Arc<TableInfo>,
}

impl AppendWriter {
pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
let record = WriteRecord::new(self.table_path.clone(), row);
let record =
WriteRecord::for_append(self.table_path.clone(), self.table_info.schema_id, row);
let result_handle = self.writer_client.send(&record).await?;
let result = result_handle.wait().await?;
result_handle.result(result)
}

pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
let record = WriteRecord::new_record_batch(self.table_path.clone(), batch);
let record = WriteRecord::for_append_record_batch(
self.table_path.clone(),
self.table_info.schema_id,
batch,
);
let result_handle = self.writer_client.send(&record).await?;
let result = result_handle.wait().await?;
result_handle.result(result)
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/log_fetch_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ mod tests {
let mut row = GenericRow::new();
row.set_field(0, 1_i32);
row.set_field(1, "alice");
let record = WriteRecord::new(table_path, row);
let record = WriteRecord::for_append(table_path, 1, row);
builder.append(&record)?;

let data = builder.build()?;
Expand Down
3 changes: 2 additions & 1 deletion crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1446,8 +1446,9 @@ mod tests {
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
},
);
let record = WriteRecord::new(
let record = WriteRecord::for_append(
table_path,
1,
GenericRow {
values: vec![Datum::Int32(1)],
},
Expand Down
5 changes: 4 additions & 1 deletion crates/fluss/src/client/table/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct AbstractTableWriter {
table_path: Arc<TablePath>,
writer_client: Arc<WriterClient>,
field_count: i32,
schema_id: i32,
}

#[allow(dead_code)]
Expand All @@ -57,6 +58,7 @@ impl AbstractTableWriter {
table_path: Arc::new(table_path),
writer_client,
field_count: table_info.row_type().fields().len() as i32,
schema_id: table_info.schema_id,
}
}

Expand All @@ -82,7 +84,8 @@ pub struct AppendWriterImpl {
#[allow(dead_code)]
impl AppendWriterImpl {
pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
let record = WriteRecord::new(self.base.table_path.clone(), row);
let record =
WriteRecord::for_append(self.base.table_path.clone(), self.base.schema_id, row);
self.base.send(&record).await
}
}
7 changes: 4 additions & 3 deletions crates/fluss/src/client/write/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::client::write::batch::WriteBatch::ArrowLog;
use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
use crate::client::{Record, ResultHandle, WriteRecord};
use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::config::Config;
use crate::error::Result;
Expand Down Expand Up @@ -110,7 +110,7 @@ impl RecordAccumulator {
row_type,
bucket_id,
current_time_ms(),
matches!(record.row, Record::RecordBatch(_)),
matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))),
));

let batch_id = batch.batch_id();
Expand Down Expand Up @@ -541,8 +541,9 @@ mod tests {
let accumulator = RecordAccumulator::new(config);
let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string()));
let cluster = Arc::new(build_cluster(table_path.as_ref(), 1, 1));
let record = WriteRecord::new(
let record = WriteRecord::for_append(
table_path.clone(),
1,
GenericRow {
values: vec![Datum::Int32(1)],
},
Expand Down
133 changes: 116 additions & 17 deletions crates/fluss/src/client/write/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

use crate::BucketId;
use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
use crate::client::{ResultHandle, WriteRecord};
use crate::client::{Record, ResultHandle, WriteRecord};
use crate::compression::ArrowCompressionInfo;
use crate::error::Result;
use crate::metadata::{DataType, TablePath};
use crate::error::{Error, Result};
use crate::metadata::{DataType, KvFormat, TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
use crate::record::kv::KvRecordBatchBuilder;
use bytes::Bytes;
use parking_lot::Mutex;
use std::cmp::max;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};

Expand Down Expand Up @@ -92,30 +92,42 @@ impl InnerWriteBatch {

pub enum WriteBatch {
ArrowLog(ArrowLogWriteBatch),
Kv(KvWriteBatch),
}

impl WriteBatch {
pub fn inner_batch(&self) -> &InnerWriteBatch {
match self {
WriteBatch::ArrowLog(batch) => &batch.write_batch,
WriteBatch::Kv(batch) => &batch.write_batch,
}
}

pub fn inner_batch_mut(&mut self) -> &mut InnerWriteBatch {
match self {
WriteBatch::ArrowLog(batch) => &mut batch.write_batch,
WriteBatch::Kv(batch) => &mut batch.write_batch,
}
}

pub fn try_append(&mut self, write_record: &WriteRecord) -> Result<Option<ResultHandle>> {
match self {
WriteBatch::ArrowLog(batch) => batch.try_append(write_record),
WriteBatch::Kv(batch) => batch.try_append(write_record),
}
}

pub fn waited_time_ms(&self, now: i64) -> i64 {
self.inner_batch().waited_time_ms(now)
}

pub fn close(&mut self) {
pub fn close(&mut self) -> Result<()> {
match self {
WriteBatch::ArrowLog(batch) => {
batch.close();
Ok(())
}
WriteBatch::Kv(batch) => batch.close(),
}
}

Expand All @@ -127,20 +139,18 @@ impl WriteBatch {
pub fn is_closed(&self) -> bool {
match self {
WriteBatch::ArrowLog(batch) => batch.is_closed(),
WriteBatch::Kv(batch) => batch.is_closed(),
}
}

pub fn drained(&mut self, now_ms: i64) {
match self {
WriteBatch::ArrowLog(batch) => {
batch.write_batch.drained(now_ms);
}
}
self.inner_batch_mut().drained(now_ms);
}

pub fn build(&self) -> Result<Bytes> {
pub fn build(&mut self) -> Result<Bytes> {
match self {
WriteBatch::ArrowLog(batch) => batch.build(),
WriteBatch::Kv(batch) => batch.build(),
}
}

Expand Down Expand Up @@ -172,7 +182,7 @@ impl WriteBatch {
pub struct ArrowLogWriteBatch {
pub write_batch: InnerWriteBatch,
pub arrow_builder: MemoryLogRecordsArrowBuilder,
built_records: Mutex<Option<Bytes>>,
built_records: Option<Bytes>,
}

impl ArrowLogWriteBatch {
Expand All @@ -196,7 +206,7 @@ impl ArrowLogWriteBatch {
to_append_record_batch,
arrow_compression_info,
),
built_records: Mutex::new(None),
built_records: None,
}
}

Expand All @@ -218,13 +228,12 @@ impl ArrowLogWriteBatch {
}
}

pub fn build(&self) -> Result<Bytes> {
let mut cached = self.built_records.lock();
if let Some(bytes) = cached.as_ref() {
pub fn build(&mut self) -> Result<Bytes> {
if let Some(bytes) = &self.built_records {
return Ok(bytes.clone());
}
let bytes = Bytes::from(self.arrow_builder.build()?);
*cached = Some(bytes.clone());
self.built_records = Some(bytes.clone());
Ok(bytes)
}

Expand All @@ -237,6 +246,96 @@ impl ArrowLogWriteBatch {
}
}

pub struct KvWriteBatch {
write_batch: InnerWriteBatch,
kv_batch_builder: KvRecordBatchBuilder,
target_columns: Option<Vec<usize>>,
schema_id: i32,
}

impl KvWriteBatch {
#[allow(clippy::too_many_arguments)]
pub fn new(
batch_id: i64,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious on why we need to introduce batch_id here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will use batch_id to lookup KvWriteBatch by batch_id in WriterClient

table_path: TablePath,
schema_id: i32,
write_limit: usize,
kv_format: KvFormat,
bucket_id: BucketId,
target_columns: Option<Vec<usize>>,
create_ms: i64,
) -> Self {
let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id);
Self {
write_batch: base,
kv_batch_builder: KvRecordBatchBuilder::new(schema_id, write_limit, kv_format),
target_columns,
schema_id,
}
}

pub fn try_append(&mut self, write_record: &WriteRecord) -> Result<Option<ResultHandle>> {
let kv_write_record = match &write_record.record {
Record::Kv(record) => record,
_ => {
return Err(Error::UnsupportedOperation {
message: "Only KvRecord to append to KvWriteBatch ".to_string(),
});
}
};

let key = kv_write_record.key;

if self.schema_id != write_record.schema_id {
return Err(Error::UnexpectedError {
message: format!(
"schema id {} of the write record to append is not the same as the current schema id {} in the batch.",
write_record.schema_id, self.schema_id
),
source: None,
});
};

if self.target_columns.as_deref() != kv_write_record.target_columns {
return Err(Error::UnexpectedError {
message: format!(
"target columns {:?} of the write record to append are not the same as the current target columns {:?} in the batch.",
kv_write_record.target_columns,
self.target_columns.as_deref()
),
source: None,
});
}

let row = kv_write_record.compacted_row.as_ref();

if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, row) {
Ok(None)
} else {
// append successfully
self.kv_batch_builder
.append_row(key, row)
.map_err(|e| Error::UnexpectedError {
message: "Failed to append row to KvWriteBatch".to_string(),
source: Some(Box::new(e)),
})?;
Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
}
}

pub fn build(&mut self) -> Result<Bytes> {
self.kv_batch_builder.build()
}

pub fn is_closed(&self) -> bool {
self.kv_batch_builder.is_closed()
}

pub fn close(&mut self) -> Result<()> {
self.kv_batch_builder.close()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading