Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
refactor stratum to remove retain cycle (#7827)
Browse files Browse the repository at this point in the history
* refactor stratum to remove retain cycle, fixed #7823

* fix tests
  • Loading branch information
debris authored Feb 7, 2018
1 parent b4ed51c commit f244ebe
Showing 1 changed file with 148 additions and 137 deletions.
285 changes: 148 additions & 137 deletions stratum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,68 +47,84 @@ use std::net::SocketAddr;
use std::collections::{HashSet, HashMap};
use hash::keccak;
use ethereum_types::H256;
use parking_lot::{RwLock, RwLockReadGuard};
use parking_lot::RwLock;

type RpcResult = Result<jsonrpc_core::Value, jsonrpc_core::Error>;

const NOTIFY_COUNTER_INITIAL: u32 = 16;

struct StratumRpc {
stratum: RwLock<Option<Arc<Stratum>>>,
/// Container which owns rpc server and stratum implementation
pub struct Stratum {
/// RPC server
///
/// It is an `Option` so it can be easily closed and released during `drop` phase
rpc_server: Option<JsonRpcServer>,
/// stratum protocol implementation
///
/// It is owned by a container and rpc server
implementation: Arc<StratumImpl>,
/// Message dispatcher (tcp/ip service)
///
/// Used to push messages to peers
tcp_dispatcher: Dispatcher,
}

impl StratumRpc {
fn subscribe(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.subscribe(params, meta)
}
impl Stratum {
pub fn start(
addr: &SocketAddr,
dispatcher: Arc<JobDispatcher>,
secret: Option<H256>,
) -> Result<Arc<Stratum>, Error> {

fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.authorize(params, meta)
}
let implementation = Arc::new(StratumImpl {
subscribers: RwLock::new(Vec::new()),
job_que: RwLock::new(HashSet::new()),
dispatcher,
workers: Arc::new(RwLock::new(HashMap::new())),
secret,
notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
});

fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.submit(params, meta)
}
}
let mut delegate = IoDelegate::<StratumImpl, SocketMetadata>::new(implementation.clone());
delegate.add_method_with_meta("mining.subscribe", StratumImpl::subscribe);
delegate.add_method_with_meta("mining.authorize", StratumImpl::authorize);
delegate.add_method_with_meta("mining.submit", StratumImpl::submit);
let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
handler.extend_with(delegate);

#[derive(Clone)]
pub struct SocketMetadata {
addr: SocketAddr,
}
let server_builder = JsonRpcServerBuilder::new(handler);
let tcp_dispatcher = server_builder.dispatcher();
let server_builder = server_builder.session_meta_extractor(PeerMetaExtractor::new(tcp_dispatcher.clone()));
let server = server_builder.start(addr)?;

impl Default for SocketMetadata {
fn default() -> Self {
SocketMetadata { addr: "0.0.0.0:0".parse().unwrap() }
}
}
let stratum = Arc::new(Stratum {
rpc_server: Some(server),
implementation,
tcp_dispatcher,
});

impl SocketMetadata {
pub fn addr(&self) -> &SocketAddr {
&self.addr
Ok(stratum)
}
}

impl Metadata for SocketMetadata { }
impl PushWorkHandler for Stratum {
fn push_work_all(&self, payload: String) -> Result<(), Error> {
self.implementation.push_work_all(payload, &self.tcp_dispatcher)
}

impl From<SocketAddr> for SocketMetadata {
fn from(addr: SocketAddr) -> SocketMetadata {
SocketMetadata { addr: addr }
fn push_work(&self, payloads: Vec<String>) -> Result<(), Error> {
self.implementation.push_work(payloads, &self.tcp_dispatcher)
}
}

pub struct PeerMetaExtractor;

impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
fn extract(&self, context: &RequestContext) -> SocketMetadata {
context.peer_addr.into()
impl Drop for Stratum {
fn drop(&mut self) {
// shut down rpc server
self.rpc_server.take().map(|server| server.close());
}
}

pub struct Stratum {
rpc_server: Option<JsonRpcServer>,
struct StratumImpl {
/// Subscribed clients
subscribers: RwLock<Vec<SocketAddr>>,
/// List of workers supposed to receive job update
Expand All @@ -121,84 +137,10 @@ pub struct Stratum {
secret: Option<H256>,
/// Dispatch notify couinter
notify_counter: RwLock<u32>,
/// Message dispatcher (tcp/ip service)
tcp_dispatcher: Dispatcher,
}

impl Drop for Stratum {
fn drop(&mut self) {
self.rpc_server.take().map(|server| server.close());
}
}

impl Stratum {
pub fn start(
addr: &SocketAddr,
dispatcher: Arc<JobDispatcher>,
secret: Option<H256>,
) -> Result<Arc<Stratum>, Error> {

let rpc = Arc::new(StratumRpc {
stratum: RwLock::new(None),
});
let mut delegate = IoDelegate::<StratumRpc, SocketMetadata>::new(rpc.clone());
delegate.add_method_with_meta("mining.subscribe", StratumRpc::subscribe);
delegate.add_method_with_meta("mining.authorize", StratumRpc::authorize);
delegate.add_method_with_meta("mining.submit", StratumRpc::submit);
let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
handler.extend_with(delegate);

let server = JsonRpcServerBuilder::new(handler)
.session_meta_extractor(PeerMetaExtractor);
let tcp_dispatcher = server.dispatcher();
let server = server.start(addr)?;

let stratum = Arc::new(Stratum {
tcp_dispatcher: tcp_dispatcher,
rpc_server: Some(server),
subscribers: RwLock::new(Vec::new()),
job_que: RwLock::new(HashSet::new()),
dispatcher: dispatcher,
workers: Arc::new(RwLock::new(HashMap::new())),
secret: secret,
notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
});
*rpc.stratum.write() = Some(stratum.clone());
Ok(stratum)
}

fn update_peers(&self) {
if let Some(job) = self.dispatcher.job() {
if let Err(e) = self.push_work_all(job) {
warn!("Failed to update some of the peers: {:?}", e);
}
}
}

fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult {
Ok(match params {
Params::Array(vals) => {
// first two elements are service messages (worker_id & job_id)
match self.dispatcher.submit(vals.iter().skip(2)
.filter_map(|val| match val { &Value::String(ref str) => Some(str.to_owned()), _ => None })
.collect::<Vec<String>>()) {
Ok(()) => {
self.update_peers();
to_value(true)
},
Err(submit_err) => {
warn!("Error while submitting share: {:?}", submit_err);
to_value(false)
}
}
},
_ => {
trace!(target: "stratum", "Invalid submit work format {:?}", params);
to_value(false)
}
}.expect("Only true/false is returned and it's always serializable; qed"))
}

impl StratumImpl {
/// rpc method `mining.subscribe`
fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult {
use std::str::FromStr;

Expand All @@ -218,6 +160,7 @@ impl Stratum {
}.expect("Empty slices are serializable; qed"))
}

/// rpc method `mining.authorize`
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
params.parse::<(String, String)>().map(|(worker_id, secret)|{
if let Some(valid_secret) = self.secret {
Expand All @@ -232,23 +175,44 @@ impl Stratum {
}).map(|v| v.expect("Only true/false is returned and it's always serializable; qed"))
}

pub fn subscribers(&self) -> RwLockReadGuard<Vec<SocketAddr>> {
self.subscribers.read()
/// rpc method `mining.submit`
fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
Ok(match params {
Params::Array(vals) => {
// first two elements are service messages (worker_id & job_id)
match self.dispatcher.submit(vals.iter().skip(2)
.filter_map(|val| match *val {
Value::String(ref s) => Some(s.to_owned()),
_ => None
})
.collect::<Vec<String>>()) {
Ok(()) => {
self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed"));
to_value(true)
},
Err(submit_err) => {
warn!("Error while submitting share: {:?}", submit_err);
to_value(false)
}
}
},
_ => {
trace!(target: "stratum", "Invalid submit work format {:?}", params);
to_value(false)
}
}.expect("Only true/false is returned and it's always serializable; qed"))
}

pub fn maintain(&self) {
let mut job_que = self.job_que.write();
let job_payload = self.dispatcher.job();
for socket_addr in job_que.drain() {
job_payload.as_ref().map(
|json| self.tcp_dispatcher.push_message(&socket_addr, json.to_owned())
);
/// Helper method
fn update_peers(&self, tcp_dispatcher: &Dispatcher) {
if let Some(job) = self.dispatcher.job() {
if let Err(e) = self.push_work_all(job, tcp_dispatcher) {
warn!("Failed to update some of the peers: {:?}", e);
}
}
}
}

impl PushWorkHandler for Stratum {
fn push_work_all(&self, payload: String) -> Result<(), Error> {
fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
let hup_peers = {
let workers = self.workers.read();
let next_request_id = {
Expand All @@ -263,7 +227,7 @@ impl PushWorkHandler for Stratum {
trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
for (ref addr, _) in workers.iter() {
trace!(target: "stratum", "pusing work to {}", addr);
match self.tcp_dispatcher.push_message(addr, workers_msg.clone()) {
match tcp_dispatcher.push_message(addr, workers_msg.clone()) {
Err(PushMessageError::NoSuchPeer) => {
trace!(target: "stratum", "Worker no longer connected: {}", &addr);
hup_peers.insert(*addr.clone());
Expand All @@ -285,7 +249,7 @@ impl PushWorkHandler for Stratum {
Ok(())
}

fn push_work(&self, payloads: Vec<String>) -> Result<(), Error> {
fn push_work(&self, payloads: Vec<String>, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
if !payloads.len() > 0 {
return Err(Error::NoWork);
}
Expand All @@ -299,16 +263,63 @@ impl PushWorkHandler for Stratum {
while que.len() > 0 {
let next_worker = addrs[addr_index];
let mut next_payload = que.drain(0..1);
self.tcp_dispatcher.push_message(
next_worker,
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist")
)?;
tcp_dispatcher.push_message(
next_worker,
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist")
)?;
addr_index = addr_index + 1;
}
Ok(())
}
}

#[derive(Clone)]
pub struct SocketMetadata {
addr: SocketAddr,
// with the new version of jsonrpc-core, SocketMetadata
// won't have to implement default, so this field will not
// have to be an Option
tcp_dispatcher: Option<Dispatcher>,
}

impl Default for SocketMetadata {
fn default() -> Self {
SocketMetadata {
addr: "0.0.0.0:0".parse().unwrap(),
tcp_dispatcher: None,
}
}
}

impl SocketMetadata {
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
}

impl Metadata for SocketMetadata { }

pub struct PeerMetaExtractor {
tcp_dispatcher: Dispatcher,
}

impl PeerMetaExtractor {
fn new(tcp_dispatcher: Dispatcher) -> Self {
PeerMetaExtractor {
tcp_dispatcher,
}
}
}

impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
fn extract(&self, context: &RequestContext) -> SocketMetadata {
SocketMetadata {
addr: context.peer_addr,
tcp_dispatcher: Some(self.tcp_dispatcher.clone()),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -367,7 +378,7 @@ mod tests {
let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap();
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#;
dummy_request(&addr, request);
assert_eq!(1, stratum.subscribers.read().len());
assert_eq!(1, stratum.implementation.subscribers.read().len());
}

struct DummyManager {
Expand Down Expand Up @@ -409,7 +420,7 @@ mod tests {
#[test]
fn receives_initial_paylaod() {
let addr = SocketAddr::from_str("127.0.0.1:19975").unwrap();
Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
let _stratum = Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;

let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
Expand All @@ -430,7 +441,7 @@ mod tests {
let response = String::from_utf8(dummy_request(&addr, request)).unwrap();

assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response);
assert_eq!(1, stratum.workers.read().len());
assert_eq!(1, stratum.implementation.workers.read().len());
}

#[test]
Expand Down

0 comments on commit f244ebe

Please sign in to comment.