Skip to content

Commit

Permalink
Merge pull request #2584 from eqlabs/krisztian/rpc-0.8-remove-call-on…
Browse files Browse the repository at this point in the history
…-pending-error

fix(rpc/v08): remove `CALL_ON_PENDING` error and introduce `SUBSCRIPTION_BLOCK_ID`
  • Loading branch information
kkovaacs authored Feb 11, 2025
2 parents 4ec4853 + 9c3393f commit 6d59417
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 46 deletions.
4 changes: 0 additions & 4 deletions crates/rpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ pub enum ApplicationError {
InvalidSubscriptionID,
#[error("Too many addresses in filter sender_address filter")]
TooManyAddressesInFilter,
#[error("This method does not support being called on the pending block")]
CallOnPending,
/// Internal errors are errors whose details we don't want to show to the
/// end user. These are logged, and a simple "internal error" message is
/// shown to the end user.
Expand Down Expand Up @@ -169,7 +167,6 @@ impl ApplicationError {
// specs/rpc/starknet_ws_api.json
ApplicationError::InvalidSubscriptionID => 66,
ApplicationError::TooManyAddressesInFilter => 67,
ApplicationError::CallOnPending => 69,
// https://www.jsonrpc.org/specification#error_object
ApplicationError::GatewayError(_)
| ApplicationError::Internal(_)
Expand Down Expand Up @@ -232,7 +229,6 @@ impl ApplicationError {
ApplicationError::UnsupportedContractClassVersion => None,
ApplicationError::InvalidSubscriptionID => None,
ApplicationError::TooManyAddressesInFilter => None,
ApplicationError::CallOnPending => None,
ApplicationError::GatewayError(error) => Some(json!({
"error": error,
})),
Expand Down
28 changes: 13 additions & 15 deletions crates/rpc/src/jsonrpc/router/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use axum::extract::ws::{Message, WebSocket};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use pathfinder_common::{BlockId, BlockNumber};
use pathfinder_common::BlockNumber;
use serde_json::value::RawValue;
use tokio::sync::{mpsc, RwLock};
use tracing::Instrument;
Expand All @@ -13,6 +13,7 @@ use crate::context::RpcContext;
use crate::dto::{DeserializeForVersion, SerializeForVersion};
use crate::error::ApplicationError;
use crate::jsonrpc::{RpcError, RpcRequest, RpcResponse};
use crate::types::request::SubscriptionBlockId;
use crate::{RpcVersion, SubscriptionId};

/// See [`RpcSubscriptionFlow`].
Expand Down Expand Up @@ -74,8 +75,8 @@ pub trait RpcSubscriptionFlow: Send + Sync {

/// The block to start streaming from. If the subscription endpoint does not
/// support catching up, leave this method unimplemented.
fn starting_block(_params: &Self::Params) -> BlockId {
BlockId::Latest
fn starting_block(_params: &Self::Params) -> SubscriptionBlockId {
SubscriptionBlockId::Latest
}

/// Fetch historical data from the `from` block to the `to` block. The
Expand Down Expand Up @@ -163,17 +164,13 @@ where
let first_block = T::starting_block(&params);

let mut current_block = match first_block {
BlockId::Pending => {
return Err(RpcError::ApplicationError(ApplicationError::CallOnPending));
}
BlockId::Latest => {
SubscriptionBlockId::Latest => {
// No need to catch up. The code below will subscribe to new blocks.
None
}
first_block @ (BlockId::Number(_) | BlockId::Hash(_)) => {
first_block @ (SubscriptionBlockId::Number(_) | SubscriptionBlockId::Hash(_)) => {
// Load the first block number, return an error if it's invalid.
let first_block = pathfinder_storage::BlockId::try_from(first_block)
.map_err(|e| RpcError::InvalidParams(e.to_string()))?;
let first_block = pathfinder_storage::BlockId::from(first_block);
let storage = router.context.storage.clone();
let current_block = util::task::spawn_blocking(move |_| -> Result<_, RpcError> {
let mut conn = storage.connection().map_err(RpcError::InternalError)?;
Expand Down Expand Up @@ -800,7 +797,7 @@ mod tests {

use axum::async_trait;
use axum::extract::ws::Message;
use pathfinder_common::{BlockHash, BlockHeader, BlockId, BlockNumber, ChainId};
use pathfinder_common::{BlockHash, BlockHeader, BlockNumber, ChainId};
use pathfinder_crypto::Felt;
use pathfinder_ethereum::EthereumClient;
use pathfinder_storage::StorageBuilder;
Expand All @@ -818,6 +815,7 @@ mod tests {
SubscriptionMessage,
};
use crate::pending::PendingWatcher;
use crate::types::request::SubscriptionBlockId;
use crate::types::syncing::Syncing;
use crate::{Notifications, SyncState};

Expand All @@ -830,8 +828,8 @@ mod tests {
type Params = Params;
type Notification = serde_json::Value;

fn starting_block(_params: &Self::Params) -> BlockId {
BlockId::Number(BlockNumber::GENESIS)
fn starting_block(_params: &Self::Params) -> SubscriptionBlockId {
SubscriptionBlockId::Number(BlockNumber::GENESIS)
}

async fn catch_up(
Expand Down Expand Up @@ -907,8 +905,8 @@ mod tests {
type Params = Params;
type Notification = serde_json::Value;

fn starting_block(_params: &Self::Params) -> BlockId {
BlockId::Number(BlockNumber::GENESIS)
fn starting_block(_params: &Self::Params) -> SubscriptionBlockId {
SubscriptionBlockId::Number(BlockNumber::GENESIS)
}

async fn catch_up(
Expand Down
21 changes: 11 additions & 10 deletions crates/rpc/src/method/subscribe_events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use axum::async_trait;
use pathfinder_common::{BlockId, BlockNumber, ContractAddress, EventKey};
use pathfinder_common::{BlockNumber, ContractAddress, EventKey};
use pathfinder_storage::{AGGREGATE_BLOOM_BLOCK_RANGE_LEN, EVENT_KEY_FILTER_LIMIT};
use tokio::sync::mpsc;

Expand All @@ -10,6 +10,7 @@ use crate::context::RpcContext;
use crate::error::ApplicationError;
use crate::jsonrpc::{CatchUp, RpcError, RpcSubscriptionFlow, SubscriptionMessage};
use crate::method::get_events::EmittedEvent;
use crate::types::request::SubscriptionBlockId;
use crate::Reorg;

pub struct SubscribeEvents;
Expand All @@ -18,7 +19,7 @@ pub struct SubscribeEvents;
pub struct Params {
from_address: Option<ContractAddress>,
keys: Option<Vec<Vec<EventKey>>>,
block_id: Option<BlockId>,
block_id: Option<SubscriptionBlockId>,
}

impl crate::dto::DeserializeForVersion for Option<Params> {
Expand All @@ -35,7 +36,7 @@ impl crate::dto::DeserializeForVersion for Option<Params> {
keys: value.deserialize_optional_array("keys", |value| {
value.deserialize_array(|value| Ok(EventKey(value.deserialize()?)))
})?,
block_id: value.deserialize_optional_serde("block_id")?,
block_id: value.deserialize_optional("block_id")?,
}))
})
}
Expand Down Expand Up @@ -69,9 +70,6 @@ impl RpcSubscriptionFlow for SubscribeEvents {

fn validate_params(params: &Self::Params) -> Result<(), RpcError> {
if let Some(params) = params {
if let Some(BlockId::Pending) = params.block_id {
return Err(RpcError::ApplicationError(ApplicationError::CallOnPending));
}
if let Some(keys) = &params.keys {
if keys.len() > EVENT_KEY_FILTER_LIMIT {
return Err(RpcError::ApplicationError(
Expand All @@ -86,11 +84,11 @@ impl RpcSubscriptionFlow for SubscribeEvents {
Ok(())
}

fn starting_block(params: &Self::Params) -> BlockId {
fn starting_block(params: &Self::Params) -> SubscriptionBlockId {
params
.as_ref()
.and_then(|req| req.block_id)
.unwrap_or(BlockId::Latest)
.unwrap_or(SubscriptionBlockId::Latest)
}

async fn catch_up(
Expand Down Expand Up @@ -684,8 +682,11 @@ mod tests {
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": 69,
"message": "This method does not support being called on the pending block"
"code": -32602,
"message": "Invalid params",
"data": {
"reason": "Invalid block id"
}
}
})
);
Expand Down
28 changes: 11 additions & 17 deletions crates/rpc/src/method/subscribe_new_heads.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use std::sync::Arc;

use axum::async_trait;
use pathfinder_common::{BlockId, BlockNumber};
use pathfinder_common::BlockNumber;
use tokio::sync::mpsc;

use super::REORG_SUBSCRIPTION_NAME;
use crate::context::RpcContext;
use crate::error::ApplicationError;
use crate::jsonrpc::{CatchUp, RpcError, RpcSubscriptionFlow, SubscriptionMessage};
use crate::types::request::SubscriptionBlockId;
use crate::Reorg;

pub struct SubscribeNewHeads;

#[derive(Debug, Clone)]
pub struct Params {
block_id: Option<BlockId>,
block_id: Option<SubscriptionBlockId>,
}

impl crate::dto::DeserializeForVersion for Option<Params> {
Expand All @@ -25,7 +25,7 @@ impl crate::dto::DeserializeForVersion for Option<Params> {
}
value.deserialize_map(|value| {
Ok(Some(Params {
block_id: value.deserialize_optional_serde("block_id")?,
block_id: value.deserialize_optional("block_id")?,
}))
})
}
Expand Down Expand Up @@ -56,20 +56,11 @@ impl RpcSubscriptionFlow for SubscribeNewHeads {
type Params = Option<Params>;
type Notification = Notification;

fn validate_params(params: &Self::Params) -> Result<(), RpcError> {
if let Some(params) = params {
if let Some(BlockId::Pending) = params.block_id {
return Err(RpcError::ApplicationError(ApplicationError::CallOnPending));
}
}
Ok(())
}

fn starting_block(params: &Self::Params) -> BlockId {
fn starting_block(params: &Self::Params) -> SubscriptionBlockId {
params
.as_ref()
.and_then(|req| req.block_id)
.unwrap_or(BlockId::Latest)
.unwrap_or(SubscriptionBlockId::Latest)
}

async fn catch_up(
Expand Down Expand Up @@ -503,8 +494,11 @@ mod tests {
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": 69,
"message": "This method does not support being called on the pending block"
"code": -32602,
"message": "Invalid params",
"data": {
"reason": "Invalid block id"
}
}
})
);
Expand Down
84 changes: 84 additions & 0 deletions crates/rpc/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ impl crate::dto::DeserializeForVersion for DataAvailabilityMode {
pub mod request {
use pathfinder_common::{
AccountDeploymentDataElem,
BlockHash,
BlockNumber,
CallParam,
CasmHash,
ChainId,
Expand All @@ -178,6 +180,55 @@ pub mod request {

use crate::dto::U64Hex;

/// A way of identifying a block in a subscription request.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SubscriptionBlockId {
Number(BlockNumber),
Hash(BlockHash),
Latest,
}

impl From<SubscriptionBlockId> for pathfinder_storage::BlockId {
fn from(value: SubscriptionBlockId) -> Self {
match value {
SubscriptionBlockId::Number(block_number) => {
pathfinder_storage::BlockId::Number(block_number)
}
SubscriptionBlockId::Hash(block_hash) => {
pathfinder_storage::BlockId::Hash(block_hash)
}
SubscriptionBlockId::Latest => pathfinder_storage::BlockId::Latest,
}
}
}

impl crate::dto::DeserializeForVersion for SubscriptionBlockId {
fn deserialize(value: crate::dto::Value) -> Result<Self, serde_json::Error> {
if value.is_string() {
let value: String = value.deserialize()?;
match value.as_str() {
"latest" => Ok(Self::Latest),
_ => Err(serde_json::Error::custom("Invalid block id")),
}
} else {
value.deserialize_map(|value| {
if value.contains_key("block_number") {
Ok(Self::Number(
pathfinder_common::BlockNumber::new(value.deserialize("block_number")?)
.ok_or_else(|| serde_json::Error::custom("Invalid block number"))?,
))
} else if value.contains_key("block_hash") {
Ok(Self::Hash(pathfinder_common::BlockHash(
value.deserialize("block_hash")?,
)))
} else {
Err(serde_json::Error::custom("Invalid block id"))
}
})
}
}
}

/// "Broadcasted" L2 transaction in requests the RPC API.
///
/// "Broadcasted" transactions represent the data required to submit a new
Expand Down Expand Up @@ -1403,6 +1454,7 @@ pub mod request {
use pathfinder_common::macro_prelude::*;
use pathfinder_common::{felt, ResourceAmount, ResourcePricePerUnit};
use pretty_assertions_sorted::assert_eq;
use serde_json::json;

use super::super::*;
use crate::dto::DeserializeForVersion;
Expand All @@ -1417,6 +1469,38 @@ pub mod request {
SierraEntryPoints,
};

#[rstest::rstest]
#[case::number(json!({"block_number": 1}), SubscriptionBlockId::Number(BlockNumber::new_or_panic(1)))]
#[case::hash(json!({"block_hash": "0xdeadbeef"}), SubscriptionBlockId::Hash(block_hash!("0xdeadbeef")))]
#[case::latest(json!("latest"), SubscriptionBlockId::Latest)]
#[test]
fn subscription_block_id(
#[case] input: serde_json::Value,
#[case] expected: SubscriptionBlockId,
) {
assert_eq!(
SubscriptionBlockId::deserialize(crate::dto::Value::new(
input,
crate::RpcVersion::V08
))
.unwrap(),
expected
);
}

#[test]
fn subscription_block_id_deserialization_failure() {
assert_eq!(
SubscriptionBlockId::deserialize(crate::dto::Value::new(
json!("pending"),
crate::RpcVersion::V08
))
.unwrap_err()
.to_string(),
"Invalid block id"
);
}

#[test]
fn broadcasted_transaction() {
let contract_class = CairoContractClass {
Expand Down

0 comments on commit 6d59417

Please sign in to comment.