Skip to content

Commit b6e6bd3

Browse files
authored
refactor(stages): empty transactions range (paradigmxyz#19753)
1 parent d150b0a commit b6e6bd3

File tree

3 files changed

+37
-31
lines changed

3 files changed

+37
-31
lines changed

crates/stages/api/src/stage.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -86,24 +86,22 @@ impl ExecInput {
8686
/// Return the next block range determined the number of transactions within it.
8787
/// This function walks the block indices until either the end of the range is reached or
8888
/// the number of transactions exceeds the threshold.
89+
///
90+
/// Returns [`None`] if no transactions are found for the current execution input.
8991
#[instrument(level = "debug", target = "sync::stages", skip(provider), ret)]
9092
pub fn next_block_range_with_transaction_threshold<Provider>(
9193
&self,
9294
provider: &Provider,
9395
tx_threshold: u64,
94-
) -> Result<TransactionRangeOutput, StageError>
96+
) -> Result<Option<TransactionRangeOutput>, StageError>
9597
where
9698
Provider: StaticFileProviderFactory + BlockReader,
9799
{
98100
// Get lowest available block number for transactions
99101
let Some(lowest_transactions_block) =
100102
provider.static_file_provider().get_lowest_range_start(StaticFileSegment::Transactions)
101103
else {
102-
return Ok(TransactionRangeOutput {
103-
tx_range: 0..0,
104-
block_range: 0..=0,
105-
is_final_range: true,
106-
});
104+
return Ok(None)
107105
};
108106

109107
// We can only process transactions that have associated static files, so we cap the start
@@ -115,6 +113,13 @@ impl ExecInput {
115113
let start_block = self.next_block().max(lowest_transactions_block);
116114
let target_block = self.target();
117115

116+
// If the start block is greater than the target, then there's no transactions to process
117+
// and we return early. It's possible to trigger this scenario when running `reth
118+
// stage run` manually for a range of transactions that doesn't exist.
119+
if start_block > target_block {
120+
return Ok(None)
121+
}
122+
118123
let start_block_body = provider
119124
.block_body_indices(start_block)?
120125
.ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
@@ -129,11 +134,7 @@ impl ExecInput {
129134

130135
if all_tx_cnt == 0 {
131136
// if there is no more transaction return back.
132-
return Ok(TransactionRangeOutput {
133-
tx_range: first_tx_num..first_tx_num,
134-
block_range: start_block..=target_block,
135-
is_final_range: true,
136-
})
137+
return Ok(None)
137138
}
138139

139140
// get block of this tx
@@ -154,11 +155,11 @@ impl ExecInput {
154155
};
155156

156157
let tx_range = first_tx_num..next_tx_num;
157-
Ok(TransactionRangeOutput {
158+
Ok(Some(TransactionRangeOutput {
158159
tx_range,
159160
block_range: start_block..=end_block,
160161
is_final_range,
161-
})
162+
}))
162163
}
163164
}
164165

@@ -355,9 +356,7 @@ mod tests {
355356
let range_output = exec_input
356357
.next_block_range_with_transaction_threshold(&provider_factory, 10)
357358
.unwrap();
358-
assert_eq!(range_output.tx_range, 0..0);
359-
assert_eq!(range_output.block_range, 0..=0);
360-
assert!(range_output.is_final_range);
359+
assert!(range_output.is_none());
361360
}
362361

363362
// With checkpoint at block 10, without transactions in static files
@@ -368,9 +367,7 @@ mod tests {
368367
let range_output = exec_input
369368
.next_block_range_with_transaction_threshold(&provider_factory, 10)
370369
.unwrap();
371-
assert_eq!(range_output.tx_range, 0..0);
372-
assert_eq!(range_output.block_range, 0..=0);
373-
assert!(range_output.is_final_range);
370+
assert!(range_output.is_none());
374371
}
375372

376373
// Without checkpoint, with transactions in static files starting from block 1
@@ -396,6 +393,7 @@ mod tests {
396393

397394
let range_output = exec_input
398395
.next_block_range_with_transaction_threshold(&provider_factory, 10)
396+
.unwrap()
399397
.unwrap();
400398
assert_eq!(range_output.tx_range, 0..2);
401399
assert_eq!(range_output.block_range, 1..=1);
@@ -424,6 +422,7 @@ mod tests {
424422

425423
let range_output = exec_input
426424
.next_block_range_with_transaction_threshold(&provider_factory, 10)
425+
.unwrap()
427426
.unwrap();
428427
assert_eq!(range_output.tx_range, 2..3);
429428
assert_eq!(range_output.block_range, 2..=2);
@@ -445,6 +444,7 @@ mod tests {
445444

446445
let range_output = exec_input
447446
.next_block_range_with_transaction_threshold(&provider_factory, 10)
447+
.unwrap()
448448
.unwrap();
449449
assert_eq!(range_output.tx_range, 2..3);
450450
assert_eq!(range_output.block_range, 2..=2);
@@ -473,6 +473,7 @@ mod tests {
473473

474474
let range_output = exec_input
475475
.next_block_range_with_transaction_threshold(&provider_factory, 10)
476+
.unwrap()
476477
.unwrap();
477478
assert_eq!(range_output.tx_range, 3..4);
478479
assert_eq!(range_output.block_range, 3..=3);

crates/stages/stages/src/stages/sender_recovery.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,17 @@ where
8080
return Ok(ExecOutput::done(input.checkpoint()))
8181
}
8282

83-
let range_output =
84-
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
85-
let end_block = *range_output.block_range.end();
86-
87-
// No transactions to walk over
88-
if range_output.tx_range.is_empty() {
89-
info!(target: "sync::stages::sender_recovery", tx_range = ?range_output.tx_range, "Target transaction already reached");
83+
let Some(range_output) =
84+
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?
85+
else {
86+
info!(target: "sync::stages::sender_recovery", "No transaction senders to recover");
9087
return Ok(ExecOutput {
91-
checkpoint: StageCheckpoint::new(end_block)
88+
checkpoint: StageCheckpoint::new(input.target())
9289
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
93-
done: range_output.is_final_range,
90+
done: true,
9491
})
95-
}
92+
};
93+
let end_block = *range_output.block_range.end();
9694

9795
// Acquire the cursor for inserting elements
9896
let mut senders_cursor = provider.tx_ref().cursor_write::<tables::TransactionSenders>()?;

crates/stages/stages/src/stages/tx_lookup.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,15 @@ where
126126
);
127127

128128
loop {
129-
let range_output =
130-
input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;
129+
let Some(range_output) =
130+
input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
131+
else {
132+
input.checkpoint = Some(
133+
StageCheckpoint::new(input.target())
134+
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
135+
);
136+
break;
137+
};
131138

132139
let end_block = *range_output.block_range.end();
133140

0 commit comments

Comments
 (0)