Skip to content

Commit

Permalink
Add WebSocket RPC methods: subscribeNewHeads, unsubscribe (#634)
Browse files Browse the repository at this point in the history
* Do interval block creation via request
  • Loading branch information
FabijanC authored Nov 8, 2024
1 parent 6f86cf2 commit c5e3a61
Show file tree
Hide file tree
Showing 17 changed files with 973 additions and 133 deletions.
8 changes: 8 additions & 0 deletions crates/starknet-devnet-core/src/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,14 @@ impl StarknetBlock {
}
}

pub fn create_empty_accepted() -> Self {
Self {
header: BlockHeader::default(),
transaction_hashes: vec![],
status: BlockStatus::AcceptedOnL2,
}
}

pub(crate) fn set_block_number(&mut self, block_number: u64) {
self.header.block_number = BlockNumber(block_number)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/starknet-devnet-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ thiserror = { workspace = true }
anyhow = { workspace = true }
lazy_static = { workspace = true }
enum-helper-macros = { workspace = true }
rand = { workspace = true }

# devnet
starknet-core = { workspace = true }
starknet-types = { workspace = true }
starknet-rs-core = { workspace = true }

[dev-dependencies]
rand = { workspace = true }
rand_chacha = { workspace = true }
regex_generate = { workspace = true }
serde_yaml = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::api::http::endpoints::DevnetConfig;

const DEFAULT_CONTINUATION_TOKEN: &str = "0";

/// here are the definitions and stub implementations of all JSON-RPC read endpoints
/// The definitions of JSON-RPC read endpoints defined in starknet_api_openrpc.json
impl JsonRpcHandler {
/// starknet_specVersion
pub fn spec_version(&self) -> StrictRpcResult {
Expand Down
111 changes: 111 additions & 0 deletions crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use starknet_core::error::Error;
use starknet_rs_core::types::{BlockId, BlockTag};
use starknet_types::starknet_api::block::BlockStatus;

use super::error::ApiError;
use super::models::{BlockIdInput, SubscriptionIdInput};
use super::{JsonRpcHandler, JsonRpcSubscriptionRequest};
use crate::rpc_core::request::Id;
use crate::subscribe::{SocketId, SubscriptionNotification};

/// The definitions of JSON-RPC read endpoints defined in starknet_ws_api.json
impl JsonRpcHandler {
pub async fn execute_ws(
&self,
request: JsonRpcSubscriptionRequest,
rpc_request_id: Id,
socket_id: SocketId,
) -> Result<(), ApiError> {
match request {
JsonRpcSubscriptionRequest::NewHeads(data) => {
self.subscribe_new_heads(data, rpc_request_id, socket_id).await
}
JsonRpcSubscriptionRequest::TransactionStatus => todo!(),
JsonRpcSubscriptionRequest::PendingTransactions => todo!(),
JsonRpcSubscriptionRequest::Events => todo!(),
JsonRpcSubscriptionRequest::Unsubscribe(SubscriptionIdInput { subscription_id }) => {
let mut sockets = self.api.sockets.lock().await;
let socket_context = sockets.get_mut(&socket_id).ok_or(
ApiError::StarknetDevnetError(Error::UnexpectedInternalError {
msg: format!("Unregistered socket ID: {socket_id}"),
}),
)?;

socket_context.unsubscribe(rpc_request_id, subscription_id).await?;
Ok(())
}
}
}

/// starknet_subscribeNewHeads
/// Checks if an optional block ID is provided. Validates that the block exists and is not too
/// many blocks in the past. If it is a valid block, the user is notified of all blocks from the
/// old up to the latest, and subscribed to new ones. If no block ID specified, the user is just
/// subscribed to new blocks.
pub async fn subscribe_new_heads(
&self,
block_id_input: Option<BlockIdInput>,
rpc_request_id: Id,
socket_id: SocketId,
) -> Result<(), ApiError> {
let latest_tag = BlockId::Tag(BlockTag::Latest);
let block_id = if let Some(BlockIdInput { block_id }) = block_id_input {
block_id.into()
} else {
// if no block ID input, this eventually just subscribes the user to new blocks
latest_tag
};

let starknet = self.api.starknet.lock().await;

// checking the block's existence; aborted blocks treated as not found
let query_block = match starknet.get_block(&block_id) {
Ok(block) => match block.status() {
BlockStatus::Rejected => Err(ApiError::BlockNotFound),
_ => Ok(block),
},
Err(Error::NoBlock) => Err(ApiError::BlockNotFound),
Err(other) => Err(ApiError::StarknetDevnetError(other)),
}?;

let latest_block = starknet.get_block(&latest_tag)?;

let query_block_number = query_block.block_number().0;
let latest_block_number = latest_block.block_number().0;

let blocks_back_amount = if query_block_number > latest_block_number {
0
} else {
latest_block_number - query_block_number
};

if blocks_back_amount > 1024 {
return Err(ApiError::TooManyBlocksBack);
}

// perform the actual subscription
let mut sockets = self.api.sockets.lock().await;
let socket_context = sockets.get_mut(&socket_id).ok_or(ApiError::StarknetDevnetError(
Error::UnexpectedInternalError { msg: format!("Unregistered socket ID: {socket_id}") },
))?;
let subscription_id = socket_context.subscribe(rpc_request_id).await;

if let BlockId::Tag(_) = block_id {
// if the specified block ID is a tag (i.e. latest/pending), no old block handling
return Ok(());
}

// Notifying of old blocks. latest_block_number inclusive?
// Yes, only if block_id != latest/pending (handled above)
for block_n in query_block_number..=latest_block_number {
let old_block = starknet
.get_block(&BlockId::Number(block_n))
.map_err(ApiError::StarknetDevnetError)?;

let notification = SubscriptionNotification::NewHeadsNotification(old_block.into());
socket_context.notify(subscription_id, notification).await;
}

Ok(())
}
}
14 changes: 14 additions & 0 deletions crates/starknet-devnet-server/src/api/json_rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ pub enum ApiError {
HttpApiError(#[from] HttpApiError),
#[error("the compiled class hash did not match the one supplied in the transaction")]
CompiledClassHashMismatch,
#[error("Cannot go back more than 1024 blocks")]
TooManyBlocksBack,
#[error("Invalid subscription id")]
InvalidSubscriptionId,
}

impl ApiError {
Expand Down Expand Up @@ -205,6 +209,16 @@ impl ApiError {
data: None,
},
ApiError::HttpApiError(http_api_error) => http_api_error.http_api_error_to_rpc_error(),
ApiError::TooManyBlocksBack => RpcError {
code: crate::rpc_core::error::ErrorCode::ServerError(68),
message: error_message.into(),
data: None,
},
ApiError::InvalidSubscriptionId => RpcError {
code: crate::rpc_core::error::ErrorCode::ServerError(66),
message: error_message.into(),
data: None,
},
}
}
}
Expand Down
Loading

0 comments on commit c5e3a61

Please sign in to comment.