diff --git a/nft_ingester/src/bin/slot_persister/main.rs b/nft_ingester/src/bin/slot_persister/main.rs
index 15c85efb..7939c8d0 100644
--- a/nft_ingester/src/bin/slot_persister/main.rs
+++ b/nft_ingester/src/bin/slot_persister/main.rs
@@ -317,7 +317,7 @@ async fn process_slots(
     args: &Args,
     shutdown_token: CancellationToken,
 ) {
-    // Process slots in batches
+    // Process slots in "outer" batches of up to `args.chunk_size`.
     for batch in slots.chunks(args.chunk_size) {
         if shutdown_token.is_cancelled() {
             info!("Shutdown signal received during batch processing, exiting...");
@@ -327,11 +327,11 @@ async fn process_slots(
         let mut batch_retries = 0;
         let mut batch_delay_ms = INITIAL_BATCH_DELAY_MS;

-        // Initialize the list of slots to fetch and the map of successful blocks
+        // The set of slots we still need to fetch in this "outer" batch.
         let mut slots_to_fetch: Vec<u64> = batch.to_vec();
         let mut successful_blocks: HashMap<u64, RawBlock> = HashMap::new();

-        // Retry loop for the batch
+        // Retry loop for the entire "outer" batch, including partial failures.
         loop {
             if shutdown_token.is_cancelled() {
                 info!("Shutdown signal received during batch processing, exiting...");
@@ -341,33 +341,70 @@ async fn process_slots(
             // We will fill `new_failed_slots` if any slot(s) or the whole chunk fails
             let mut new_failed_slots = Vec::new();

-            // -------------------------------------------------------------
-            // 1) FETCH BLOCKS — Bigtable vs. RPC
-            // -------------------------------------------------------------
             match &*backfill_source {
-                // ---------------------------------------------------------
-                // BIGTABLE PATH: fetch the entire chunk in one call
-                // ---------------------------------------------------------
+                // ------------------------------------------------------------------
+                // 1) Bigtable path: split the batch into sub-chunks, fetch in parallel
+                // ------------------------------------------------------------------
                 BackfillSource::Bigtable(bigtable_client) => {
-                    // Attempt to fetch the whole chunk at once
-                    match bigtable_client.get_blocks(&slots_to_fetch).await {
-                        Ok(blocks) => {
-                            // If success, convert each fetched block into RawBlock
-                            for (slot, confirmed_block) in blocks {
-                                successful_blocks
-                                    .insert(slot, RawBlock { slot, block: confirmed_block });
+                    // We want up to `args.max_concurrency` parallel requests,
+                    // so we split `slots_to_fetch` into that many sub-chunks.
+                    // Example: if slots_to_fetch.len() == 300 and max_concurrency == 10,
+                    // each sub-chunk holds ~30 slots.
+                    let total = slots_to_fetch.len();
+                    let sub_chunk_size = std::cmp::max(total / args.max_concurrency, 1);
+
+                    // The `max(.., 1)` above keeps the chunk size non-zero
+                    // (`slice::chunks` panics on 0), so there is always at
+                    // least one sub-chunk.
+                    let sub_chunks: Vec<&[u64]> = slots_to_fetch.chunks(sub_chunk_size).collect();
+
+                    // A semaphore caps the number of sub-chunk fetches in
+                    // flight at `args.max_concurrency`. Integer division can
+                    // yield more than `max_concurrency` sub-chunks (e.g. 15
+                    // slots at max_concurrency 10 gives 15 sub-chunks of 1),
+                    // so the cap is not redundant.
+                    let semaphore = Arc::new(Semaphore::new(args.max_concurrency));
+
+                    let fetch_futures = sub_chunks.into_iter().map(|sub_slots| {
+                        let bigtable_client = bigtable_client.clone();
+                        let semaphore = semaphore.clone();
+                        let shutdown_token = shutdown_token.clone();
+
+                        async move {
+                            let _permit = semaphore.acquire().await;
+                            if shutdown_token.is_cancelled() {
+                                return Err((sub_slots.to_vec(), "Shutdown".to_string()));
                             }
-                        },
-                        Err(e) => {
-                            // The entire chunk failed to fetch; mark all slots as failed
-                            error!("Failed to fetch chunk via Bigtable: {}", e);
-                            new_failed_slots = slots_to_fetch.clone();
-                        },
+
+                            // Single Bigtable call for this sub-chunk
+                            match bigtable_client.get_blocks(sub_slots).await {
+                                Ok(blocks_map) => Ok(blocks_map),
+                                Err(e) => Err((sub_slots.to_vec(), e.to_string())),
+                            }
+                        }
+                    });
+
+                    // Poll all sub-chunk futures concurrently, then gather results.
+                    let sub_results = join_all(fetch_futures).await;
+
+                    for sub_result in sub_results {
+                        match sub_result {
+                            Ok(blocks) => {
+                                for (slot, confirmed_block) in blocks {
+                                    successful_blocks
+                                        .insert(slot, RawBlock { slot, block: confirmed_block });
+                                }
+                            },
+                            Err((slots_failed, e)) => {
+                                error!(
+                                    "Failed to fetch sub-chunk from Bigtable ({} slots). Error: {}",
+                                    slots_failed.len(),
+                                    e
+                                );
+                                new_failed_slots.extend(slots_failed);
+                            },
+                        }
                     }
                 },
                 // ---------------------------------------------------------
-                // RPC or other variant: old concurrency approach
+                // 2) RPC or other: original slot-by-slot concurrency
                 // ---------------------------------------------------------
                 _ => {
                     let semaphore = Arc::new(Semaphore::new(args.max_concurrency));
@@ -385,7 +422,6 @@ async fn process_slots(
                     });

                     let results = join_all(fetch_futures).await;
-
                     for result in results {
                         match result {
                             Ok((slot, raw_block)) => {
@@ -400,29 +436,27 @@ async fn process_slots(
                 },
             }

-            // -------------------------------------------------------------
-            // 2) IF all fetched, WRITE TO ROCKSDB
-            // -------------------------------------------------------------
+            // -----------------------
+            // Attempt to persist
+            // -----------------------
             if new_failed_slots.is_empty() {
-                // All slots for this batch were successfully fetched
+                // We successfully fetched all requested slots for this batch.
                 debug!(
-                    "All slots fetched successfully for current batch. Saving {} slots to RocksDB.",
+                    "All slots fetched for current batch. Saving {} slots to RocksDB.",
                     successful_blocks.len()
                 );

-                // Try saving to DB (we also retry on DB write errors)
                 match target_db.raw_blocks_cbor.put_batch(successful_blocks.clone()).await {
                     Ok(_) => {
-                        // Successfully saved to DB; proceed to next batch
                         let last_slot = successful_blocks.keys().max().cloned().unwrap_or(0);
                         info!(
                             "Successfully saved batch to RocksDB. Last stored slot: {}",
                             last_slot
                         );
-                        break; // proceed to next chunk
+                        break; // Move on to the next chunk of `slots`
                     },
                     Err(e) => {
-                        // DB write failed; retry
+                        // DB write failed; fall through to the retry/backoff logic.
                         error!("Failed to save blocks to RocksDB: {}", e);
                         batch_retries += 1;
                         if batch_retries >= MAX_BATCH_RETRIES {
@@ -441,7 +475,7 @@ async fn process_slots(
                     },
                 }
             } else {
-                // Some or all slots failed to fetch
+                // Some slots in the chunk were not fetched
                 batch_retries += 1;
                 if batch_retries >= MAX_BATCH_RETRIES {
                     panic!(
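For review context, here is a minimal, self-contained sketch of the fan-out pattern the Bigtable arm now uses: sub-chunks sized by integer division, a `Semaphore` capping in-flight fetches, and `join_all` gathering per-sub-chunk results. It is not part of the patch; `fetch_stub`, the slot range, and the printed output are invented stand-ins for `bigtable_client.get_blocks`, while the `tokio` (`rt-multi-thread`, `macros`, `sync` features) and `futures` usage mirrors the diff.

```rust
use std::sync::Arc;

use futures::future::join_all;
use tokio::sync::Semaphore;

// Stand-in for `bigtable_client.get_blocks`: resolves every slot to a
// fake payload so the example runs without Bigtable.
async fn fetch_stub(slots: &[u64]) -> Result<Vec<(u64, String)>, String> {
    Ok(slots.iter().map(|s| (*s, format!("block-{s}"))).collect())
}

#[tokio::main]
async fn main() {
    let slots: Vec<u64> = (1_000..1_300).collect(); // 300 slots
    let max_concurrency = 10;

    // 300 / 10 = 30 slots per sub-chunk; `max(.., 1)` keeps the size
    // non-zero (`slice::chunks` panics on 0), e.g. for 5 slots / 10 workers.
    let sub_chunk_size = std::cmp::max(slots.len() / max_concurrency, 1);
    let sub_chunks: Vec<&[u64]> = slots.chunks(sub_chunk_size).collect();

    // Cap in-flight fetches; integer division can produce more than
    // `max_concurrency` sub-chunks (e.g. 15 slots -> 15 sub-chunks of 1).
    let semaphore = Arc::new(Semaphore::new(max_concurrency));

    let fetch_futures = sub_chunks.into_iter().map(|sub| {
        let semaphore = semaphore.clone();
        async move {
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            // On failure, return the slots so the caller can retry just those.
            fetch_stub(sub).await.map_err(|e| (sub.to_vec(), e))
        }
    });

    let mut fetched = 0usize;
    for result in join_all(fetch_futures).await {
        match result {
            Ok(blocks) => fetched += blocks.len(),
            Err((failed, e)) => eprintln!("{} slots failed: {e}", failed.len()),
        }
    }
    println!("fetched {fetched} blocks"); // fetched 300 blocks
}
```

With 300 slots the permits are never contended (10 sub-chunks, 10 permits); drop the slot count to 15 to watch the semaphore throttle the 15 single-slot sub-chunks.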