Skip to content

Commit

Permalink
Implement unsubscription TODOs [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
FabijanC committed Nov 4, 2024
1 parent cc05e4f commit 6c48ccb
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl JsonRpcHandler {
}),
)?;

socket_context.unsubscribe(subscription_id).await;
socket_context.unsubscribe(rpc_request_id, subscription_id).await;
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/starknet-devnet-server/src/api/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1423,8 +1423,8 @@ mod response_tests {
use crate::api::json_rpc::ToRpcResponseResult;

#[test]
fn serializing_starknet_response_empty_variant_has_to_produce_empty_json_object_when_converted_to_rpc_result(
) {
fn serializing_starknet_response_empty_variant_has_to_produce_empty_json_object_when_converted_to_rpc_result()
{
assert_eq!(
r#"{"result":{}}"#,
serde_json::to_string(
Expand Down
30 changes: 23 additions & 7 deletions crates/starknet-devnet-server/src/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;
use std::sync::Arc;

use axum::extract::ws::{Message, WebSocket};
use futures::stream::SplitSink;
Expand All @@ -7,6 +8,7 @@ use serde::{self, Serialize};
use starknet_types::rpc::block::BlockHeader;
use tokio::sync::Mutex;

use crate::rpc_core::error::{ErrorCode, RpcError};
use crate::rpc_core::request::Id;

pub type SocketId = u64;
Expand Down Expand Up @@ -59,6 +61,7 @@ impl SubscriptionNotification {
pub enum SubscriptionResponse {
Confirmation { rpc_request_id: Id, result: SubscriptionConfirmation },
Notification { subscription_id: SubscriptionId, data: SubscriptionNotification },
Invalid(RpcError),
}

impl SubscriptionResponse {
Expand All @@ -79,11 +82,20 @@ impl SubscriptionResponse {
}
})
}
SubscriptionResponse::Invalid(rpc_error) => serde_json::json!(rpc_error),
};

resp["jsonrpc"] = "2.0".into();
resp
}

fn invalid_subscription_id() -> Self {
Self::Invalid(RpcError {
code: ErrorCode::ServerError(66),
message: "Invalid subscription id".into(),
data: None,
})
}
}

pub struct SocketContext {
Expand Down Expand Up @@ -118,11 +130,15 @@ impl SocketContext {
subscription_id
}

pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) {
match self.subscriptions.remove(&subscription_id) {
Some(_) => todo!("return true"),
None => todo!("return INVALID_SUBSCRIPTION_ID"),
}
pub async fn unsubscribe(&mut self, rpc_request_id: Id, subscription_id: SubscriptionId) {
let resp = match self.subscriptions.remove(&subscription_id) {
Some(_) => SubscriptionResponse::Confirmation {
rpc_request_id,
result: SubscriptionConfirmation::UnsubscriptionConfirmation(true),
},
None => SubscriptionResponse::invalid_subscription_id(),
};
self.send(resp).await;
}

pub async fn notify(&self, subscription_id: SubscriptionId, data: SubscriptionNotification) {
Expand All @@ -137,7 +153,7 @@ impl SocketContext {
self.notify(*subscription_id, data.clone()).await;
}
}
other => println!("DEBUG unsupported subscription: {other:?}"),
other => panic!("DEBUG unsupported subscription: {other:?}"),
}
}
}
Expand Down

0 comments on commit 6c48ccb

Please sign in to comment.