Skip to content
This repository has been archived by the owner on Aug 2, 2024. It is now read-only.

Commit

Permalink
fix: reorder DA tasks (#1343)
Browse files Browse the repository at this point in the history
  • Loading branch information
tdelabro authored Jan 3, 2024
1 parent 27a7b61 commit 61a1c4c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next release

- fix(DA): run the proof first then the state update
- fix: `prove_current_block` is called after `update_state`
- ci: add foundry ci task to push workflow
- fix: first tx for non deployed account is valid
Expand Down
163 changes: 93 additions & 70 deletions crates/client/data-availability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod utils;
use std::marker::PhantomData;
use std::sync::Arc;

use anyhow::Result;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use ethers::types::{I256, U256};
use futures::channel::mpsc;
Expand All @@ -17,10 +17,10 @@ use mp_hashers::HasherT;
use serde::{Deserialize, Serialize};
use sp_runtime::traits::Block as BlockT;
use starknet_api::block::BlockHash;
use starknet_api::state::ThinStateDiff;
use utils::state_diff_to_calldata;

pub struct DataAvailabilityWorker<B, H>(PhantomData<(B, H)>);
pub struct DataAvailabilityWorkerProving<B>(PhantomData<B>);

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
Expand Down Expand Up @@ -70,89 +70,112 @@ pub trait DaClient: Send + Sync {
async fn publish_state_diff(&self, state_diff: Vec<U256>) -> Result<()>;
}

impl<B> DataAvailabilityWorkerProving<B>
where
B: BlockT,
{
pub async fn prove_current_block(da_mode: DaMode, block_hash: BlockHash, madara_backend: Arc<mc_db::Backend<B>>) {
log::info!("proving the block {block_hash}");

match da_mode {
DaMode::Validity => {
// Submit the Starknet OS PIE
// TODO: Validity Impl
// run the Starknet OS with the Cairo VM
// extract the PIE from the Cairo VM run
// pass the PIE to `submit_pie` and zip/base64 internal
if let Ok(job_resp) = sharp::submit_pie("TODO") {
log::info!("Job Submitted: {}", job_resp.cairo_job_key);
// Store the cairo job key
if let Err(db_err) = madara_backend.da().update_cairo_job(&block_hash, job_resp.cairo_job_key) {
log::error!("db err: {db_err}");
};
}
}
_ => {
log::info!("don't prove in remaining DA modes")
}
}
}
}

/// The client worker for DA related tasks
///
/// Listen to new block state diff and spawn new threads to execute each block flow concurently.
/// The flow goes as follow:
/// 1. Prove. Do nothing if node is run in sovereign mode
/// 2. Updata
impl<B, H> DataAvailabilityWorker<B, H>
where
B: BlockT,
H: HasherT,
{
pub async fn update_state(
da_client: Box<dyn DaClient + Send + Sync>,
pub async fn prove_current_block(
da_client: Arc<dyn DaClient + Send + Sync>,
mut state_diffs_rx: mpsc::Receiver<BlockDAData>,
madara_backend: Arc<mc_db::Backend<B>>,
) {
while let Some(BlockDAData(starknet_block_hash, csd, num_addr_accessed)) = state_diffs_rx.next().await {
log::info!(
"received state diff for block {starknet_block_hash}: {csd:?}. {num_addr_accessed} addresses accessed."
"received state diff for block {starknet_block_hash}: {csd:?}.{num_addr_accessed} addresses accessed."
);

// store the state diff
if let Err(db_err) = madara_backend
.da()
.store_state_diff(&starknet_block_hash, state_diff_to_calldata(csd, num_addr_accessed))
{
log::error!("db err: {db_err}");
};
// Query last written state
// TODO: this value will be used to ensure the correct state diff is being written in Validity mode
let _last_published_state = match da_client.last_published_state().await {
Ok(last_published_state) => last_published_state,
Err(e) => {
log::error!("da provider error: {e}");
continue;
let da_client = da_client.clone();
let madara_backend = madara_backend.clone();
tokio::spawn(async move {
match prove(da_client.get_mode(), starknet_block_hash, &csd, num_addr_accessed, madara_backend.clone())
.await
{
Err(err) => log::error!("proving error: {err}"),
Ok(()) => {}
}
};

match da_client.get_mode() {
DaMode::Validity => {
// Check the SHARP status of last_proved + 1
// Write the publish state diff of last_proved + 1
log::info!("validity da mode not implemented");
}
DaMode::Sovereign => match madara_backend.da().state_diff(&starknet_block_hash) {
Ok(state_diff) => {
if let Err(e) = da_client.publish_state_diff(state_diff).await {
log::error!("DA PUBLISH ERROR: {}", e);
}
}
Err(e) => log::error!("could not pull state diff for block {starknet_block_hash}: {e}"),
},
DaMode::Volition => log::info!("volition da mode not implemented"),
}
match update_state::<B, H>(madara_backend, da_client, starknet_block_hash, csd, num_addr_accessed).await
{
Err(err) => log::error!("state publishing error: {err}"),
Ok(()) => {}
};
});
}
}
}

tokio::spawn(DataAvailabilityWorkerProving::<B>::prove_current_block(
da_client.get_mode(),
starknet_block_hash,
madara_backend.clone(),
));
pub async fn prove<B: BlockT>(
da_mode: DaMode,
block_hash: BlockHash,
_state_diff: &ThinStateDiff,
_num_addr_accessed: usize,
madara_backend: Arc<mc_db::Backend<B>>,
) -> Result<(), anyhow::Error> {
log::info!("proving the block {block_hash}");

match da_mode {
DaMode::Validity => {
// Submit the Starknet OS PIE
// TODO: Validity Impl
// run the Starknet OS with the Cairo VM
// extract the PIE from the Cairo VM run
// pass the PIE to `submit_pie` and zip/base64 internal
if let Ok(job_resp) = sharp::submit_pie("TODO") {
log::info!("Job Submitted: {}", job_resp.cairo_job_key);
// Store the cairo job key
madara_backend
.da()
.update_cairo_job(&block_hash, job_resp.cairo_job_key)
.map_err(|e| anyhow!("{e}"))?;
}
}
_ => {
log::info!("don't prove in remaining DA modes")
}
}

Ok(())
}

pub async fn update_state<B: BlockT, H: HasherT>(
madara_backend: Arc<mc_db::Backend<B>>,
da_client: Arc<dyn DaClient + Send + Sync>,
starknet_block_hash: BlockHash,
csd: ThinStateDiff,
num_addr_accessed: usize,
) -> Result<(), anyhow::Error> {
// store the state diff
madara_backend
.da()
.store_state_diff(&starknet_block_hash, state_diff_to_calldata(csd, num_addr_accessed))
.map_err(|e| anyhow!("{e}"))?;

// Query last written state
// TODO: this value will be used to ensure the correct state diff is being written in
// Validity mode
let _last_published_state = da_client.last_published_state().await?;

match da_client.get_mode() {
DaMode::Validity => {
// Check the SHARP status of last_proved + 1
// Write the publish state diff of last_proved + 1
log::info!("validity da mode not implemented");
}
DaMode::Sovereign => match madara_backend.da().state_diff(&starknet_block_hash) {
Ok(state_diff) => {
da_client.publish_state_diff(state_diff).await.map_err(|e| anyhow!("DA PUBLISH ERROR: {e}"))?;
}
Err(e) => Err(anyhow!("could not pull state diff for block {starknet_block_hash}: {e}"))?,
},
DaMode::Volition => log::info!("volition da mode not implemented"),
};

Ok(())
}
12 changes: 6 additions & 6 deletions crates/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,25 +431,25 @@ pub fn new_full(
.for_each(|()| future::ready(())),
);

let da_client: Box<dyn DaClient + Send + Sync> = match da_layer {
let da_client: Arc<dyn DaClient + Send + Sync> = match da_layer {
DaLayer::Celestia => {
let celestia_conf = CelestiaConfig::try_from(&da_path)?;
Box::new(CelestiaClient::try_from(celestia_conf).map_err(|e| ServiceError::Other(e.to_string()))?)
Arc::new(CelestiaClient::try_from(celestia_conf).map_err(|e| ServiceError::Other(e.to_string()))?)
}
DaLayer::Ethereum => {
let ethereum_conf = EthereumConfig::try_from(&da_path)?;
Box::new(EthereumClient::try_from(ethereum_conf)?)
Arc::new(EthereumClient::try_from(ethereum_conf)?)
}
DaLayer::Avail => {
let avail_conf = AvailConfig::try_from(&da_path)?;
Box::new(AvailClient::try_from(avail_conf).map_err(|e| ServiceError::Other(e.to_string()))?)
Arc::new(AvailClient::try_from(avail_conf).map_err(|e| ServiceError::Other(e.to_string()))?)
}
};

task_manager.spawn_essential_handle().spawn(
"da-worker-update",
"da-worker",
Some(MADARA_TASK_GROUP),
DataAvailabilityWorker::<_, StarknetHasher>::update_state(
DataAvailabilityWorker::<_, StarknetHasher>::prove_current_block(
da_client,
commitment_state_diff_rx,
madara_backend.clone(),
Expand Down

0 comments on commit 61a1c4c

Please sign in to comment.