Skip to content

Commit

Permalink
Updated block importer to allow more blocks to be queued (#2010)
Browse files Browse the repository at this point in the history
Before, we required all services to process the import result before
committing a new result. It was done to reduce the gap between on-chain
and off-chain databases and minimize the damage from the
#1584 problem.

Issue #1584 is no longer a
problem after the #2004 fix.
Because of that, we can allow committing more blocks in parallel while
other services are processing old ones. It improves synchronization
speed because we have a buffer before we wait for other services to
catch up. It is very actual for cases when other services are busy right
now with other work, but soon will be available to process
`ImportResult`.

The default size of the buffer is `1024`.

### Before requesting review
- [x] I have reviewed the code myself

---------

Co-authored-by: Hannes Karppila <[email protected]>
  • Loading branch information
xgreenx and Dentosal authored Jul 5, 2024
1 parent 6979ce6 commit 6215f6d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [#1948](https://github.com/FuelLabs/fuel-core/pull/1948): Add new `AlgorithmV1` and `AlgorithmUpdaterV1` for the gas price. Include tools for analysis

### Changed
- [#2010](https://github.com/FuelLabs/fuel-core/pull/2010): Updated the block importer to allow more blocks to be in the queue. It improves synchronization speed and mitigates the impact of other services on synchronization speed.
- [#2006](https://github.com/FuelLabs/fuel-core/pull/2006): Process block importer events first under P2P pressure.
- [#2002](https://github.com/FuelLabs/fuel-core/pull/2002): Adapted the block producer to react to checked transactions that were using another version of consensus parameters during validation in the TxPool. After an upgrade of the consensus parameters of the network, TxPool could store invalid `Checked` transactions. This change fixes that by tracking the version that was used to validate the transactions.
- [#1999](https://github.com/FuelLabs/fuel-core/pull/1999): Minimize the number of panics in the codebase.
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/services/importer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ derive_more = { workspace = true }
fuel-core-metrics = { workspace = true }
fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-rayon = { workspace = true }
tracing = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions crates/services/importer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
#[derive(Debug, Clone)]
pub struct Config {
pub max_block_notify_buffer: usize,
pub metrics: bool,
}

impl Config {
pub fn new() -> Self {
Self {
max_block_notify_buffer: 1 << 10,
metrics: false,
}
}
}
Expand Down
120 changes: 52 additions & 68 deletions crates/services/importer/src/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ use fuel_core_types::{
Uncommitted,
},
};
use parking_lot::Mutex;
use std::{
ops::{
Deref,
DerefMut,
},
sync::{
Arc,
Mutex,
},
sync::Arc,
time::{
Instant,
SystemTime,
Expand All @@ -56,7 +54,8 @@ use std::{
};
use tokio::sync::{
broadcast,
oneshot,
OwnedSemaphorePermit,
Semaphore,
TryAcquireError,
};

Expand Down Expand Up @@ -97,6 +96,7 @@ pub enum Error {
#[from]
StorageError(StorageError),
UnsupportedConsensusVariant(String),
ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError),
}

impl From<Error> for anyhow::Error {
Expand All @@ -118,11 +118,12 @@ pub struct Importer<D, E, V> {
verifier: Arc<V>,
chain_id: ChainId,
broadcast: broadcast::Sender<ImporterResult>,
/// The channel to notify about the end of the processing of the previous block by all listeners.
/// It is used to await until all receivers of the notification process the `SharedImportResult`
/// before starting committing a new block.
prev_block_process_result: Mutex<Option<oneshot::Receiver<()>>>,
guard: tokio::sync::Semaphore,
guard: Semaphore,
/// The semaphore tracks the number of unprocessed `SharedImportResult`.
/// If the number of unprocessed results is more than the threshold,
/// the block importer stops committing new blocks and waits for
/// the resolution of the previous one.
active_import_results: Arc<Semaphore>,
}

impl<D, E, V> Importer<D, E, V> {
Expand All @@ -133,16 +134,20 @@ impl<D, E, V> Importer<D, E, V> {
executor: E,
verifier: V,
) -> Self {
let (broadcast, _) = broadcast::channel(config.max_block_notify_buffer);
// We use semaphore as a back pressure mechanism instead of a `broadcast`
// channel because we want to prevent committing to the database results
// that will not be processed.
let max_block_notify_buffer = config.max_block_notify_buffer;
let (broadcast, _) = broadcast::channel(max_block_notify_buffer);

Self {
database: Mutex::new(database),
executor: Arc::new(executor),
verifier: Arc::new(verifier),
chain_id,
broadcast,
prev_block_process_result: Default::default(),
guard: tokio::sync::Semaphore::new(1),
active_import_results: Arc::new(Semaphore::new(max_block_notify_buffer)),
guard: Semaphore::new(1),
}
}

Expand Down Expand Up @@ -198,35 +203,31 @@ where
result: UncommittedResult<Changes>,
) -> Result<(), Error> {
let _guard = self.lock()?;
// It is safe to unwrap the channel because we have the `_guard`.
let previous_block_result = self
.prev_block_process_result
.lock()
.expect("poisoned")
.take();

// Await until all receivers of the notification process the result.
if let Some(channel) = previous_block_result {
const TIMEOUT: u64 = 20;
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
.await;
const TIMEOUT: u64 = 20;
let await_result = tokio::time::timeout(
tokio::time::Duration::from_secs(TIMEOUT),
self.active_import_results.clone().acquire_owned(),
)
.await;

if result.is_err() {
tracing::error!(
"The previous block processing \
let Ok(permit) = await_result else {
tracing::error!(
"The previous block processing \
was not finished for {TIMEOUT} seconds."
);
return Err(Error::PreviousBlockProcessingNotFinished)
}
}
);
return Err(Error::PreviousBlockProcessingNotFinished)
};
let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;

let mut guard = self
.database
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();

self._commit_result(result, database)
self._commit_result(result, permit, database)
}

/// The method commits the result of the block execution and notifies about a new imported block.
Expand All @@ -242,6 +243,7 @@ where
fn _commit_result(
&self,
result: UncommittedResult<Changes>,
permit: OwnedSemaphorePermit,
database: &mut D,
) -> Result<(), Error> {
let (result, changes) = result.into();
Expand Down Expand Up @@ -327,16 +329,12 @@ where

tracing::info!("Committed block {:#x}", result.sealed_block.entity.id());

// The `tokio::sync::oneshot::Sender` is used to notify about the end
// of the processing of a new block by all listeners.
let (sender, receiver) = oneshot::channel();
let result = ImporterResult {
shared_result: Arc::new(Awaiter::new(result, sender)),
shared_result: Arc::new(Awaiter::new(result, permit)),
#[cfg(feature = "test-helpers")]
changes: Arc::new(changes_clone),
};
let _ = self.broadcast.send(result);
*self.prev_block_process_result.lock().expect("poisoned") = Some(receiver);

Ok(())
}
Expand Down Expand Up @@ -467,28 +465,22 @@ where

let result = result?;

// It is safe to unwrap the channel because we have the `_guard`.
let previous_block_result = self
.prev_block_process_result
.lock()
.expect("poisoned")
.take();

// Await until all receivers of the notification process the result.
if let Some(channel) = previous_block_result {
const TIMEOUT: u64 = 20;
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
.await;
const TIMEOUT: u64 = 20;
let await_result = tokio::time::timeout(
tokio::time::Duration::from_secs(TIMEOUT),
self.active_import_results.clone().acquire_owned(),
)
.await;

if result.is_err() {
tracing::error!(
"The previous block processing \
let Ok(permit) = await_result else {
tracing::error!(
"The previous block processing \
was not finished for {TIMEOUT} seconds."
);
return Err(Error::PreviousBlockProcessingNotFinished)
}
}
);
return Err(Error::PreviousBlockProcessingNotFinished)
};
let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;

let start = Instant::now();

Expand All @@ -497,7 +489,7 @@ where
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();
let commit_result = self._commit_result(result, database);
let commit_result = self._commit_result(result, permit, database);
let commit_time = start.elapsed().as_secs_f64();
let time = execute_time + commit_time;
importer_metrics().execute_and_commit_duration.observe(time);
Expand All @@ -509,15 +501,7 @@ where
/// The wrapper around `ImportResult` to notify about the end of the processing of a new block.
struct Awaiter {
result: ImportResult,
release_channel: Option<oneshot::Sender<()>>,
}

impl Drop for Awaiter {
fn drop(&mut self) {
if let Some(release_channel) = core::mem::take(&mut self.release_channel) {
let _ = release_channel.send(());
}
}
_permit: OwnedSemaphorePermit,
}

impl Deref for Awaiter {
Expand All @@ -529,10 +513,10 @@ impl Deref for Awaiter {
}

impl Awaiter {
fn new(result: ImportResult, channel: oneshot::Sender<()>) -> Self {
fn new(result: ImportResult, permit: OwnedSemaphorePermit) -> Self {
Self {
result,
release_channel: Some(channel),
_permit: permit,
}
}
}

0 comments on commit 6215f6d

Please sign in to comment.