parallel requests to bigtable as well
StanChe committed Jan 28, 2025
1 parent a981a7c commit 5a0a266
Showing 1 changed file with 68 additions and 34 deletions.
102 changes: 68 additions & 34 deletions nft_ingester/src/bin/slot_persister/main.rs
@@ -317,7 +317,7 @@ async fn process_slots(
args: &Args,
shutdown_token: CancellationToken,
) {
// Process slots in batches
// Process slots in "outer" batches of up to `args.chunk_size`.
for batch in slots.chunks(args.chunk_size) {
if shutdown_token.is_cancelled() {
info!("Shutdown signal received during batch processing, exiting...");
@@ -327,11 +327,11 @@ async fn process_slots(
let mut batch_retries = 0;
let mut batch_delay_ms = INITIAL_BATCH_DELAY_MS;

// Initialize the list of slots to fetch and the map of successful blocks
// The set of slots we still need to fetch in this "outer" batch.
let mut slots_to_fetch: Vec<u64> = batch.to_vec();
let mut successful_blocks: HashMap<u64, RawBlock> = HashMap::new();

// Retry loop for the batch
// Retry loop for the entire "outer" batch, including partial failures.
loop {
if shutdown_token.is_cancelled() {
info!("Shutdown signal received during batch processing, exiting...");
@@ -341,33 +341,70 @@ async fn process_slots(
// We will fill `new_failed_slots` if any slot(s) or the whole chunk fails
let mut new_failed_slots = Vec::new();

// -------------------------------------------------------------
// 1) FETCH BLOCKS — Bigtable vs. RPC
// -------------------------------------------------------------
match &*backfill_source {
// ---------------------------------------------------------
// BIGTABLE PATH: fetch the entire chunk in one call
// ---------------------------------------------------------
// ------------------------------------------------------------------
// 1) Bigtable path: split the batch into sub-chunks, fetch in parallel
// ------------------------------------------------------------------
BackfillSource::Bigtable(bigtable_client) => {
// Attempt to fetch the whole chunk at once
match bigtable_client.get_blocks(&slots_to_fetch).await {
Ok(blocks) => {
// If success, convert each fetched block into RawBlock
for (slot, confirmed_block) in blocks {
successful_blocks
.insert(slot, RawBlock { slot, block: confirmed_block });
// We want up to `args.max_concurrency` parallel requests.
// We'll split `slots_to_fetch` into that many sub-chunks.
// Example: if slots_to_fetch.len()=300 & max_concurrency=10,
// each sub-chunk is ~30 slots.
let total = slots_to_fetch.len();
let sub_chunk_size = std::cmp::max(total / args.max_concurrency, 1);

// Force at least a single sub-chunk
let sub_chunks: Vec<&[u64]> = slots_to_fetch.chunks(sub_chunk_size).collect();

// A semaphore ensures we won't exceed `args.max_concurrency`
// *sub-chunk fetches* in parallel. (Though usually sub_chunks.len()
// is <= max_concurrency anyway.)
let semaphore = Arc::new(Semaphore::new(args.max_concurrency));

let fetch_futures = sub_chunks.into_iter().map(|sub_slots| {
let bigtable_client = bigtable_client.clone();
let semaphore = semaphore.clone();
let shutdown_token = shutdown_token.clone();

async move {
let _permit = semaphore.acquire().await;
if shutdown_token.is_cancelled() {
return Err((sub_slots.to_vec(), "Shutdown".to_string()));
}
},
Err(e) => {
// The entire chunk failed to fetch; mark all slots as failed
error!("Failed to fetch chunk via Bigtable: {}", e);
new_failed_slots = slots_to_fetch.clone();
},

// Single Bigtable call for this sub-chunk
match bigtable_client.get_blocks(sub_slots).await {
Ok(blocks_map) => Ok(blocks_map),
Err(e) => Err((sub_slots.to_vec(), e.to_string())),
}
}
});

// Launch all sub-chunk futures in parallel, then gather results.
let sub_results = join_all(fetch_futures).await;

for sub_result in sub_results {
match sub_result {
Ok(blocks) => {
for (slot, confirmed_block) in blocks {
successful_blocks
.insert(slot, RawBlock { slot, block: confirmed_block });
}
},
Err((slots_failed, e)) => {
error!(
"Failed to fetch sub-chunk from Bigtable ({} slots). Error: {}",
slots_failed.len(),
e
);
new_failed_slots.extend(slots_failed);
},
}
}
},

// ---------------------------------------------------------
// RPC or other variant: old concurrency approach
// 2) RPC or other: original slot-by-slot concurrency
// ---------------------------------------------------------
_ => {
let semaphore = Arc::new(Semaphore::new(args.max_concurrency));
@@ -385,7 +422,6 @@ async fn process_slots(
});

let results = join_all(fetch_futures).await;

for result in results {
match result {
Ok((slot, raw_block)) => {
@@ -400,29 +436,27 @@ async fn process_slots(
},
}

// -------------------------------------------------------------
// 2) IF all fetched, WRITE TO ROCKSDB
// -------------------------------------------------------------
// -----------------------
// Attempt to persist
// -----------------------
if new_failed_slots.is_empty() {
// All slots for this batch were successfully fetched
// We successfully fetched all requested slots for this batch
debug!(
"All slots fetched successfully for current batch. Saving {} slots to RocksDB.",
"All slots fetched for current batch. Saving {} slots to RocksDB.",
successful_blocks.len()
);

// Try saving to DB (we also retry on DB write errors)
match target_db.raw_blocks_cbor.put_batch(successful_blocks.clone()).await {
Ok(_) => {
// Successfully saved to DB; proceed to next batch
let last_slot = successful_blocks.keys().max().cloned().unwrap_or(0);
info!(
"Successfully saved batch to RocksDB. Last stored slot: {}",
last_slot
);
break; // proceed to next chunk
break; // Move on to next chunk of `slots`
},
Err(e) => {
// DB write failed; retry
// DB write failed
error!("Failed to save blocks to RocksDB: {}", e);
batch_retries += 1;
if batch_retries >= MAX_BATCH_RETRIES {
@@ -441,7 +475,7 @@ async fn process_slots(
},
}
} else {
// Some or all slots failed to fetch
// Some slots in the chunk were not fetched
batch_retries += 1;
if batch_retries >= MAX_BATCH_RETRIES {
panic!(
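For reference, the core pattern this commit introduces in the Bigtable path, splitting a batch into roughly `max_concurrency` sub-chunks and fetching them in parallel under a semaphore, can be sketched as a standalone program. `MockBigtableClient` and its `get_blocks` are hypothetical stand-ins for the repo's real Bigtable client; the `(Vec<u64>, String)` error shape just mirrors the diff's failure bookkeeping. Assumes `tokio` and `futures` as dependencies.

```rust
use std::sync::Arc;

use futures::future::join_all;
use tokio::sync::Semaphore;

// Hypothetical stand-in for the real Bigtable client used in the diff.
#[derive(Clone)]
struct MockBigtableClient;

impl MockBigtableClient {
    // Stand-in for `get_blocks`: returns (slot, payload) pairs for a sub-chunk.
    async fn get_blocks(&self, slots: &[u64]) -> Result<Vec<(u64, String)>, String> {
        Ok(slots.iter().map(|s| (*s, format!("block-{s}"))).collect())
    }
}

#[tokio::main]
async fn main() {
    let slots: Vec<u64> = (0..300).collect();
    let max_concurrency = 10;

    // Same sizing rule as the diff: ~total/max_concurrency slots per sub-chunk,
    // clamped to at least 1 so a small batch still yields one sub-chunk.
    let sub_chunk_size = std::cmp::max(slots.len() / max_concurrency, 1);
    let semaphore = Arc::new(Semaphore::new(max_concurrency));
    let client = MockBigtableClient;

    let fetch_futures = slots.chunks(sub_chunk_size).map(|sub_slots| {
        let client = client.clone();
        let semaphore = semaphore.clone();
        async move {
            // The permit caps how many sub-chunk fetches run concurrently.
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            client.get_blocks(sub_slots).await.map_err(|e| (sub_slots.to_vec(), e))
        }
    });

    // Run every sub-chunk fetch in parallel, then partition the outcomes,
    // mirroring the successful_blocks / new_failed_slots split in the diff.
    let mut fetched = 0;
    let mut failed_slots: Vec<u64> = Vec::new();
    for result in join_all(fetch_futures).await {
        match result {
            Ok(blocks) => fetched += blocks.len(),
            Err((slots_failed, _e)) => failed_slots.extend(slots_failed),
        }
    }
    println!("fetched {fetched} blocks, {} slots failed", failed_slots.len());
}
```

With 300 slots and `max_concurrency = 10`, the sizing rule yields sub-chunks of 30 slots each, matching the worked example in the diff's comments. Note that `chunks(total / max_concurrency)` can produce one extra sub-chunk when the division is not exact, which is why the semaphore, rather than the chunk count, is what actually bounds concurrency.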

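The surrounding retry loop, mostly unchanged context in this diff, retries the whole batch when some slots fail to fetch or the RocksDB write fails, up to `MAX_BATCH_RETRIES` attempts with a delay starting at `INITIAL_BATCH_DELAY_MS`. A compact sketch of that control flow follows; `put_batch` is a hypothetical stand-in for `target_db.raw_blocks_cbor.put_batch`, the constant values are illustrative, and the doubling of the delay is an assumption, since the backoff update itself falls outside the shown hunks.

```rust
use std::time::Duration;

const MAX_BATCH_RETRIES: usize = 5; // illustrative value
const INITIAL_BATCH_DELAY_MS: u64 = 100; // illustrative value

// Hypothetical persist step standing in for target_db.raw_blocks_cbor.put_batch;
// here it fails twice, then succeeds, to exercise the retry path.
async fn put_batch(attempt: usize) -> Result<(), String> {
    if attempt < 2 { Err("transient write error".into()) } else { Ok(()) }
}

#[tokio::main]
async fn main() {
    let mut batch_retries = 0;
    let mut batch_delay_ms = INITIAL_BATCH_DELAY_MS;

    loop {
        match put_batch(batch_retries).await {
            Ok(()) => {
                println!("batch persisted after {batch_retries} retries");
                break; // move on to the next chunk of slots
            },
            Err(e) => {
                batch_retries += 1;
                if batch_retries >= MAX_BATCH_RETRIES {
                    panic!("failed to persist batch after {MAX_BATCH_RETRIES} retries: {e}");
                }
                tokio::time::sleep(Duration::from_millis(batch_delay_ms)).await;
                batch_delay_ms *= 2; // assumed exponential backoff between attempts
            },
        }
    }
}
```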