Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

IPC persistent client link #930

Merged
merged 6 commits into from
Apr 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions ipc/codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ fn push_client(
{
push_client_struct(cx, builder, item, push);
push_client_implementation(cx, builder, dispatches, item, push);
push_with_socket_client_implementation(cx, builder, item, push);
}

/// returns an expression with the body for single operation that is being sent to server
Expand Down Expand Up @@ -485,6 +486,25 @@ fn implement_client_method(
signature.unwrap()
}

fn push_with_socket_client_implementation(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
item: &Item,
push: &mut FnMut(Annotatable))
{
let client_ident = builder.id(format!("{}Client", item.ident.name.as_str()));
let implement = quote_item!(cx,
impl<S> ::ipc::WithSocket<S> for $client_ident<S> where S: ::ipc::IpcSocket {
fn init(socket: S) -> $client_ident<S> {
$client_ident {
socket: ::std::cell::RefCell::new(socket),
phantom: ::std::marker::PhantomData,
}
}
}).unwrap();
push(Annotatable::Item(implement));
}

/// pushes full client side code for the original class exposed via ipc
fn push_client_implementation(
cx: &ExtCtxt,
Expand All @@ -502,18 +522,11 @@ fn push_client_implementation(
let item_ident = builder.id(format!("{}", item.ident.name.as_str()));
let implement = quote_item!(cx,
impl<S> $client_ident<S> where S: ::ipc::IpcSocket {
pub fn new(socket: S) -> $client_ident<S> {
$client_ident {
socket: ::std::cell::RefCell::new(socket),
phantom: ::std::marker::PhantomData,
}
}

pub fn handshake(&self) -> Result<(), ::ipc::Error> {
let payload = BinHandshake {
protocol_version: $item_ident::protocol_version().to_string(),
api_version: $item_ident::api_version().to_string(),
_reserved: vec![0u8, 64],
_reserved: vec![0u8; 64],
};

let mut socket_ref = self.socket.borrow_mut();
Expand Down
39 changes: 36 additions & 3 deletions ipc/nano/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ extern crate ethcore_ipc as ipc;
extern crate nanomsg;
#[macro_use] extern crate log;

pub use ipc::*;
pub use ipc::{WithSocket, IpcInterface, IpcConfig};

use std::sync::*;
use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
use std::ops::Deref;

const POLL_TIMEOUT: isize = 100;

Expand All @@ -34,6 +35,36 @@ pub struct Worker<S> where S: IpcInterface<S> {
buf: Vec<u8>,
}

pub struct GuardedSocket<S> where S: WithSocket<Socket> {
client: Arc<S>,
_endpoint: Endpoint,
}

impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
type Target = S;

fn deref(&self) -> &S {
&self.client
}
}

pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::DuplexLink
}));

let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::DuplexLink
}));

Ok(GuardedSocket {
client: Arc::new(S::init(socket)),
_endpoint: endpoint,
})
}

#[derive(Debug)]
pub enum SocketError {
DuplexLink
Expand All @@ -60,6 +91,7 @@ impl<S> Worker<S> where S: IpcInterface<S> {
match socket.nb_read_to_end(&mut self.buf) {
Ok(method_sign_len) => {
if method_sign_len >= 2 {

// method_num
let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16;
// payload
Expand Down Expand Up @@ -113,7 +145,7 @@ impl<S> Worker<S> where S: IpcInterface<S> {
}

#[cfg(test)]
mod tests {
mod service_tests {

use super::Worker;
use ipc::*;
Expand Down Expand Up @@ -150,10 +182,11 @@ mod tests {
}
}

impl IpcConfig for DummyService {}

fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) {
let mut socket = Socket::new(Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap();
//thread::sleep_ms(10);
socket.write(buf).unwrap();
(socket, endpoint)
}
Expand Down
1 change: 1 addition & 0 deletions ipc/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ license = "GPL-3.0"
[dependencies]
ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0"
nanomsg = "0.5.0"
13 changes: 10 additions & 3 deletions ipc/rpc/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use std::io::{Read, Write};
use std::marker::Sync;
use std::sync::atomic::*;
use semver::Version;

pub struct Handshake {
Expand Down Expand Up @@ -47,7 +46,7 @@ pub enum Error {
HandshakeFailed,
}

pub trait IpcInterface<T> where T: IpcConfig {
pub trait IpcInterface<T>: IpcConfig {
/// reads the message from io, dispatches the call and returns serialized result
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;

Expand All @@ -70,6 +69,7 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
if params.is_some() {
buf[2..buf_len].clone_from_slice(params.as_ref().unwrap());
}

if w.write(&buf).unwrap() != buf_len
{
// if write was inconsistent
Expand All @@ -81,5 +81,12 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
pub trait IpcSocket: Read + Write + Sync {
}

impl IpcSocket for ::devtools::TestSocket {

pub trait WithSocket<S: IpcSocket> {
fn init(socket: S) -> Self;
}


impl IpcSocket for ::devtools::TestSocket {}

impl IpcSocket for ::nanomsg::Socket {}
3 changes: 2 additions & 1 deletion ipc/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

extern crate ethcore_devtools as devtools;
extern crate semver;
extern crate nanomsg;

pub mod interface;
pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error};
pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error, WithSocket};
3 changes: 3 additions & 0 deletions ipc/tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ bincode = "*"
serde = "0.7.0"
ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0"
nanomsg = "0.5.0"
ethcore-ipc-nano = { path = "../nano" }


[build-dependencies]
syntex = "0.30.0"
Expand Down
6 changes: 3 additions & 3 deletions ipc/tests/examples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mod tests {
fn call_service_client() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket);
let service_client = ServiceClient::init(socket);

let result = service_client.commit(5);

Expand All @@ -75,7 +75,7 @@ mod tests {
fn call_service_client_optional() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket);
let service_client = ServiceClient::init(socket);

let result = service_client.rollback(Some(5), 10);

Expand All @@ -95,7 +95,7 @@ mod tests {
fn call_service_client_handshake() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![1];
let service_client = ServiceClient::new(socket);
let service_client = ServiceClient::init(socket);

let result = service_client.handshake();

Expand Down
109 changes: 109 additions & 0 deletions ipc/tests/over_nano.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

#[cfg(test)]
mod tests {

use super::super::service::*;
use nanoipc;
use std::sync::Arc;
use std::io::{Write, Read};
use std::sync::atomic::{Ordering, AtomicBool};

fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) {
let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap();
socket.write(buf).unwrap();
(socket, endpoint)
}


fn init_worker(addr: &str) -> nanoipc::Worker<Service> {
let mut worker = nanoipc::Worker::<Service>::new(Arc::new(Service::new()));
worker.add_duplex(addr).unwrap();
worker
}

#[test]
fn can_create_client() {
let client = nanoipc::init_client::<ServiceClient<_>>("ipc:///tmp/parity-nano-test10.ipc");
assert!(client.is_ok());
}

#[test]
fn can_call_handshake() {
let url = "ipc:///tmp/parity-test-nano-20.ipc";
let worker_should_exit = Arc::new(AtomicBool::new(false));
let worker_is_ready = Arc::new(AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();

::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed);
}
});

while !worker_is_ready.load(Ordering::Relaxed) { }
let client = nanoipc::init_client::<ServiceClient<_>>(url).unwrap();

let hs = client.handshake();

worker_should_exit.store(true, Ordering::Relaxed);
assert!(hs.is_ok());
}

#[test]
fn can_receive_dummy_writes_in_thread() {
let url = "ipc:///tmp/parity-test-nano-30.ipc";
let worker_should_exit = Arc::new(AtomicBool::new(false));
let worker_is_ready = Arc::new(AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();

::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed);
}
});
while !worker_is_ready.load(Ordering::Relaxed) { }

let (mut _s, _e) = dummy_write(url, &vec![0, 0,
// protocol version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// api version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// reserved
0, 0, 0, 0, 0, 0, 0, 64,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]);

let mut buf = vec![0u8;1];
_s.read(&mut buf).unwrap();
assert_eq!(1, buf.len());
assert_eq!(1, buf[0]);

worker_should_exit.store(true, Ordering::Relaxed);
}

}
3 changes: 3 additions & 0 deletions ipc/tests/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ extern crate ethcore_ipc as ipc;
extern crate serde;
extern crate ethcore_devtools as devtools;
extern crate semver;
extern crate nanomsg;
extern crate ethcore_ipc_nano as nanoipc;

pub mod service;
mod examples;
mod over_nano;