Skip to content

Commit

Permalink
Add streaming support for reading rows from Bigtable (#100)
Browse files Browse the repository at this point in the history
* Add streaming support for read_rows

* Add streaming example
  • Loading branch information
nrempel authored Nov 7, 2024
1 parent c88bc67 commit 719f3af
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 1 deletion.
1 change: 1 addition & 0 deletions bigtable_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ prost-wkt-types = { version = "0.6.0" }
serde = { version = "1.0.192", features = ["derive"] }
serde_with = { version = "3.4.0", features = ["base64"] }
# end of above part
futures-util = "0.3.31"
gcp_auth = "0.12.2"
log = "0.4.20"
thiserror = "1.0.50"
Expand Down
29 changes: 28 additions & 1 deletion bigtable_rs/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
use std::sync::Arc;
use std::time::Duration;

use futures_util::Stream;
use gcp_auth::TokenProvider;
use log::info;
use thiserror::Error;
Expand All @@ -98,7 +99,7 @@ use tonic::{codec::Streaming, transport::Channel, transport::ClientTlsConfig, Re
use tower::ServiceBuilder;

use crate::auth_service::AuthSvc;
use crate::bigtable::read_rows::decode_read_rows_response;
use crate::bigtable::read_rows::{decode_read_rows_response, decode_read_rows_response_stream};
use crate::google::bigtable::v2::{
bigtable_client::BigtableClient, MutateRowRequest, MutateRowResponse, MutateRowsRequest,
MutateRowsResponse, ReadRowsRequest, RowSet, SampleRowKeysRequest, SampleRowKeysResponse,
Expand Down Expand Up @@ -493,6 +494,32 @@ impl BigTable {
decode_read_rows_response(self.timeout.as_ref(), response).await
}

/// Streaming variant of `read_rows`: decoded rows are yielded one at a
/// time as server messages arrive, instead of being buffered into a Vec.
pub async fn stream_rows(
    &mut self,
    request: ReadRowsRequest,
) -> Result<impl Stream<Item = Result<(RowKey, Vec<RowCell>)>>> {
    // Open the server-side gRPC stream, then adapt the raw response
    // messages into decoded (row key, cells) pairs.
    let raw_stream = self.client.read_rows(request).await?.into_inner();
    Ok(decode_read_rows_response_stream(raw_stream).await)
}

/// Streaming variant of `read_rows_with_prefix`: restricts the request to
/// the row range implied by `prefix`, then streams the decoded rows.
pub async fn stream_rows_with_prefix(
    &mut self,
    mut request: ReadRowsRequest,
    prefix: Vec<u8>,
) -> Result<impl Stream<Item = Result<(RowKey, Vec<RowCell>)>>> {
    // Overwrite any caller-provided row set with the single range that
    // covers exactly the keys starting with `prefix`.
    request.rows = Some(RowSet {
        row_keys: Vec::new(),
        row_ranges: vec![get_row_range_from_prefix(prefix)],
    });
    let raw_stream = self.client.read_rows(request).await?.into_inner();
    Ok(decode_read_rows_response_stream(raw_stream).await)
}

/// Wrapped `sample_row_keys` method
pub async fn sample_row_keys(
&mut self,
Expand Down
15 changes: 15 additions & 0 deletions bigtable_rs/src/bigtable/read_rows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::bigtable::{Error, Result, RowCell, RowKey};
use crate::google::bigtable::v2::read_rows_response::cell_chunk::RowStatus;
use crate::google::bigtable::v2::read_rows_response::CellChunk;
use crate::google::bigtable::v2::ReadRowsResponse;
use futures_util::stream::iter;
use futures_util::{Stream, StreamExt};
use log::trace;
use std::collections::HashSet;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -33,6 +35,19 @@ pub async fn decode_read_rows_response(
Ok(rows)
}

/// Flatten and decode the stream of `ReadRowsResponse` into a stream of `Result<(RowKey, Vec<RowCell>)>>`.
pub async fn decode_read_rows_response_stream(
rrr: Streaming<ReadRowsResponse>,
) -> impl Stream<Item = Result<(RowKey, Vec<RowCell>)>> {
rrr.flat_map(|message| match message {
Ok(response) => {
let results = decode_read_rows_response_to_vec(response.chunks);
iter(results)
}
Err(e) => iter(vec![Err(Error::RpcError(e))]),
})
}

pub fn decode_read_rows_response_to_vec(
chunks: Vec<CellChunk>,
) -> Vec<Result<(RowKey, Vec<RowCell>)>> {
Expand Down
5 changes: 5 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ path = "src/prefix.rs"
name = "custom_query"
path = "src/custom_query.rs"

[[bin]]
name = "stream"
path = "src/stream.rs"

[dependencies]
bigtable_rs = { path = "../bigtable_rs" }
tokio = { version = "1.34.0", features = ["rt-multi-thread"] }
env_logger = "0.11.1"
futures-util = "0.3.31"
log = "0.4.20"
warp = "0.3.6"
serde = { version = "1.0", features = ["derive"] }
Expand Down
82 changes: 82 additions & 0 deletions examples/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use bigtable_rs::bigtable;
use bigtable_rs::google::bigtable::v2::row_filter::{Chain, Filter};
use bigtable_rs::google::bigtable::v2::row_range::{EndKey, StartKey};
use bigtable_rs::google::bigtable::v2::{ReadRowsRequest, RowFilter, RowRange, RowSet};
use env_logger;
use futures_util::TryStreamExt;
use std::error::Error;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

let project_id = "project-1";
let instance_name = "instance-1";
let table_name = "table-1";
let channel_size = 4;
let timeout = Duration::from_secs(10);

let key_start: String = "key1".to_owned();
let key_end: String = "key4".to_owned();

// make a bigtable client
let connection = bigtable::BigTableConnection::new(
project_id,
instance_name,
true,
channel_size,
Some(timeout),
)
.await?;
let mut bigtable = connection.client();

// prepare a ReadRowsRequest
let request = ReadRowsRequest {
app_profile_id: "default".to_owned(),
table_name: bigtable.get_full_table_name(table_name),
rows_limit: 10,
rows: Some(RowSet {
row_keys: vec![], // use this field to put keys for reading specific rows
row_ranges: vec![RowRange {
start_key: Some(StartKey::StartKeyClosed(key_start.into_bytes())),
end_key: Some(EndKey::EndKeyOpen(key_end.into_bytes())),
}],
}),
filter: Some(RowFilter {
filter: Some(Filter::Chain(Chain {
filters: vec![
RowFilter {
filter: Some(Filter::FamilyNameRegexFilter("cf1".to_owned())),
},
RowFilter {
filter: Some(Filter::ColumnQualifierRegexFilter("c1".as_bytes().to_vec())),
},
RowFilter {
filter: Some(Filter::CellsPerColumnLimitFilter(2)),
},
],
})),
}),
..ReadRowsRequest::default()
};

// calling bigtable API to get streaming results
let mut stream = bigtable.stream_rows(request).await?;

// process the stream of results
while let Some((key, data)) = stream.try_next().await? {
println!("------------\n{}", String::from_utf8(key.clone()).unwrap());
data.into_iter().for_each(|row_cell| {
println!(
" [{}:{}] \"{}\" @ {}",
row_cell.family_name,
String::from_utf8(row_cell.qualifier).unwrap(),
String::from_utf8(row_cell.value).unwrap(),
row_cell.timestamp_micros
)
});
}

Ok(())
}

0 comments on commit 719f3af

Please sign in to comment.