Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify Claim and Transaction handling #30

Merged
merged 13 commits into from
Mar 2, 2018
Prev Previous commit
Next Next commit
More defense against a double-spend attack
Before this change, a client could spend funds before the accountant
processed a previous spend. With this change in place, the accountant
updates balances immediately, but that comes at an architectural cost.
The accountant now verifies signatures on behalf of the historian, so
that it can ensure logging will not fail.
  • Loading branch information
garious committed Mar 2, 2018
commit 36bb1f989d0c57cdbff9b31ed72cfa17d5221037
86 changes: 43 additions & 43 deletions src/accountant.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ use log::{Entry, Sha256Hash};
use event::{Event, PublicKey, Signature};
use historian::Historian;
use ring::signature::Ed25519KeyPair;
use std::sync::mpsc::{RecvError, SendError};
use std::sync::mpsc::SendError;
use std::collections::HashMap;

pub struct Accountant {
@@ -25,33 +25,6 @@ impl Accountant {
}
}

pub fn process_event(self: &mut Self, event: &Event<u64>) {
match *event {
Event::Claim { key, data, .. } => {
if self.balances.contains_key(&key) {
if let Some(x) = self.balances.get_mut(&key) {
*x += data;
}
} else {
self.balances.insert(key, data);
}
}
Event::Transaction { from, to, data, .. } => {
if let Some(x) = self.balances.get_mut(&from) {
*x -= data;
}
if self.balances.contains_key(&to) {
if let Some(x) = self.balances.get_mut(&to) {
*x += data;
}
} else {
self.balances.insert(to, data);
}
}
_ => (),
}
}

pub fn sync(self: &mut Self) -> Vec<Entry<u64>> {
let mut entries = vec![];
while let Ok(entry) = self.historian.receiver.try_recv() {
@@ -62,25 +35,35 @@ impl Accountant {
self.end_hash = last_entry.end_hash;
}

for e in &entries {
self.process_event(&e.event);
}

entries
}

pub fn deposit_signed(
self: &Self,
self: &mut Self,
key: PublicKey,
data: u64,
sig: Signature,
) -> Result<(), SendError<Event<u64>>> {
let event = Event::Claim { key, data, sig };
if !self.historian.verify_event(&event) {
// TODO: Replace the SendError result with a custom one.
println!("Rejecting transaction: Invalid event");
return Ok(());
}

if self.balances.contains_key(&key) {
if let Some(x) = self.balances.get_mut(&key) {
*x += data;
}
} else {
self.balances.insert(key, data);
}

self.historian.sender.send(event)
}

pub fn deposit(
self: &Self,
self: &mut Self,
n: u64,
keypair: &Ed25519KeyPair,
) -> Result<Signature, SendError<Event<u64>>> {
@@ -97,17 +80,36 @@ impl Accountant {
data: u64,
sig: Signature,
) -> Result<(), SendError<Event<u64>>> {
if self.get_balance(&from).unwrap() < data {
// TODO: Replace the SendError result with a custom one.
println!("Error: Insufficient funds");
return Ok(());
}
let event = Event::Transaction {
from,
to,
data,
sig,
};
if !self.historian.verify_event(&event) {
// TODO: Replace the SendError result with a custom one.
println!("Rejecting transaction: Invalid event");
return Ok(());
}

if self.get_balance(&from).unwrap_or(0) < data {
// TODO: Replace the SendError result with a custom one.
println!("Rejecting transaction: Insufficient funds");
return Ok(());
}

if let Some(x) = self.balances.get_mut(&from) {
*x -= data;
}

if self.balances.contains_key(&to) {
if let Some(x) = self.balances.get_mut(&to) {
*x += data;
}
} else {
self.balances.insert(to, data);
}

self.historian.sender.send(event)
}

@@ -118,15 +120,13 @@ impl Accountant {
to: PublicKey,
) -> Result<Signature, SendError<Event<u64>>> {
use event::{get_pubkey, sign_transaction_data};

let from = get_pubkey(keypair);
let sig = sign_transaction_data(&n, keypair, &to);
self.transfer_signed(from, to, n, sig).map(|_| sig)
}

pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result<u64, RecvError> {
self.sync();
Ok(*self.balances.get(pubkey).unwrap_or(&0))
pub fn get_balance(self: &Self, pubkey: &PublicKey) -> Option<u64> {
self.balances.get(pubkey).map(|x| *x)
}

pub fn wait_on_signature(self: &mut Self, wait_sig: &Signature) {
67 changes: 42 additions & 25 deletions src/historian.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
//! The resulting stream of entries represents ordered events in time.

use std::thread::JoinHandle;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::mpsc::{Receiver, SyncSender};
use std::time::{Duration, SystemTime};
use log::{hash, hash_event, Entry, Sha256Hash};
@@ -18,6 +18,7 @@ pub struct Historian<T> {
pub sender: SyncSender<Event<T>>,
pub receiver: Receiver<Entry<T>>,
pub thread_hdl: JoinHandle<(Entry<T>, ExitReason)>,
pub signatures: HashSet<Signature>,
}

#[derive(Debug, PartialEq, Eq)]
@@ -44,10 +45,25 @@ fn log_event<T: Serialize + Clone + Debug>(
Ok(())
}

fn verify_event_and_reserve_signature<T: Serialize>(
signatures: &mut HashSet<Signature>,
event: &Event<T>,
) -> bool {
if !verify_event(&event) {
return false;
}
if let Some(sig) = get_signature(&event) {
if signatures.contains(&sig) {
return false;
}
signatures.insert(sig);
}
true
}

fn log_events<T: Serialize + Clone + Debug>(
receiver: &Receiver<Event<T>>,
sender: &SyncSender<Entry<T>>,
signatures: &mut HashMap<Signature, bool>,
num_hashes: &mut u64,
end_hash: &mut Sha256Hash,
epoch: SystemTime,
@@ -65,15 +81,7 @@ fn log_events<T: Serialize + Clone + Debug>(
}
match receiver.try_recv() {
Ok(event) => {
if verify_event(&event) {
if let Some(sig) = get_signature(&event) {
if signatures.contains_key(&sig) {
continue;
}
signatures.insert(sig, true);
}
log_event(sender, num_hashes, end_hash, event)?;
}
log_event(sender, num_hashes, end_hash, event)?;
}
Err(TryRecvError::Empty) => {
return Ok(());
@@ -103,13 +111,11 @@ pub fn create_logger<T: 'static + Serialize + Clone + Debug + Send>(
let mut end_hash = start_hash;
let mut num_hashes = 0;
let mut num_ticks = 0;
let mut signatures = HashMap::new();
let epoch = SystemTime::now();
loop {
if let Err(err) = log_events(
&receiver,
&sender,
&mut signatures,
&mut num_hashes,
&mut end_hash,
epoch,
@@ -130,12 +136,17 @@ impl<T: 'static + Serialize + Clone + Debug + Send> Historian<T> {
let (sender, event_receiver) = sync_channel(1000);
let (entry_sender, receiver) = sync_channel(1000);
let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender);
let signatures = HashSet::new();
Historian {
sender,
receiver,
thread_hdl,
signatures,
}
}
pub fn verify_event(self: &mut Self, event: &Event<T>) -> bool {
return verify_event_and_reserve_signature(&mut self.signatures, event);
}
}

#[cfg(test)]
@@ -201,22 +212,28 @@ mod tests {
}

#[test]
fn test_bad_event_attack() {
let zero = Sha256Hash::default();
let hist = Historian::new(&zero, None);
fn test_bad_event_signature() {
let keypair = generate_keypair();
let sig = sign_serialized(&hash(b"hello, world"), &keypair);
let event0 = Event::Claim {
key: get_pubkey(&keypair),
data: hash(b"goodbye cruel world"),
sig: sign_serialized(&hash(b"hello, world"), &keypair),
sig,
};
hist.sender.send(event0).unwrap();
drop(hist.sender);
assert_eq!(
hist.thread_hdl.join().unwrap().1,
ExitReason::RecvDisconnected
);
let entries: Vec<Entry<Sha256Hash>> = hist.receiver.iter().collect();
assert_eq!(entries.len(), 0);
let mut sigs = HashSet::new();
assert!(!verify_event_and_reserve_signature(&mut sigs, &event0));
assert!(!sigs.contains(&sig));
}

#[test]
fn test_duplicate_event_signature() {
let keypair = generate_keypair();
let key = get_pubkey(&keypair);
let data = &hash(b"hello, world");
let sig = sign_serialized(data, &keypair);
let event0 = Event::Claim { key, data, sig };
let mut sigs = HashSet::new();
assert!(verify_event_and_reserve_signature(&mut sigs, &event0));
assert!(!verify_event_and_reserve_signature(&mut sigs, &event0));
}
}