14 changes: 14 additions & 0 deletions crates/data-chain/src/worker/batch_maker.rs
@@ -54,6 +54,20 @@ impl BatchMaker {
None
}

+ /// Add multiple transactions at once, returning any completed batches.
+ ///
+ /// Convenience method that collects all resulting batches when adding
+ /// a pre-drained set of transactions in bulk.
+ pub fn add_transactions(&mut self, txs: Vec<Transaction>) -> Vec<Batch> {
+     let mut batches = Vec::new();
+     for tx in txs {
+         if let Some(batch) = self.add_transaction(tx) {
+             batches.push(batch);
+         }
+     }
+     batches
+ }
+
/// Check if batch should be flushed
fn should_flush(&self) -> bool {
self.pending_size >= self.max_bytes || self.pending_txs.len() >= self.max_txs
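
Since `add_transactions` just threads each transaction through `add_transaction`, a single bulk call can complete several batches; the `Vec<Batch>` return type exists for exactly that case. A toy model of the semantics (toy types; not the crate's real `BatchMaker`, `Transaction`, or `Batch`):

```rust
struct ToyBatchMaker {
    pending: Vec<u32>,
    max_txs: usize,
}

impl ToyBatchMaker {
    /// Push one transaction; return a completed batch when the limit is hit.
    fn add_transaction(&mut self, tx: u32) -> Option<Vec<u32>> {
        self.pending.push(tx);
        (self.pending.len() >= self.max_txs).then(|| std::mem::take(&mut self.pending))
    }

    /// Bulk add, as in the diff: every completed batch is collected.
    fn add_transactions(&mut self, txs: Vec<u32>) -> Vec<Vec<u32>> {
        txs.into_iter().filter_map(|tx| self.add_transaction(tx)).collect()
    }
}

fn main() {
    let mut maker = ToyBatchMaker { pending: Vec::new(), max_txs: 2 };
    // Five drained txs against a two-tx limit: two full batches, one tx left pending.
    let batches = maker.add_transactions(vec![1, 2, 3, 4, 5]);
    assert_eq!(batches, vec![vec![1, 2], vec![3, 4]]);
    assert_eq!(maker.pending, vec![5]);
}
```
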
4 changes: 2 additions & 2 deletions crates/data-chain/src/worker/config.rs
@@ -22,14 +22,14 @@ impl WorkerConfig {
/// Create a new configuration with defaults
///
/// Default batch thresholds are tuned for responsive transaction processing:
- /// - `max_batch_txs`: 100 transactions triggers immediate batch flush
+ /// - `max_batch_txs`: 500 transactions triggers immediate batch flush
/// - `flush_interval`: 50ms ensures batches don't wait too long
pub fn new(validator_id: ValidatorId, worker_id: u8) -> Self {
Self {
validator_id,
worker_id,
max_batch_bytes: 1024 * 1024, // 1MB
- max_batch_txs: 100, // Flush after 100 txs for responsive batching
+ max_batch_txs: 500, // Flush after 500 txs for higher throughput
flush_interval: Duration::from_millis(50), // Faster time-based flush
}
}
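
For context on what the new default changes: a batch closes on whichever of three triggers fires first. In the real code the size/count check (`should_flush` in batch_maker.rs above) and the 50ms time-based flush live on separate paths; the sketch below folds them into one predicate purely for illustration, with the constants copied from the new defaults.

```rust
use std::time::Duration;

const MAX_BATCH_BYTES: usize = 1024 * 1024; // 1MB (unchanged)
const MAX_BATCH_TXS: usize = 500;           // raised from 100 in this PR
const FLUSH_INTERVAL: Duration = Duration::from_millis(50);

// Illustrative only: the real worker checks size/count on every add and
// runs the time-based flush from a separate tick.
fn batch_closes(pending_bytes: usize, pending_txs: usize, elapsed: Duration) -> bool {
    pending_bytes >= MAX_BATCH_BYTES
        || pending_txs >= MAX_BATCH_TXS
        || elapsed >= FLUSH_INTERVAL
}

fn main() {
    // Under sustained load the count limit dominates (bigger batches);
    // at low rates the 50ms timer still bounds latency.
    assert!(batch_closes(0, 500, Duration::ZERO));
    assert!(batch_closes(0, 1, Duration::from_millis(50)));
    assert!(!batch_closes(1024, 10, Duration::from_millis(10)));
}
```
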
80 changes: 67 additions & 13 deletions crates/data-chain/src/worker/core.rs
@@ -382,10 +382,18 @@ impl Worker {
self.check_time_flush().await;
}

- // Handle incoming transactions
+ // Handle incoming transactions - drain all ready TXs at once
tx = self.tx_receiver.recv() => {
if let Some(tx) = tx {
- self.handle_transaction(tx).await;
+ // Drain additional ready transactions to amortize select! overhead
+ let mut txs = vec![tx];
+ while let Ok(t) = self.tx_receiver.try_recv() {
+     txs.push(t);
+     if txs.len() >= self.config.max_batch_txs {
+         break;
+     }
+ }
+ self.handle_transactions_batch(txs).await;
} else {
warn!(worker_id = self.config.worker_id, "tx_receiver closed");
self.shutdown = true;
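
The drain-then-batch pattern added here can be exercised in isolation. A minimal standalone sketch using only tokio's mpsc (assumes tokio with the `rt`, `macros`, and `sync` features; `MAX` stands in for `config.max_batch_txs`):

```rust
use tokio::sync::mpsc;

const MAX: usize = 500; // stand-in for config.max_batch_txs

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(1024);
    tx.send(vec![1]).await.unwrap();
    tx.send(vec![2]).await.unwrap();
    drop(tx);

    // One awaited recv to block until work arrives, then opportunistically
    // drain whatever is already buffered so a burst costs a single pass
    // through the select! loop instead of one iteration per transaction.
    if let Some(first) = rx.recv().await {
        let mut txs = vec![first];
        while let Ok(t) = rx.try_recv() {
            txs.push(t);
            if txs.len() >= MAX {
                break;
            }
        }
        assert_eq!(txs.len(), 2);
    }
}
```

The cap matters: without it a hot sender could starve the other select! arms (primary messages, shutdown, the flush tick).
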
Expand Down Expand Up @@ -420,9 +428,10 @@ impl Worker {
info!(worker_id = self.config.worker_id, "Worker shutting down");
}

- /// Handle incoming transaction
+ /// Handle incoming transaction (used by tests only; production uses handle_transactions_batch)
+ #[cfg(test)]
async fn handle_transaction(&mut self, tx: Transaction) {
- info!(
+ trace!(
worker_id = self.config.worker_id,
tx_size = tx.len(),
"Worker received transaction from channel"
Expand Down Expand Up @@ -457,14 +466,60 @@ impl Worker {
);
self.process_batch(batch).await;
} else {
- info!(
+ trace!(
worker_id = self.config.worker_id,
pending_txs = self.batch_maker.pending_count(),
"Transaction added to batch maker, waiting for more or flush"
);
}
}

+ /// Handle a batch of incoming transactions drained from the channel.
+ ///
+ /// Validates each transaction, adds all valid ones to the batch maker,
+ /// and processes any resulting batches. This amortizes the per-transaction
+ /// overhead of the select! loop and logging.
+ async fn handle_transactions_batch(&mut self, txs: Vec<Transaction>) {
+     let total = txs.len();
+     debug!(
+         worker_id = self.config.worker_id,
+         tx_count = total,
+         "Processing drained transaction batch"
+     );
+
+     // Validate and collect valid transactions
+     let mut valid_txs = Vec::with_capacity(total);
+     for tx in txs {
+         if let Some(ref validator) = self.validator {
+             match validator.validate_transaction(&tx).await {
+                 Ok(()) => valid_txs.push(tx),
+                 Err(e) => {
+                     warn!(
+                         worker_id = self.config.worker_id,
+                         error = %e,
+                         "Transaction validation failed, rejecting"
+                     );
+                 }
+             }
+         } else {
+             valid_txs.push(tx);
+         }
+     }
+
+     // Add all valid transactions and collect any completed batches
+     let batches = self.batch_maker.add_transactions(valid_txs);
+
+     // Process all completed batches
+     for batch in batches {
+         info!(
+             worker_id = self.config.worker_id,
+             tx_count = batch.transactions.len(),
+             "Batch ready, processing"
+         );
+         self.process_batch(batch).await;
+     }
+ }
+
/// Handle message from Primary
async fn handle_primary_message(&mut self, msg: PrimaryToWorker) {
match msg {
Expand Down Expand Up @@ -694,9 +749,8 @@ impl Worker {
let has_pending = self.batch_maker.has_pending();
let elapsed = self.batch_maker.time_since_batch_start();

- // Log every call so we can see the tick is working
if has_pending {
- info!(
+ trace!(
worker_id = self.config.worker_id,
should_flush,
has_pending,
@@ -706,7 +760,7 @@
}

if should_flush && has_pending {
- info!(
+ debug!(
worker_id = self.config.worker_id,
pending_txs = self.batch_maker.pending_count(),
"Time flush triggered, creating batch"
Expand Down Expand Up @@ -783,7 +837,7 @@ impl Worker {
if let Some(ref storage) = self.storage {
match storage.put_batch(batch.clone()).await {
Ok(_) => {
- info!(
+ debug!(
worker_id = self.config.worker_id,
digest = %digest.digest,
tx_count = batch.transactions.len(),
Expand Down Expand Up @@ -812,20 +866,20 @@ impl Worker {
self.state.store_batch(batch.clone());

// Broadcast to peer Workers
- info!(
+ debug!(
worker_id = self.config.worker_id,
digest = %digest.digest,
"Broadcasting batch to peer Workers..."
);
self.network.broadcast_batch(&batch).await;
- info!(
+ debug!(
worker_id = self.config.worker_id,
digest = %digest.digest,
"Broadcast complete"
);

// Report to Primary
- info!(
+ debug!(
worker_id = self.config.worker_id,
digest = %digest.digest,
"Sending BatchDigest to Primary"
@@ -846,7 +900,7 @@
"Failed to send BatchDigest to Primary - channel closed"
);
} else {
- info!(
+ debug!(
worker_id = self.config.worker_id,
"BatchDigest sent to Primary successfully"
);
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
@@ -51,6 +51,7 @@ toml = { workspace = true }
# Logging
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] }
+ tracing-appender = "0.2"

# CLI
clap = { version = "4.4", features = ["derive"] }
6 changes: 3 additions & 3 deletions crates/node/src/config.rs
@@ -247,7 +247,7 @@ impl NodeConfig {
data_dir: home_dir.join("data"),
genesis_path: None, // Uses default: {home_dir}/config/genesis.json
car_interval_ms: 100,
- max_batch_txs: 100,
+ max_batch_txs: 500,
max_batch_bytes: 1024 * 1024, // 1MB
rpc_enabled: false,
rpc_http_port: DEFAULT_RPC_HTTP_PORT + (index as u16),
@@ -473,7 +473,7 @@ mod tests {
"num_workers": 1,
"data_dir": "/tmp/cipherd-0",
"car_interval_ms": 100,
"max_batch_txs": 100,
"max_batch_txs": 500,
"max_batch_bytes": 1048576
}"#;

@@ -556,7 +556,7 @@
"num_workers": 1,
"data_dir": "/tmp/cipherd-0",
"car_interval_ms": 100,
"max_batch_txs": 100,
"max_batch_txs": 500,
"max_batch_bytes": 1048576
}"#;

21 changes: 16 additions & 5 deletions crates/node/src/main.rs
@@ -364,7 +364,8 @@ async fn main() -> Result<()> {
let cli = Cli::parse();

// Initialize tracing based on global flags
- init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color);
+ // Hold the guard to keep the non-blocking writer alive for the process lifetime
+ let _tracing_guard = init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color);

let result = match cli.command {
Commands::Init {
Expand Down Expand Up @@ -424,7 +425,11 @@ async fn main() -> Result<()> {
Ok(())
}

- fn init_tracing(log_level: &str, log_format: &str, no_color: bool) {
+ fn init_tracing(
+     log_level: &str,
+     log_format: &str,
+     no_color: bool,
+ ) -> tracing_appender::non_blocking::WorkerGuard {
let level = match log_level.to_lowercase().as_str() {
"trace" => Level::TRACE,
"debug" => Level::DEBUG,
@@ -437,16 +442,22 @@ fn init_tracing(
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level.to_string()));

+ // Use non-blocking writer to avoid blocking the async runtime on log I/O
+ let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());
+
let subscriber = tracing_subscriber::fmt()
.with_env_filter(filter)
.with_target(true)
.with_thread_ids(true)
- .with_ansi(!no_color);
+ .with_ansi(!no_color)
+ .with_writer(non_blocking);

match log_format {
"json" => subscriber.json().init(),
_ => subscriber.init(),
}

+ guard
}
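
The returned `WorkerGuard` is what makes the non-blocking writer safe to use. A minimal standalone sketch of the lifecycle, using the same tracing-appender 0.2 / tracing-subscriber APIs as the diff:

```rust
fn main() {
    // The guard owns the background writer thread; log lines are queued
    // and written off the hot path.
    let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
    tracing_subscriber::fmt().with_writer(writer).init();

    tracing::info!("logged through the background thread");

    // Dropping the guard flushes and shuts the writer thread down. This is
    // why main() binds it to `_tracing_guard` and not `_`: a bare `_`
    // pattern drops the value immediately and logs would be silently lost.
    drop(guard); // explicit here; normally held until process exit
}
```
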

// =============================================================================
@@ -554,7 +565,7 @@ fn cmd_init(
data_dir: data_dir.clone(),
genesis_path: Some(genesis_path.clone()),
car_interval_ms: 100,
- max_batch_txs: 100,
+ max_batch_txs: 500,
max_batch_bytes: 1024 * 1024,
rpc_enabled: true,
rpc_http_port: cipherd::DEFAULT_RPC_HTTP_PORT,
Expand Down Expand Up @@ -968,7 +979,7 @@ fn cmd_testnet_init_files(
data_dir: data_dir.clone(),
genesis_path: Some(genesis_path.clone()),
car_interval_ms: 100,
- max_batch_txs: 100,
+ max_batch_txs: 500,
max_batch_bytes: 1024 * 1024,
rpc_enabled: true,
// Each validator gets HTTP and WS ports spaced by 10 to avoid conflicts
12 changes: 7 additions & 5 deletions crates/node/src/node.rs
@@ -58,7 +58,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
- use tracing::{debug, error, info, warn};
+ use tracing::{debug, error, info, trace, warn};

use cipherbft_metrics;

@@ -703,7 +703,9 @@ impl Node {
// to ensure batches are flushed before Cars are created. Without this, there's a race
// condition where Primary creates empty Cars before Worker flushes pending batches.
let worker_config = WorkerConfig::new(self.validator_id, worker_id)
- .with_flush_interval(std::time::Duration::from_millis(50));
+ .with_flush_interval(std::time::Duration::from_millis(50))
+ .with_max_batch_txs(self.config.max_batch_txs)
+ .with_max_batch_bytes(self.config.max_batch_bytes);
let mut worker_handle = Worker::spawn_with_storage(
worker_config,
Box::new(worker_network),
@@ -754,12 +756,12 @@ impl Node {
}
} => {
if let Some(tx_bytes) = tx {
info!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len());
trace!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len());
if worker_handle.submit_transaction(tx_bytes).await.is_err() {
warn!("Worker {} submit_transaction failed", worker_id);
// Don't break - continue processing other messages
} else {
info!("Worker {} forwarded transaction to batch maker", worker_id);
trace!("Worker {} forwarded transaction to batch maker", worker_id);
}
}
}
@@ -784,7 +786,7 @@ impl Node {
msg = worker_handle.recv_from_worker() => {
match msg {
Some(m) => {
info!("Worker {} bridge forwarding {:?} to Primary", worker_id, m);
debug!("Worker {} bridge forwarding {:?} to Primary", worker_id, m);
if primary_worker_sender.send(m).await.is_err() {
warn!("Worker {} send to primary failed", worker_id);
break;
8 changes: 4 additions & 4 deletions crates/rpc/src/adapters.rs
@@ -39,7 +39,7 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
- use tracing::{debug, info, trace, warn};
+ use tracing::{debug, trace, warn};

use cipherbft_execution::precompiles::{CipherBftPrecompileProvider, StakingPrecompile};
use cipherbft_execution::AccountProof;
@@ -1573,7 +1573,7 @@ impl ChannelMempoolApi {
#[async_trait]
impl MempoolApi for ChannelMempoolApi {
async fn submit_transaction(&self, tx_bytes: Bytes) -> RpcResult<B256> {
- info!(
+ trace!(
"ChannelMempoolApi::submit_transaction received {} bytes (chain_id={})",
tx_bytes.len(),
self.chain_id
@@ -1616,7 +1616,7 @@ impl MempoolApi for ChannelMempoolApi {
}

// Forward to worker via channel
- info!(
+ trace!(
"Sending transaction {} to worker channel (capacity: {})",
tx_hash,
self.tx_sender.capacity()
@@ -1626,7 +1626,7 @@ impl MempoolApi for ChannelMempoolApi {
RpcError::Execution("Transaction submission failed: worker channel closed".to_string())
})?;

- info!(
+ trace!(
"Transaction {} sent to worker channel ({} bytes)",
tx_hash,
tx_bytes.len()