[wip] Gpu rebase inprog #107

Closed · wants to merge 3 commits

1 change: 1 addition & 0 deletions Cargo.toml
@@ -56,3 +56,4 @@ chrono = { version = "0.4.0", features = ["serde"] }
log = "^0.4.1"
matches = "^0.1.6"
byteorder = "^1.2.1"
libc = "^0.2.1"
46 changes: 46 additions & 0 deletions Makefile
@@ -0,0 +1,46 @@
export RUST_LOG=packet=TRACE
#export RUST_BACKTRACE=1

all: htest

htest:wfmt
#cargo test accountant_skel::tests::test_layout -- --nocapture 2>&1 | head -n 30
#cargo test accountant_skel::tests::test_layout -- --nocapture
cargo test accountant_stub -- --nocapture 2>&1 | head -n 30

ci: test bench release clippy ipv6

build:
cargo build 2>&1 | head -n 30

loop:
while true; do fswatch -1 -r src; make; done

test:
cargo test

clippy:
cargo +nightly clippy --features="unstable"

cov:
docker run -it --rm --security-opt seccomp=unconfined --volume "$$PWD:/volume" elmtai/docker-rust-kcov

wfmt:
cargo fmt -- --write-mode=overwrite

release:
cargo build --all-targets --release

node:
cat genesis.log | cargo run --bin silk-testnode > transactions0.log

bench:
cargo +nightly bench --features="unstable" -- --nocapture

ipv6:
cargo test ipv6 --features="ipv6" -- --nocapture

lib:libcuda_verify_ed25519.a
libcuda_verify_ed25519.a:dummy.c
cc -o dummy.o -c dummy.c
ar -cvq libcuda_verify_ed25519.a dummy.o
8 changes: 8 additions & 0 deletions build.rs
@@ -0,0 +1,8 @@
fn main() {
Contributor:
What happens if the user doesn't have cuda libs installed or doesn't want to install them?

@sakridge (Member), Apr 4, 2018:
Seems like we can have something like: if !env::var("CARGO_CFG_CUDA").is_err() { prints... }

if we have a cfg like this:
#[cfg(cuda)] => CARGO_CFG_CUDA

println!("cargo:rustc-link-search=native=.");
println!("cargo:rustc-link-lib=static=cuda_verify_ed25519");
println!("cargo:rustc-link-search=native=/usr/local/cuda/lib64");
println!("cargo:rustc-link-lib=dylib=cudart");
println!("cargo:rustc-link-lib=dylib=cuda");
println!("cargo:rustc-link-lib=dylib=cudadevrt");
}
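
A minimal sketch of the reviewer's suggestion above: gate the link directives on a `cuda` cfg so that builds without the CUDA libraries skip them entirely. The `CARGO_CFG_CUDA` name comes from @sakridge's comment (Cargo exposes active `--cfg` flags to build scripts as `CARGO_CFG_*` environment variables); this is an illustration of that idea, not what the PR's build.rs currently does.

// Sketch only: emit the CUDA link flags only when `--cfg cuda` is active.
use std::env;

fn main() {
    // Cargo exports each active cfg to the build script as CARGO_CFG_<NAME>,
    // so this variable is present exactly when #[cfg(cuda)] would be true.
    if env::var("CARGO_CFG_CUDA").is_ok() {
        println!("cargo:rustc-link-search=native=.");
        println!("cargo:rustc-link-lib=static=cuda_verify_ed25519");
        println!("cargo:rustc-link-search=native=/usr/local/cuda/lib64");
        println!("cargo:rustc-link-lib=dylib=cudart");
        println!("cargo:rustc-link-lib=dylib=cuda");
        println!("cargo:rustc-link-lib=dylib=cudadevrt");
    }
}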
51 changes: 51 additions & 0 deletions dummy.c
@@ -0,0 +1,51 @@
#include <stdint.h>
#include <assert.h>
#include <stdio.h>

#define PACKET_SIZE 288
#define PACKET_DATA_SIZE 256
union Packet {
char data[PACKET_DATA_SIZE];
char total[PACKET_SIZE];
};

struct Elems {
union Packet *packet;
uint32_t len;
};

int ed25519_verify_many(
const struct Elems *vecs,
uint32_t num,
uint32_t message_size,
uint32_t public_key_offset,
uint32_t signature_offset,
uint32_t signed_message_offset,
uint32_t signed_message_len_offset,
uint8_t *out
) {
int i, p = 0;
assert(num > 0);
for(i = 0; i < num; ++i) {
int j;
assert(vecs[i].len > 0);
assert(message_size == PACKET_SIZE);
assert(signed_message_len_offset == PACKET_DATA_SIZE);
for(j = 0; j < vecs[i].len; ++j) {
uint32_t *len = (uint32_t*)&vecs[i].packet[j].total[signed_message_len_offset];
assert(*len <= PACKET_DATA_SIZE);
p += 1;
if(public_key_offset > *len - 32) {
continue;
}
if(signature_offset > *len - 64) {
continue;
}
if(signed_message_offset > *len) {
continue;
}
out[p - 1] = 1;
}
}
return 0;
}
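
The accountant_skel changes below call into a `gpu` module (`gpu::ecdsa_verify`, `gpu::TX_OFFSET`) that is not shown in this diff. As a rough sketch, assuming the library built from dummy.c (or the real CUDA verifier) is linked per build.rs, the Rust side could declare the C entry point along these lines; the `Elems` layout mirrors the C struct above, and all names here are illustrative rather than the PR's actual `gpu` module.

// Sketch of a possible FFI binding for ed25519_verify_many as defined in dummy.c.
use std::os::raw::c_int;

#[repr(C)]
pub struct Elems {
    // `union Packet *` on the C side; treated here as a pointer to raw packet bytes.
    pub packet: *const u8,
    pub len: u32,
}

extern "C" {
    pub fn ed25519_verify_many(
        vecs: *const Elems,
        num: u32,
        message_size: u32,
        public_key_offset: u32,
        signature_offset: u32,
        signed_message_offset: u32,
        signed_message_len_offset: u32,
        // One result byte per packet; dummy.c writes 1 when the offsets fit the packet.
        out: *mut u8,
    ) -> c_int;
}

A caller such as the verifier thread in accountant_skel.rs would pass one `Elems` per packet batch and read back one byte per packet from `out`.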
10 changes: 6 additions & 4 deletions src/accountant.rs
@@ -3,6 +3,8 @@
//! on behalf of the caller, and a private low-level API for when they have
//! already been signed and verified.

extern crate libc;

use chrono::prelude::*;
use event::Event;
use hash::Hash;
@@ -68,19 +70,19 @@ impl Accountant {

/// Process a Transaction that has already been verified.
pub fn process_verified_transaction(&mut self, tr: &Transaction) -> Result<()> {
if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens {
if self.get_balance(&tr.data.from).unwrap_or(0) < tr.data.tokens {
return Err(AccountingError::InsufficientFunds);
}

if !self.reserve_signature(&tr.sig) {
return Err(AccountingError::InvalidTransferSignature);
}

if let Some(x) = self.balances.get_mut(&tr.from) {
*x -= tr.tokens;
if let Some(x) = self.balances.get_mut(&tr.data.from) {
*x -= tr.data.tokens;
}

let mut plan = tr.plan.clone();
let mut plan = tr.data.plan.clone();
plan.apply_witness(&Witness::Timestamp(self.last_time));

if let Some(ref payment) = plan.final_payment() {
162 changes: 123 additions & 39 deletions src/accountant_skel.rs
@@ -6,24 +6,26 @@ use accountant::Accountant;
use bincode::{deserialize, serialize};
use entry::Entry;
use event::Event;
use gpu;
use hash::Hash;
use historian::Historian;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use recorder::Signal;
use result::Result;
use serde_json;
use signature::PublicKey;
use std::collections::VecDeque;
use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, SendError};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use packet;
use std::sync::{Arc, Mutex};
use transaction::Transaction;
use std::collections::VecDeque;

pub struct AccountantSkel<W: Write + Send + 'static> {
acc: Accountant,
@@ -44,14 +46,14 @@ impl Request {
/// Verify the request is valid.
pub fn verify(&self) -> bool {
match *self {
Request::Transaction(ref tr) => tr.verify(),
Request::Transaction(ref tr) => tr.plan_verify(),
_ => true,
}
}
}

/// Parallel verification of a batch of requests.
fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> {
pub fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> {
reqs.into_par_iter().filter({ |x| x.0.verify() }).collect()
}

@@ -83,14 +85,18 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}

/// Process Request items sent by clients.
pub fn log_verified_request(&mut self, msg: Request) -> Option<Response> {
pub fn log_verified_request(&mut self, msg: Request, verify: u8) -> Option<Response> {
match msg {
Request::Transaction(_) if verify == 0 => {
eprintln!("Transaction failed sigverify");
None
}
Request::Transaction(tr) => {
if let Err(err) = self.acc.process_verified_transaction(&tr) {
eprintln!("Transaction error: {:?}", err);
} else if let Err(SendError(_)) = self.historian
.sender
.send(Signal::Event(Event::Transaction(tr)))
.send(Signal::Event(Event::Transaction(tr.clone())))
{
eprintln!("Channel send error");
}
@@ -104,46 +110,99 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
}

fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<(Vec<SharedPackets>, Vec<Vec<u8>>)>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
//println!("got msgs");
let mut v = Vec::new();
v.push(msgs);
while let Ok(more) = recvr.try_recv() {
//println!("got more msgs");
v.push(more);
}
//println!("verifying");
let rvs = gpu::ecdsa_verify(&v);
//println!("verified!");
let mut len = 0;
let mut sv = Vec::new();
let mut sr = Vec::new();
for (v, r) in v.iter().zip(rvs.iter()) {
if len + r.len() >= 256 {
println!("sending {}", len);
sendr.send((sv, sr))?;
sv = Vec::new();
sr = Vec::new();
len = 0;
}
sv.push(v.clone());
sr.push(r.clone());
len += r.len();
assert!(len < 256);
}
if !sv.is_empty() {
sendr.send((sv, sr))?;
}
Ok(())
}

pub fn deserialize_packets(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
// deserialize in parallel
let mut r = vec![];
for x in &p.packets {
let rsp_addr = x.meta.addr();
let sz = x.meta.size;
if let Ok(req) = deserialize(&x.data[0..sz]) {
r.push(Some((req, rsp_addr)));
} else {
r.push(None);
}
}
r
}

fn process(
obj: &Arc<Mutex<AccountantSkel<W>>>,
packet_receiver: &streamer::PacketReceiver,
verified_receiver: &Receiver<(Vec<SharedPackets>, Vec<Vec<u8>>)>,
blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = packet_receiver.recv_timeout(timer)?;
let msgs_ = msgs.clone();
let mut rsps = VecDeque::new();
{
let mut reqs = vec![];
for packet in &msgs.read().unwrap().packets {
let rsp_addr = packet.meta.addr();
let sz = packet.meta.size;
let req = deserialize(&packet.data[0..sz])?;
reqs.push((req, rsp_addr));
}
let reqs = filter_valid_requests(reqs);
for (req, rsp_addr) in reqs {
if let Some(resp) = obj.lock().unwrap().log_verified_request(req) {
let blob = blob_recycler.allocate();
{
let mut b = blob.write().unwrap();
let v = serialize(&resp)?;
let len = v.len();
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
let (mms, vvs) = verified_receiver.recv_timeout(timer)?;
for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) {
let msgs_ = msgs.clone();
let mut rsps = VecDeque::new();
{
let reqs = Self::deserialize_packets(&((*msgs).read().unwrap()));
for (data, v) in reqs.into_iter().zip(vers.into_iter()) {
if let Some((req, rsp_addr)) = data {
if !req.verify() {
continue;
}
if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) {
let blob = blob_recycler.allocate();
{
let mut b = blob.write().unwrap();
let v = serialize(&resp)?;
let len = v.len();
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
}
rsps.push_back(blob);
}
}
rsps.push_back(blob);
}
}
if !rsps.is_empty() {
//don't wake up the other side if there is nothing
blob_sender.send(rsps)?;
}
packet_recycler.recycle(msgs_);
}
if !rsps.is_empty() {
//don't wake up the other side if there is nothing
blob_sender.send(rsps)?;
}
packet_recycler.recycle(msgs_);
Ok(())
}

@@ -168,11 +227,21 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let (blob_sender, blob_receiver) = channel();
let t_responder =
streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::process(
&skel,
&packet_receiver,
&verified_receiver,
&blob_sender,
&packet_recycler,
&blob_recycler,
Expand All @@ -181,6 +250,21 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
break;
}
});
Ok(vec![t_receiver, t_responder, t_server])
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}
}

#[cfg(test)]
mod tests {
use accountant_skel::Request;
use bincode::serialize;
use gpu;
use transaction::{memfind, test_tx};
#[test]
fn test_layout() {
let tr = test_tx();
let tx = serialize(&tr).unwrap();
let packet = serialize(&Request::Transaction(tr)).unwrap();
assert_matches!(memfind(&packet, &tx), Some(gpu::TX_OFFSET));
}
}