diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b46dc42d..09308df8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ - NTX Builder now deactivates network accounts which crash repeatedly (configurable via `--ntx-builder.max-account-crashes`, default 10) ([#1712](https://github.com/0xMiden/miden-node/pull/1712)). - Removed gRPC reflection v1-alpha support ([#1795](https://github.com/0xMiden/node/pull/1795)). - [BREAKING] Rust requirement bumped from `v1.91` to `v1.93` ([#1803](https://github.com/0xMiden/node/pull/1803)). -- [BREAKING] Updated `SyncNotes` endpoint to returned multiple note updates ([#1809](https://github.com/0xMiden/node/issues/1809)). +- [BREAKING] Updated `SyncNotes` endpoint to returned multiple note updates ([#1809](https://github.com/0xMiden/node/issues/1809), ([#1851](https://github.com/0xMiden/node/pull/1851))). ### Fixes diff --git a/bin/stress-test/src/store/mod.rs b/bin/stress-test/src/store/mod.rs index 3b2b5b1e9..e65ee1fee 100644 --- a/bin/stress-test/src/store/mod.rs +++ b/bin/stress-test/src/store/mod.rs @@ -149,12 +149,18 @@ pub async fn bench_sync_nullifiers( let resp_block_range = response.block_range.expect("block_range should exist"); let resp_chain_tip = resp_block_range.block_to.expect("block_to should exist"); - if response.notes.is_empty() { + if response.blocks.is_empty() { break; } - // Get the notes nullifiers, limiting to 20 notes maximum - let note_ids: Vec<_> = response.notes.iter().map(|n| n.note_id.unwrap()).collect(); + // Collect note IDs from all blocks in the response. + let note_ids: Vec<_> = response + .blocks + .iter() + .flat_map(|b| b.notes.iter().map(|n| n.note_id.unwrap())) + .collect(); + + // Get the notes nullifiers, limiting to 20 notes maximum. let note_ids_to_fetch: Vec<_> = note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect(); if !note_ids_to_fetch.is_empty() { @@ -172,8 +178,10 @@ pub async fn bench_sync_nullifiers( })); } - // The notes all come from the same block; use the block header to find it. - current_block_num = response.block_header.map_or(resp_chain_tip, |h| h.block_num); + // Advance past the last block in the response. + let last_block = response.blocks.last().unwrap(); + current_block_num = + last_block.block_header.as_ref().map_or(resp_chain_tip, |h| h.block_num); if current_block_num >= resp_chain_tip { break; } diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index c58568cbe..8f2b8e34a 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -16,7 +16,6 @@ use miden_node_proto::generated::{self as proto}; use miden_node_proto::try_convert; use miden_node_utils::ErrorReport; use miden_node_utils::limiter::{ - MAX_RESPONSE_PAYLOAD_BYTES, QueryParamAccountIdLimit, QueryParamLimiter, QueryParamNoteIdLimit, @@ -41,17 +40,6 @@ use url::Url; use crate::COMPONENT; -/// Estimated byte size of a [`NoteSyncBlock`] excluding its notes. -/// -/// `BlockHeader` (~341 bytes) + MMR proof with 32 siblings (~1216 bytes). -const BLOCK_OVERHEAD_BYTES: usize = 1600; - -/// Estimated byte size of a single [`NoteSyncRecord`]. -/// -/// Note ID (~38 bytes) + index + metadata (~26 bytes) + sparse merkle path with 16 -/// siblings (~608 bytes). -const NOTE_RECORD_BYTES: usize = 700; - // RPC SERVICE // ================================================================================================ @@ -271,86 +259,9 @@ impl api_server::Api for RpcService { ) -> Result, Status> { debug!(target: COMPONENT, request = ?request.get_ref()); - let request = request.into_inner(); - check::(request.note_tags.len())?; - - let block_range = request - .block_range - .ok_or_else(|| Status::invalid_argument("missing block_range"))?; - let note_tags = request.note_tags; + check::(request.get_ref().note_tags.len())?; - let mut sync_blocks = Vec::new(); - let mut accumulated_size: usize = 0; - let mut current_block_from = block_range.block_from; - // The upper bound for the response: the requested block_to if specified, - // otherwise updated to the chain tip from the store on each iteration. - let mut response_block_to = block_range.block_to.unwrap_or(0); - - loop { - let store_request = proto::rpc::SyncNotesRequest { - block_range: Some(proto::rpc::BlockRange { - block_from: current_block_from, - block_to: block_range.block_to, - }), - note_tags: note_tags.clone(), - }; - - let store_response = - self.store.clone().sync_notes(store_request.into_request()).await?.into_inner(); - - // When block_to was not specified, use the chain tip from the store. - if block_range.block_to.is_none() { - let store_chain_tip = store_response - .block_range - .ok_or_else(|| Status::internal("store response missing block_range"))? - .block_to - .ok_or_else(|| Status::internal("store response missing block_to"))?; - response_block_to = store_chain_tip; - } - - // No notes means we've reached the end of the range without more matches. - if store_response.notes.is_empty() { - break; - } - - let sync_block = proto::rpc::sync_notes_response::NoteSyncBlock { - block_header: store_response.block_header, - mmr_path: store_response.mmr_path, - notes: store_response.notes, - }; - - accumulated_size += BLOCK_OVERHEAD_BYTES + sync_block.notes.len() * NOTE_RECORD_BYTES; - - // If we exceed the budget, drop this block and stop. - if accumulated_size > MAX_RESPONSE_PAYLOAD_BYTES { - break; - } - - // The block number of the returned notes, used to advance the cursor. - let notes_block_num = sync_block - .block_header - .as_ref() - .ok_or_else(|| Status::internal("store response missing block_header"))? - .block_num; - sync_blocks.push(sync_block); - - // Check if we've reached the end of the requested range. - if notes_block_num >= response_block_to { - break; - } - - // Advance the cursor. The store query uses `committed_at > block_range.start()` - // (exclusive), so setting block_from to the current block is sufficient to skip it. - current_block_from = notes_block_num; - } - - Ok(Response::new(proto::rpc::SyncNotesResponse { - block_range: Some(proto::rpc::BlockRange { - block_from: block_range.block_from, - block_to: Some(response_block_to), - }), - blocks: sync_blocks, - })) + self.store.clone().sync_notes(request).await } async fn get_notes_by_id( diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index af230b05e..d47fbca7d 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -511,7 +511,7 @@ impl Db { &self, block_range: RangeInclusive, note_tags: Vec, - ) -> Result { + ) -> Result, NoteSyncError> { self.transact("notes sync task", move |conn| { queries::get_note_sync(conn, note_tags.as_slice(), block_range) }) diff --git a/crates/store/src/db/models/queries/notes.rs b/crates/store/src/db/models/queries/notes.rs index 0a99f1d41..933df6886 100644 --- a/crates/store/src/db/models/queries/notes.rs +++ b/crates/store/src/db/models/queries/notes.rs @@ -518,16 +518,19 @@ pub(crate) fn get_note_sync( conn: &mut SqliteConnection, note_tags: &[u32], block_range: RangeInclusive, -) -> Result { +) -> Result, NoteSyncError> { QueryParamNoteTagLimit::check(note_tags.len()).map_err(DatabaseError::from)?; - let (notes, _last_included_block) = - select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?; + let (notes, _) = select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?; + + if notes.is_empty() { + return Ok(None); + } let block_header = select_block_header_by_block_num(conn, notes.first().map(|note| note.block_num))? .ok_or(NoteSyncError::EmptyBlockHeadersTable)?; - Ok(NoteSyncUpdate { notes, block_header }) + Ok(Some(NoteSyncUpdate { notes, block_header })) } #[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)] diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index 42bb57a39..75a68db2f 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -956,23 +956,21 @@ fn note_sync_across_multiple_blocks() { queries::insert_notes(conn, &[(note, None)]).unwrap(); } - // Simulate the RPC batching loop: repeatedly query with advancing block_from. - let full_range = BlockNumber::GENESIS..=BlockNumber::from(3); + // Simulate the store batching loop: repeatedly call get_note_sync with advancing + // block_from, same as `State::sync_notes` does. let mut collected_block_nums = Vec::new(); - let mut current_from = *full_range.start(); + let mut current_from = BlockNumber::GENESIS; + let end = BlockNumber::from(3); loop { - let range = current_from..=*full_range.end(); - let (notes, _last_included) = - queries::select_notes_since_block_by_tag_and_sender(conn, &[], &[tag], range).unwrap(); - - if notes.is_empty() { + let range = current_from..=end; + let Some(update) = queries::get_note_sync(conn, &[tag], range).unwrap() else { break; - } + }; // All notes in a single response come from the same block. - let block_num = notes[0].block_num; - assert!(notes.iter().all(|n| n.block_num == block_num)); + let block_num = update.block_header.block_num(); + assert!(update.notes.iter().all(|n| n.block_num == block_num)); collected_block_nums.push(block_num); // The query uses `committed_at > block_range.start()` (exclusive), so @@ -1019,13 +1017,10 @@ fn note_sync_no_matching_tags() { queries::insert_scripts(conn, [¬e]).unwrap(); queries::insert_notes(conn, &[(note, None)]).unwrap(); - // Query with a different tag — should return no notes. + // Query with a different tag — should return None. let range = BlockNumber::GENESIS..=BlockNumber::from(1); - let (notes, last_included) = - queries::select_notes_since_block_by_tag_and_sender(conn, &[], &[999], range).unwrap(); - - assert!(notes.is_empty()); - assert_eq!(last_included, BlockNumber::from(1)); + let result = queries::get_note_sync(conn, &[999], range).unwrap(); + assert!(result.is_none()); } fn insert_account_delta( diff --git a/crates/store/src/server/rpc_api.rs b/crates/store/src/server/rpc_api.rs index 83ddef880..1a22fe463 100644 --- a/crates/store/src/server/rpc_api.rs +++ b/crates/store/src/server/rpc_api.rs @@ -126,38 +126,38 @@ impl rpc_server::Rpc for StoreApi { async fn sync_notes( &self, request: Request, - ) -> Result, Status> { + ) -> Result, Status> { let request = request.into_inner(); let chain_tip = self.state.latest_block_num().await; + let requested_block_to = request.block_range.as_ref().and_then(|r| r.block_to); let block_range = read_block_range::(request.block_range, "SyncNotesRequest")? .into_inclusive_range::(&chain_tip)?; let block_from = block_range.start().as_u32(); + let response_block_to = requested_block_to.unwrap_or(chain_tip.as_u32()); // Validate note tags count check::(request.note_tags.len())?; - let result = self.state.sync_notes(request.note_tags, block_range).await?; + let results = self.state.sync_notes(request.note_tags, block_range).await?; - let (block_header, mmr_path, notes) = match result { - Some((state, mmr_proof)) => ( - Some(state.block_header.into()), - Some(mmr_proof.merkle_path().clone().into()), - state.notes.into_iter().map(Into::into).collect(), - ), - None => (None, None, Vec::new()), - }; + let blocks = results + .into_iter() + .map(|(state, mmr_proof)| proto::rpc::sync_notes_response::NoteSyncBlock { + block_header: Some(state.block_header.into()), + mmr_path: Some(mmr_proof.merkle_path().clone().into()), + notes: state.notes.into_iter().map(Into::into).collect(), + }) + .collect(); - Ok(Response::new(proto::store::StoreSyncNotesResponse { + Ok(Response::new(proto::rpc::SyncNotesResponse { block_range: Some(proto::rpc::BlockRange { block_from, - block_to: Some(chain_tip.as_u32()), + block_to: Some(response_block_to), }), - block_header, - mmr_path, - notes, + blocks, })) } diff --git a/crates/store/src/state/sync_state.rs b/crates/store/src/state/sync_state.rs index 92e92224e..25aaa6524 100644 --- a/crates/store/src/state/sync_state.rs +++ b/crates/store/src/state/sync_state.rs @@ -1,5 +1,6 @@ use std::ops::RangeInclusive; +use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::crypto::merkle::mmr::{Forest, MmrDelta, MmrProof}; @@ -11,6 +12,17 @@ use crate::db::models::queries::StorageMapValuesPage; use crate::db::{AccountVaultValue, NoteSyncUpdate, NullifierInfo}; use crate::errors::{DatabaseError, NoteSyncError, StateSyncError}; +/// Estimated byte size of a [`NoteSyncBlock`] excluding its notes. +/// +/// `BlockHeader` (~341 bytes) + MMR proof with 32 siblings (~1216 bytes). +const BLOCK_OVERHEAD_BYTES: usize = 1600; + +/// Estimated byte size of a single [`NoteSyncRecord`]. +/// +/// Note ID (~38 bytes) + index + metadata (~26 bytes) + sparse merkle path with 16 +/// siblings (~608 bytes). +const NOTE_RECORD_BYTES: usize = 700; + // STATE SYNCHRONIZATION ENDPOINTS // ================================================================================================ @@ -64,33 +76,48 @@ impl State { /// Loads data to synchronize a client's notes. /// - /// The client's request contains a list of tags, this method will return the first - /// block with a matching tag, or the chain tip. All the other values are filter based on this - /// block range. - /// - /// # Arguments - /// - /// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the - /// first block containing a matching note. - /// - `block_range`: The range of blocks from which to synchronize notes. + /// Returns as many blocks with matching notes as fit within the response payload + /// limit. Each block includes its header and MMR proof at `block_range.end()`. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn sync_notes( &self, note_tags: Vec, block_range: RangeInclusive, - ) -> Result, NoteSyncError> { + ) -> Result, NoteSyncError> { let inner = self.inner.read().await; let checkpoint = *block_range.end(); - let note_sync = self.db.get_note_sync(block_range, note_tags).await?; + let mut results = Vec::new(); + let mut accumulated_size: usize = 0; + let mut current_from = *block_range.start(); - if note_sync.notes.is_empty() { - return Ok(None); - } + loop { + let range = current_from..=checkpoint; + let Some(note_sync) = self.db.get_note_sync(range, note_tags.clone()).await? else { + break; + }; + + accumulated_size += BLOCK_OVERHEAD_BYTES + note_sync.notes.len() * NOTE_RECORD_BYTES; - let mmr_proof = inner.blockchain.open_at(note_sync.block_header.block_num(), checkpoint)?; + if accumulated_size > MAX_RESPONSE_PAYLOAD_BYTES { + break; + } + + let block_num = note_sync.block_header.block_num(); + + if block_num >= checkpoint { + break; + } + + let mmr_proof = inner.blockchain.open_at(block_num, checkpoint)?; + results.push((note_sync, mmr_proof)); + + // The DB query uses `committed_at > block_range.start()` (exclusive), + // so setting current_from to the found block is sufficient to skip it. + current_from = block_num; + } - Ok(Some((note_sync, mmr_proof))) + Ok(results) } pub async fn sync_nullifiers( diff --git a/proto/proto/internal/store.proto b/proto/proto/internal/store.proto index c3efa51b6..7de72ef0d 100644 --- a/proto/proto/internal/store.proto +++ b/proto/proto/internal/store.proto @@ -52,11 +52,10 @@ service Rpc { // Note that only 16-bit prefixes are supported at this time. rpc SyncNullifiers(rpc.SyncNullifiersRequest) returns (rpc.SyncNullifiersResponse) {} - // Returns the next block containing notes matching the requested tags within the given range. + // Returns blocks containing notes matching the requested tags within the given range. // - // This is the store-internal single-block variant. The external RPC endpoint batches multiple - // calls to this into a single multi-block response. - rpc SyncNotes(rpc.SyncNotesRequest) returns (StoreSyncNotesResponse) {} + // The response batches as many blocks as fit within the response payload limit. + rpc SyncNotes(rpc.SyncNotesRequest) returns (rpc.SyncNotesResponse) {} // Returns chain MMR updates within a block range. rpc SyncChainMmr(rpc.SyncChainMmrRequest) returns (rpc.SyncChainMmrResponse) {} @@ -71,31 +70,6 @@ service Rpc { rpc SyncTransactions(rpc.SyncTransactionsRequest) returns (rpc.SyncTransactionsResponse) {} } -// SYNC NOTES (STORE-INTERNAL) -// ================================================================================================ - -// Store-internal response for note sync (single block). -// -// This is the single-block response returned by the store. The external RPC endpoint -// batches multiple of these into a multi-block `SyncNotesResponse`. -message StoreSyncNotesResponse { - // The block range covered by this response. - // - // `block_from` matches the request's `block_range.block_from`. - // `block_to` is the chain tip at the time of the response. - rpc.BlockRange block_range = 1; - - // Block header of the block containing the matching notes. - blockchain.BlockHeader block_header = 2; - - // Merkle path to verify the block's inclusion in the MMR at the returned - // `block_range.block_to`. - primitives.MerklePath mmr_path = 3; - - // List of notes matching the specified criteria in this block. - repeated note.NoteSyncRecord notes = 4; -} - // BLOCK PRODUCER STORE API // ================================================================================================