Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

SecretStore: non-blocking wait of session completion #10303

Merged
merged 5 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 280 additions & 104 deletions secret-store/src/key_server.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use ethereum_types::{Address, H256};
use ethkey::Secret;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal};
use key_server_cluster::decryption_session::SessionImpl as DecryptionSession;
use key_server_cluster::signing_session_ecdsa::SessionImpl as EcdsaSigningSession;
use key_server_cluster::signing_session_schnorr::SessionImpl as SchnorrSigningSession;
Expand Down Expand Up @@ -87,8 +88,8 @@ struct SessionCore<T: SessionTransport> {
pub transport: T,
/// Session nonce.
pub nonce: u64,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<Option<(H256, NodeId)>>,
}

/// Mutable session data.
Expand Down Expand Up @@ -166,16 +167,17 @@ pub struct LargestSupportResultComputer;

impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new session.
pub fn new(params: SessionParams<T>) -> Self {
SessionImpl {
pub fn new(params: SessionParams<T>) -> (Self, Oneshot<Result<Option<(H256, NodeId)>, Error>>) {
let (completed, oneshot) = CompletionSignal::new();
(SessionImpl {
core: SessionCore {
meta: params.meta,
sub_session: params.sub_session,
key_share: params.key_share.clone(),
result_computer: params.result_computer,
transport: params.transport,
nonce: params.nonce,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::WaitingForInitialization,
Expand All @@ -191,7 +193,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
continue_with: None,
failed_continue_with: None,
})
}
}, oneshot)
}

/// Return session meta.
Expand Down Expand Up @@ -221,10 +223,9 @@ impl<T> SessionImpl<T> where T: SessionTransport {
self.data.lock().failed_continue_with.take()
}

/// Wait for session completion.
pub fn wait(&self) -> Result<Option<(H256, NodeId)>, Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
/// Return session completion result (if available).
pub fn result(&self) -> Option<Result<Option<(H256, NodeId)>, Error>> {
self.data.lock().result.clone()
}

/// Retrieve common key data (author, threshold, public), if available.
Expand Down Expand Up @@ -344,7 +345,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// update state
data.state = SessionState::Finished;
data.result = Some(Ok(None));
self.core.completed.notify_all();
self.core.completed.send(Ok(None));

Ok(())
}
Expand Down Expand Up @@ -450,15 +451,18 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
}

let result = result.map(Some);
data.state = SessionState::Finished;
data.result = Some(result.map(Some));
core.completed.notify_all();
data.result = Some(result.clone());
core.completed.send(result);
}
}
}

impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
type Id = SessionIdWithSubSession;
type CreationData = ();
type SuccessfulResult = Option<(H256, NodeId)>;

fn type_name() -> &'static str {
"version negotiation"
Expand All @@ -482,7 +486,7 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
warn!(target: "secretstore_net", "{}: key version negotiation session failed with timeout", self.core.meta.self_node_id);

data.result = Some(Err(Error::ConsensusTemporaryUnreachable));
self.core.completed.notify_all();
self.core.completed.send(Err(Error::ConsensusTemporaryUnreachable));
}
}
}
Expand Down Expand Up @@ -510,8 +514,8 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
self.core.meta.self_node_id, error, node);

data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}

fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
Expand Down Expand Up @@ -698,7 +702,7 @@ mod tests {
cluster: cluster,
},
nonce: 0,
}),
}).0,
})
}).collect(),
queue: VecDeque::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use std::collections::btree_map::Entry;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use ethereum_types::H256;
use ethkey::{Public, Signature};
use key_server_cluster::{Error, NodeId, SessionId, KeyStorage};
use key_server_cluster::math;
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::message::{Message, ServersSetChangeMessage,
ConsensusMessageWithServersSet, InitializeConsensusSessionWithServersSet,
ServersSetChangeConsensusMessage, ConfirmConsensusInitialization, UnknownSessionsRequest, UnknownSessions,
Expand Down Expand Up @@ -93,8 +94,8 @@ struct SessionCore {
pub admin_public: Public,
/// Migration id (if this session is a part of auto-migration process).
pub migration_id: Option<H256>,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<()>,
}

/// Servers set change consensus session type.
Expand Down Expand Up @@ -182,8 +183,9 @@ struct ServersSetChangeKeyVersionNegotiationTransport {

impl SessionImpl {
/// Create new servers set change session.
pub fn new(params: SessionParams) -> Result<Self, Error> {
Ok(SessionImpl {
pub fn new(params: SessionParams) -> Result<(Self, Oneshot<Result<(), Error>>), Error> {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
cluster: params.cluster,
Expand All @@ -192,7 +194,7 @@ impl SessionImpl {
all_nodes_set: params.all_nodes_set,
admin_public: params.admin_public,
migration_id: params.migration_id,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::EstablishingConsensus,
Expand All @@ -205,7 +207,7 @@ impl SessionImpl {
active_key_sessions: BTreeMap::new(),
result: None,
}),
})
}, oneshot))
}

/// Get session id.
Expand All @@ -218,10 +220,9 @@ impl SessionImpl {
self.core.migration_id.as_ref()
}

/// Wait for session completion.
pub fn wait(&self) -> Result<(), Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
/// Return session completion result (if available).
pub fn result(&self) -> Option<Result<(), Error>> {
self.data.lock().result.clone()
}

/// Initialize servers set change session on master node.
Expand Down Expand Up @@ -423,7 +424,7 @@ impl SessionImpl {
&KeyVersionNegotiationMessage::RequestKeyVersions(ref message) if sender == &self.core.meta.master_node_id => {
let key_id = message.session.clone().into();
let key_share = self.core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id.clone(),
self_node_id: self.core.meta.self_node_id.clone(),
Expand Down Expand Up @@ -671,7 +672,7 @@ impl SessionImpl {
}

data.state = SessionState::Finished;
self.core.completed.notify_all();
self.core.completed.send(Ok(()));

Ok(())
}
Expand Down Expand Up @@ -741,7 +742,7 @@ impl SessionImpl {
};

let key_share = core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id,
self_node_id: core.meta.self_node_id.clone(),
Expand Down Expand Up @@ -797,7 +798,8 @@ impl SessionImpl {
let negotiation_session = data.negotiation_sessions.remove(&key_id)
.expect("share change session is only initialized when negotiation is completed; qed");
let (selected_version, selected_master) = negotiation_session
.wait()?
.result()
.expect("share change session is only initialized when negotiation is completed; qed")?
.expect("initialize_share_change_session is only called on share change master; negotiation session completes with some on master; qed");
let selected_version_holders = negotiation_session.version_holders(&selected_version)?;
let selected_version_threshold = negotiation_session.common_key_data()?.threshold;
Expand Down Expand Up @@ -882,7 +884,7 @@ impl SessionImpl {

if data.result.is_some() && data.active_key_sessions.len() == 0 {
data.state = SessionState::Finished;
core.completed.notify_all();
core.completed.send(Ok(()));
}

Ok(())
Expand All @@ -907,14 +909,16 @@ impl SessionImpl {

data.state = SessionState::Finished;
data.result = Some(Ok(()));
core.completed.notify_all();
core.completed.send(Ok(()));

Ok(())
}
}

impl ClusterSession for SessionImpl {
type Id = SessionId;
type CreationData = (); // never used directly
type SuccessfulResult = ();

fn type_name() -> &'static str {
"servers set change"
Expand Down Expand Up @@ -954,8 +958,8 @@ impl ClusterSession for SessionImpl {
self.core.meta.self_node_id, error, node);

data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}

fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
Expand Down Expand Up @@ -1109,7 +1113,7 @@ pub mod tests {
nonce: 1,
admin_public: admin_public,
migration_id: None,
}).unwrap()
}).unwrap().0
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use ethereum_types::{H256, Address};
use ethkey::{Public, Secret, Signature};
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare, DocumentKeyShareVersion, KeyStorage};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::math;
use key_server_cluster::message::{Message, ShareAddMessage, ShareAddConsensusMessage, ConsensusMessageOfShareAdd,
InitializeConsensusSessionOfShareAdd, KeyShareCommon, NewKeysDissemination, ShareAddError,
Expand Down Expand Up @@ -71,8 +72,8 @@ struct SessionCore<T: SessionTransport> {
pub key_storage: Arc<KeyStorage>,
/// Administrator public key.
pub admin_public: Option<Public>,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<()>,
}

/// Share add consensus session type.
Expand Down Expand Up @@ -158,18 +159,18 @@ pub struct IsolatedSessionTransport {

impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new share addition session.
pub fn new(params: SessionParams<T>) -> Result<Self, Error> {
pub fn new(params: SessionParams<T>) -> Result<(Self, Oneshot<Result<(), Error>>), Error> {
let key_share = params.key_storage.get(&params.meta.id)?;

Ok(SessionImpl {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
nonce: params.nonce,
key_share: key_share,
transport: params.transport,
key_storage: params.key_storage,
admin_public: params.admin_public,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::ConsensusEstablishing,
Expand All @@ -181,7 +182,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
secret_subshares: None,
result: None,
}),
})
}, oneshot))
}

/// Set pre-established consensus data.
Expand Down Expand Up @@ -752,14 +753,16 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// signal session completion
data.state = SessionState::Finished;
data.result = Some(Ok(()));
core.completed.notify_all();
core.completed.send(Ok(()));

Ok(())
}
}

impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
type Id = SessionId;
type CreationData = (); // never used directly
type SuccessfulResult = ();

fn type_name() -> &'static str {
"share add"
Expand Down Expand Up @@ -801,8 +804,8 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
self.core.meta.self_node_id, error, node);

data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}

fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
Expand Down Expand Up @@ -914,7 +917,7 @@ pub mod tests {
key_storage,
admin_public: Some(admin_public),
nonce: 1,
}).unwrap()
}).unwrap().0
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl ShareChangeSession {
let consensus_group = self.consensus_group.take().ok_or(Error::InvalidStateForRequest)?;
let version_holders = self.version_holders.take().ok_or(Error::InvalidStateForRequest)?;
let new_nodes_map = self.new_nodes_map.take().ok_or(Error::InvalidStateForRequest)?;
let share_add_session = ShareAddSessionImpl::new(ShareAddSessionParams {
let (share_add_session, _) = ShareAddSessionImpl::new(ShareAddSessionParams {
meta: self.meta.clone(),
nonce: self.nonce,
transport: ShareChangeTransport::new(self.session_id, self.nonce, self.cluster.clone()),
Expand Down
Loading