Skip to content

Commit

Permalink
Remove Box for RPC pubsub subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
pgarg66 authored and solana-grimes committed Feb 11, 2019
1 parent d41dec9 commit 144d321
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 29 deletions.
37 changes: 12 additions & 25 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig};
use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::result::Error;
use crate::rpc_pubsub::RpcSubscriptions;
use crate::status_cache::StatusCache;
use bincode::deserialize;
use itertools::Itertools;
Expand Down Expand Up @@ -93,18 +94,6 @@ pub trait BankSubscriptions {
fn check_signature(&self, signature: &Signature, status: &Result<()>);
}

struct LocalSubscriptions {}
impl Default for LocalSubscriptions {
fn default() -> Self {
LocalSubscriptions {}
}
}

impl BankSubscriptions for LocalSubscriptions {
fn check_account(&self, _pubkey: &Pubkey, _account: &Account) {}
fn check_signature(&self, _signature: &Signature, _status: &Result<()>) {}
}

type BankStatusCache = StatusCache<BankError>;

/// Manager for the state of all accounts and programs after processing its entries.
Expand All @@ -124,7 +113,7 @@ pub struct Bank {
/// processed by the bank
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,

subscriptions: RwLock<Box<Arc<BankSubscriptions + Send + Sync>>>,
subscriptions: RwLock<Option<Arc<RpcSubscriptions>>>,
}

impl Default for Bank {
Expand All @@ -135,7 +124,7 @@ impl Default for Bank {
status_cache: RwLock::new(BankStatusCache::default()),
confirmation_time: AtomicUsize::new(std::usize::MAX),
leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
subscriptions: RwLock::new(None),
}
}
}
Expand All @@ -157,9 +146,9 @@ impl Bank {
Self::new_with_leader_scheduler_config(genesis_block, &LeaderSchedulerConfig::default())
}

pub fn set_subscriptions(&self, subscriptions: Box<Arc<BankSubscriptions + Send + Sync>>) {
pub fn set_subscriptions(&self, subscriptions: Arc<RpcSubscriptions>) {
let mut sub = self.subscriptions.write().unwrap();
*sub = subscriptions
*sub = Some(subscriptions)
}

pub fn copy_for_tpu(&self) -> Self {
Expand All @@ -171,7 +160,7 @@ impl Bank {
last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()),
confirmation_time: AtomicUsize::new(self.confirmation_time()),
leader_scheduler: self.leader_scheduler.clone(),
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
subscriptions: RwLock::new(None),
}
}

Expand Down Expand Up @@ -352,10 +341,9 @@ impl Bank {

fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) {
for (i, tx) in txs.iter().enumerate() {
self.subscriptions
.read()
.unwrap()
.check_signature(&tx.signatures[0], &res[i]);
if let Some(ref subs) = *self.subscriptions.read().unwrap() {
subs.check_signature(&tx.signatures[0], &res[i]);
}
}
}
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
Expand Down Expand Up @@ -932,10 +920,9 @@ impl Bank {
let tx = &txs[i];
let accs = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) {
self.subscriptions
.read()
.unwrap()
.check_account(&key, account);
if let Some(ref subs) = *self.subscriptions.read().unwrap() {
subs.check_account(&key, account)
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/rpc_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl PubSubService {
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
bank.set_subscriptions(Box::new(subscription.clone()));
bank.set_subscriptions(subscription.clone());
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let thread_hdl = Builder::new()
Expand Down Expand Up @@ -82,7 +82,7 @@ impl PubSubService {

pub fn set_bank(&self, bank: &Arc<Bank>) {
self.rpc_bank.write().unwrap().bank = bank.clone();
bank.set_subscriptions(Box::new(self.subscription.clone()));
bank.set_subscriptions(self.subscription.clone());
}

pub fn exit(&self) {
Expand Down Expand Up @@ -429,7 +429,7 @@ mod tests {
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
arc_bank.set_subscriptions(Box::new(subscription));
arc_bank.set_subscriptions(subscription);

// Test signature subscription
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
Expand Down Expand Up @@ -516,7 +516,7 @@ mod tests {
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
arc_bank.set_subscriptions(Box::new(subscription));
arc_bank.set_subscriptions(subscription);

let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
rpc.subscribe_to_account_updates(subscriber, contract_state.pubkey().to_string());
Expand Down

0 comments on commit 144d321

Please sign in to comment.