Commit 55a480b

GeorgeBake and Dentosal committed
Updated block importer to allow more blocks to be in the queue (#2010)
Before, we required all services to process the import result before committing a new one. This was done to reduce the gap between the on-chain and off-chain databases and to minimize the damage from the FuelLabs/fuel-core#1584 problem. FuelLabs/fuel-core#1584 is no longer a problem after the FuelLabs/fuel-core#2004 fix. Because of that, we can allow committing more blocks in parallel while other services are still processing old ones. This improves synchronization speed because we now have a buffer before we have to wait for other services to catch up. It is especially relevant when other services are busy with other work right now but will soon be available to process the `ImportResult`. The default size of the buffer is `1024`.

### Before requesting review

- [x] I have reviewed the code myself

---------

Co-authored-by: Hannes Karppila <[email protected]>
1 parent cfc74d1 commit 55a480b
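
A minimal, self-contained sketch of the back-pressure pattern described in the commit message, assuming tokio with the `full` feature set: a `Semaphore` bounds how many unprocessed results may be outstanding, and each broadcast result carries an `OwnedSemaphorePermit` that is returned only when the last clone of the result is dropped. The `Notification` struct, the buffer size of 4, and the sleep are illustrative stand-ins, not the actual fuel-core types.

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::{broadcast, OwnedSemaphorePermit, Semaphore};

/// Illustrative stand-in for the shared import result. The permit inside it
/// is returned to the semaphore only when the last `Arc` clone is dropped.
struct Notification {
    block_height: u64,
    _permit: OwnedSemaphorePermit,
}

#[tokio::main]
async fn main() {
    // A buffer of 4 unprocessed results instead of the real 1024, so the
    // back pressure is easy to observe in a toy run.
    let buffer = 4;
    let (tx, mut rx) = broadcast::channel::<Arc<Notification>>(buffer);
    let slots = Arc::new(Semaphore::new(buffer));

    // A slow subscriber standing in for the off-chain services.
    let consumer = tokio::spawn(async move {
        while let Ok(result) = rx.recv().await {
            tokio::time::sleep(Duration::from_millis(50)).await;
            println!("processed block {}", result.block_height);
            // `result` is dropped here; once every clone is gone, the
            // permit inside it frees one slot for the producer.
        }
    });

    // The "importer": commits blocks as long as permits are available and
    // only waits once `buffer` results are still unprocessed.
    for height in 0..10u64 {
        let permit = slots
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed here");
        println!("committed block {height}");
        let _ = tx.send(Arc::new(Notification {
            block_height: height,
            _permit: permit,
        }));
    }

    drop(tx); // close the channel so the consumer loop ends
    consumer.await.unwrap();
}
```

Because the permit is released on drop rather than by an explicit acknowledgement, a slow subscriber simply stops returning permits, and the producer blocks (or, in the real importer, times out) once the buffer is exhausted.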

File tree: 5 files changed, +55 -70 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 - [#1948](https://github.com/FuelLabs/fuel-core/pull/1948): Add new `AlgorithmV1` and `AlgorithmUpdaterV1` for the gas price. Include tools for analysis
 
 ### Changed
+- [#2010](https://github.com/FuelLabs/fuel-core/pull/2010): Updated the block importer to allow more blocks to be in the queue. It improves synchronization speed and mitigate the impact of other services on synchronization speed.
 - [#2006](https://github.com/FuelLabs/fuel-core/pull/2006): Process block importer events first under P2P pressure.
 - [#2002](https://github.com/FuelLabs/fuel-core/pull/2002): Adapted the block producer to react to checked transactions that were using another version of consensus parameters during validation in the TxPool. After an upgrade of the consensus parameters of the network, TxPool could store invalid `Checked` transactions. This change fixes that by tracking the version that was used to validate the transactions.
 - [#1999](https://github.com/FuelLabs/fuel-core/pull/1999): Minimize the number of panics in the codebase.

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

crates/services/importer/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ derive_more = { workspace = true }
 fuel-core-metrics = { workspace = true }
 fuel-core-storage = { workspace = true }
 fuel-core-types = { workspace = true }
+parking_lot = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 tokio-rayon = { workspace = true }
 tracing = { workspace = true }

crates/services/importer/src/config.rs

Lines changed: 0 additions & 2 deletions
@@ -1,14 +1,12 @@
 #[derive(Debug, Clone)]
 pub struct Config {
     pub max_block_notify_buffer: usize,
-    pub metrics: bool,
 }
 
 impl Config {
     pub fn new() -> Self {
         Self {
             max_block_notify_buffer: 1 << 10,
-            metrics: false,
         }
     }
 }

crates/services/importer/src/importer.rs

Lines changed: 52 additions & 68 deletions
@@ -39,15 +39,13 @@ use fuel_core_types::{
         Uncommitted,
     },
 };
+use parking_lot::Mutex;
 use std::{
     ops::{
         Deref,
         DerefMut,
     },
-    sync::{
-        Arc,
-        Mutex,
-    },
+    sync::Arc,
     time::{
         Instant,
         SystemTime,
@@ -56,7 +54,8 @@ use std::{
 };
 use tokio::sync::{
     broadcast,
-    oneshot,
+    OwnedSemaphorePermit,
+    Semaphore,
     TryAcquireError,
 };
 
@@ -97,6 +96,7 @@ pub enum Error {
     #[from]
     StorageError(StorageError),
     UnsupportedConsensusVariant(String),
+    ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError),
 }
 
 impl From<Error> for anyhow::Error {
@@ -118,11 +118,12 @@ pub struct Importer<D, E, V> {
     verifier: Arc<V>,
     chain_id: ChainId,
     broadcast: broadcast::Sender<ImporterResult>,
-    /// The channel to notify about the end of the processing of the previous block by all listeners.
-    /// It is used to await until all receivers of the notification process the `SharedImportResult`
-    /// before starting committing a new block.
-    prev_block_process_result: Mutex<Option<oneshot::Receiver<()>>>,
-    guard: tokio::sync::Semaphore,
+    guard: Semaphore,
+    /// The semaphore tracks the number of unprocessed `SharedImportResult`.
+    /// If the number of unprocessed results is more than the threshold,
+    /// the block importer stops committing new blocks and waits for
+    /// the resolution of the previous one.
+    active_import_results: Arc<Semaphore>,
 }
 
 impl<D, E, V> Importer<D, E, V> {
@@ -133,16 +134,20 @@ impl<D, E, V> Importer<D, E, V> {
         executor: E,
         verifier: V,
     ) -> Self {
-        let (broadcast, _) = broadcast::channel(config.max_block_notify_buffer);
+        // We use semaphore as a back pressure mechanism instead of a `broadcast`
+        // channel because we want to prevent committing to the database results
+        // that will not be processed.
+        let max_block_notify_buffer = config.max_block_notify_buffer;
+        let (broadcast, _) = broadcast::channel(max_block_notify_buffer);
 
         Self {
             database: Mutex::new(database),
             executor: Arc::new(executor),
             verifier: Arc::new(verifier),
             chain_id,
             broadcast,
-            prev_block_process_result: Default::default(),
-            guard: tokio::sync::Semaphore::new(1),
+            active_import_results: Arc::new(Semaphore::new(max_block_notify_buffer)),
+            guard: Semaphore::new(1),
         }
     }
 
@@ -198,35 +203,31 @@
         result: UncommittedResult<Changes>,
     ) -> Result<(), Error> {
         let _guard = self.lock()?;
-        // It is safe to unwrap the channel because we have the `_guard`.
-        let previous_block_result = self
-            .prev_block_process_result
-            .lock()
-            .expect("poisoned")
-            .take();
 
         // Await until all receivers of the notification process the result.
-        if let Some(channel) = previous_block_result {
-            const TIMEOUT: u64 = 20;
-            let result =
-                tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
-                    .await;
+        const TIMEOUT: u64 = 20;
+        let await_result = tokio::time::timeout(
+            tokio::time::Duration::from_secs(TIMEOUT),
+            self.active_import_results.clone().acquire_owned(),
+        )
+        .await;
 
-            if result.is_err() {
-                tracing::error!(
-                    "The previous block processing \
+        let Ok(permit) = await_result else {
+            tracing::error!(
+                "The previous block processing \
                 was not finished for {TIMEOUT} seconds."
-                );
-                return Err(Error::PreviousBlockProcessingNotFinished)
-            }
-        }
+            );
+            return Err(Error::PreviousBlockProcessingNotFinished)
+        };
+        let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;
+
         let mut guard = self
            .database
            .try_lock()
            .expect("Semaphore prevents concurrent access to the database");
         let database = guard.deref_mut();
 
-        self._commit_result(result, database)
+        self._commit_result(result, permit, database)
     }
 
     /// The method commits the result of the block execution and notifies about a new imported block.
@@ -242,6 +243,7 @@
     fn _commit_result(
         &self,
         result: UncommittedResult<Changes>,
+        permit: OwnedSemaphorePermit,
         database: &mut D,
     ) -> Result<(), Error> {
         let (result, changes) = result.into();
@@ -327,16 +329,12 @@
 
         tracing::info!("Committed block {:#x}", result.sealed_block.entity.id());
 
-        // The `tokio::sync::oneshot::Sender` is used to notify about the end
-        // of the processing of a new block by all listeners.
-        let (sender, receiver) = oneshot::channel();
         let result = ImporterResult {
-            shared_result: Arc::new(Awaiter::new(result, sender)),
+            shared_result: Arc::new(Awaiter::new(result, permit)),
             #[cfg(feature = "test-helpers")]
             changes: Arc::new(changes_clone),
         };
         let _ = self.broadcast.send(result);
-        *self.prev_block_process_result.lock().expect("poisoned") = Some(receiver);
 
         Ok(())
     }
@@ -467,28 +465,22 @@
 
         let result = result?;
 
-        // It is safe to unwrap the channel because we have the `_guard`.
-        let previous_block_result = self
-            .prev_block_process_result
-            .lock()
-            .expect("poisoned")
-            .take();
-
         // Await until all receivers of the notification process the result.
-        if let Some(channel) = previous_block_result {
-            const TIMEOUT: u64 = 20;
-            let result =
-                tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
-                    .await;
+        const TIMEOUT: u64 = 20;
+        let await_result = tokio::time::timeout(
+            tokio::time::Duration::from_secs(TIMEOUT),
+            self.active_import_results.clone().acquire_owned(),
+        )
+        .await;
 
-            if result.is_err() {
-                tracing::error!(
-                    "The previous block processing \
+        let Ok(permit) = await_result else {
+            tracing::error!(
+                "The previous block processing \
                 was not finished for {TIMEOUT} seconds."
-                );
-                return Err(Error::PreviousBlockProcessingNotFinished)
-            }
-        }
+            );
+            return Err(Error::PreviousBlockProcessingNotFinished)
+        };
+        let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;
 
         let start = Instant::now();
 
@@ -497,7 +489,7 @@
            .try_lock()
            .expect("Semaphore prevents concurrent access to the database");
         let database = guard.deref_mut();
-        let commit_result = self._commit_result(result, database);
+        let commit_result = self._commit_result(result, permit, database);
         let commit_time = start.elapsed().as_secs_f64();
         let time = execute_time + commit_time;
         importer_metrics().execute_and_commit_duration.observe(time);
@@ -509,15 +501,7 @@
 /// The wrapper around `ImportResult` to notify about the end of the processing of a new block.
 struct Awaiter {
     result: ImportResult,
-    release_channel: Option<oneshot::Sender<()>>,
-}
-
-impl Drop for Awaiter {
-    fn drop(&mut self) {
-        if let Some(release_channel) = core::mem::take(&mut self.release_channel) {
-            let _ = release_channel.send(());
-        }
-    }
+    _permit: OwnedSemaphorePermit,
 }
 
 impl Deref for Awaiter {
@@ -529,10 +513,10 @@ impl Deref for Awaiter {
 }
 
 impl Awaiter {
-    fn new(result: ImportResult, channel: oneshot::Sender<()>) -> Self {
+    fn new(result: ImportResult, permit: OwnedSemaphorePermit) -> Self {
         Self {
             result,
-            release_channel: Some(channel),
+            _permit: permit,
         }
     }
 }
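
The acquisition added in the hunks above wraps `acquire_owned()` in a 20-second `tokio::time::timeout`, so there are two failure modes: the timeout elapsing and the semaphore being closed. Below is a minimal sketch of that flow in isolation; the `CommitError` enum and `acquire_slot` helper are hypothetical names introduced only for this example and are not part of the fuel-core API.

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Hypothetical error type mirroring the two failure modes in the diff:
/// the timeout and the closed-semaphore case.
#[derive(Debug)]
enum CommitError {
    PreviousBlockProcessingNotFinished,
    SemaphoreClosed(tokio::sync::AcquireError),
}

/// Waits up to `timeout` for a free slot in the unprocessed-results budget.
async fn acquire_slot(
    slots: Arc<Semaphore>,
    timeout: Duration,
) -> Result<OwnedSemaphorePermit, CommitError> {
    // The outer `Result` is the timeout, the inner `Result` is the semaphore itself.
    let awaited = tokio::time::timeout(timeout, slots.acquire_owned()).await;

    let Ok(permit) = awaited else {
        // All permits were held by unprocessed results for too long.
        return Err(CommitError::PreviousBlockProcessingNotFinished);
    };
    permit.map_err(CommitError::SemaphoreClosed)
}

#[tokio::main]
async fn main() {
    let slots = Arc::new(Semaphore::new(1));

    // First acquisition succeeds immediately and keeps its permit alive.
    let first = acquire_slot(slots.clone(), Duration::from_millis(100)).await;
    assert!(first.is_ok());

    // Second acquisition times out because the first permit is still held.
    let second = acquire_slot(slots.clone(), Duration::from_millis(100)).await;
    assert!(matches!(
        second,
        Err(CommitError::PreviousBlockProcessingNotFinished)
    ));
}
```

Holding the first permit across the second call is what the importer's back pressure looks like from the caller's side: once the buffer of unprocessed results is full, the commit path fails with `PreviousBlockProcessingNotFinished` instead of committing another block.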
