Skip to content

Commit

Permalink
Add Two-Phase Commit example (#86)
Browse files Browse the repository at this point in the history
* two phase commit example

* respond to PR comments

* remove dead use line

* fix comment formatting for linter

* remove redundant message construction
  • Loading branch information
jhellerstein authored Feb 24, 2022
1 parent c1884ee commit 54bafa4
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 0 deletions.
3 changes: 3 additions & 0 deletions hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ name = "graph_reachability"
[[example]]
name = "three_clique"

[[example]]
name = "two_pc"

[dependencies]
bincode = "1.3"
byteorder = "1.4.3"
Expand Down
31 changes: 31 additions & 0 deletions hydroflow/examples/two_pc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
## Two Phase Commit
This is a rudimentary Two-Phase Commit (2PC) implementation.

Design limitations:
- No database logging (just log statements via println)
- No distinction between forced and non-forced logs, no presumed commit/abort optimizations
- No recovery manager implementation (yet)
- Subordinates make random decisions whether to commit or abort

Temporary Implementation limitations:
- With the current absence of groupby, the size of the subordinate set is hardcoded inline
- With the current absence of groupby, counting is done by moving Rust HashMaps and HashSets into map operators

### To Run the code:
Look in the file `members.json` to find the addresses of the coordinator and subordinates.
For the coordinator, launch a process on the node with a matching IP address as follows.
Here we assume the coordinator's IP address is `localhost` and port `12346` is free:
```
cargo run --example two_pc -- --path hydroflow/examples/two_pc/members.json --role coordinator --port 12346 --addr localhost
```

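Now, for each subordinate listed in `members.json`, launch a process on the node with the matching IP address as follows, using that subordinate's own port.
The coordinator waits for a response from every subordinate before it moves on, so start all of them.
Here we assume the subordinate's IP address is `localhost` and port `12349` is free: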
```
cargo run --example two_pc -- --path hydroflow/examples/two_pc/members.json --role subordinate --addr localhost --port 12349
```

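Now, in the coordinator process, you can type an integer at `stdin` and press Enter. Each integer is treated as a transaction ID
(it must fit in a `u16`, i.e. 0–65535), and a two-phase commit is run for that transaction. Input that does not parse as such an
integer is silently ignored, and a transaction ID that has already been used is rejected with a message.
Votes to commit or abort are randomized.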

You should see logging information on screen at both the coordinator and the subordinates.
217 changes: 217 additions & 0 deletions hydroflow/examples/two_pc/coordinator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
use std::collections::HashMap;
use std::collections::HashSet;

use crate::Opts;

use crate::protocol::{CoordMsg, MsgType, SubordResponse};
use hydroflow::builder::prelude::*;
use hydroflow::scheduled::handoff::VecHandoff;

/// Builds and runs the coordinator side of the two-phase commit dataflow.
///
/// Each line typed on stdin that parses as a `u16` is treated as a new transaction ID.
/// For every new transaction the coordinator:
///
/// 1. sends a `Prepare` message to every subordinate (phase 1),
/// 2. counts the `Commit`/`Abort` votes and, once one vote per subordinate has arrived,
///    sends `Commit` if the vote was unanimous and `Abort` otherwise (phase 2),
/// 3. counts the `AckP2` responses and, once one ack per subordinate has arrived,
///    sends `End` to every subordinate.
///
/// "Logging" is done with `println!` only.
///
/// # Arguments
///
/// * `opts` - parsed command-line options; only `opts.port` is read here, and it is
///   handed to `inbound_tcp_vertex_port` for receiving subordinate responses.
/// * `subordinates` - addresses of all subordinates. Every outgoing message is sent to
///   each of them, and the length of this list is the number of votes/acks awaited per
///   transaction.
///
/// # Panics
///
/// Panics if a line of stdin cannot be read as UTF-8, or if running the dataflow
/// returns an error.
pub(crate) async fn run_coordinator(opts: Opts, subordinates: Vec<String>) {
    // Number of responses to wait for in each phase: one per subordinate.
    // Captured up front because `subordinates` is consumed when the flow is bootstrapped.
    let n_subordinates = subordinates.len();

    let mut hf = HydroflowBuilder::default();

    // We provide a command line for users to type a Transaction ID (integer) to commit.
    // setup stdio input handler
    let text_out = {
        use futures::stream::StreamExt;
        use tokio::io::AsyncBufReadExt;
        let reader = tokio::io::BufReader::new(tokio::io::stdin());
        let lines = tokio_stream::wrappers::LinesStream::new(reader.lines())
            .map(|result| Some(result.expect("Failed to read stdin as UTF-8.")));
        hf.add_input_from_stream::<_, _, VecHandoff<String>, _>("stdin text", lines)
    };

    // setup message send/recv ports
    let msg_recv = hf
        .hydroflow
        .inbound_tcp_vertex_port::<SubordResponse>(opts.port)
        .await;
    let msg_recv = hf.wrap_input(msg_recv);
    let msg_send = hf.hydroflow.outbound_tcp_vertex::<CoordMsg>().await;
    let msg_send = hf.wrap_output(msg_send);

    // setup bootstrap data: subordinates list
    let (subordinates_in, subordinates_out) =
        hf.add_channel_input::<_, Option<String>, VecHandoff<_>>("subordinates");

    // Three separate flows have to pull from subordinates; setup a handoff for each.
    let (subordinates_out_tee_1_push, subordinates_out_tee_1_pull) =
        hf.make_edge::<_, VecHandoff<String>, _>("subordinates_out_tee_1");
    let (subordinates_out_tee_2_push, subordinates_out_tee_2_pull) =
        hf.make_edge::<_, VecHandoff<String>, _>("subordinates_out_tee_2");
    let (subordinates_out_tee_3_push, subordinates_out_tee_3_pull) =
        hf.make_edge::<_, VecHandoff<String>, _>("subordinates_out_tee_3");

    // create a flow to populate the subordinate handoffs
    hf.add_subgraph(
        "fetch subordinates",
        subordinates_out.flatten().pull_to_push().map(Some).tee(
            subordinates_out_tee_1_push,
            hf.start_tee()
                .tee(subordinates_out_tee_2_push, subordinates_out_tee_3_push),
        ),
    );

    // Demultiplex received messages into phase1 and phase2 handoffs
    let (p1_recv_push, p1_recv_pull) = hf.make_edge::<_, VecHandoff<SubordResponse>, _>("p1");
    let (p2_recv_push, p2_recv_pull) = hf.make_edge::<_, VecHandoff<SubordResponse>, _>("p2");

    // Create a flow to partition the received messages and populate the handoffs:
    // votes (Commit/Abort) go to phase 1, every other response type goes to phase 2.
    hf.add_subgraph(
        "demux",
        msg_recv.flatten().pull_to_push().partition(
            |m| m.mtype == MsgType::Commit || m.mtype == MsgType::Abort,
            hf.start_tee().map(Some).push_to(p1_recv_push),
            hf.start_tee().map(Some).push_to(p2_recv_push),
        ),
    );

    // Multiplex messages to send
    let (p1_send_push, p1_send_pull) =
        hf.make_edge::<_, VecHandoff<(String, CoordMsg)>, _>("p1 send");
    let (p2_send_push, p2_send_pull) =
        hf.make_edge::<_, VecHandoff<(String, CoordMsg)>, _>("p2 send");
    let (p2_send_end_push, p2_send_end_pull) =
        hf.make_edge::<_, VecHandoff<(String, CoordMsg)>, _>("p2 send end");

    // Create a flow to union together the messages and populate the network with them
    hf.add_subgraph(
        "mux",
        p1_send_pull
            .chain(p2_send_pull)
            .chain(p2_send_end_pull)
            .pull_to_push()
            .push_to(msg_send),
    );

    // Phase 1 request flow:
    // Given a transaction commit request from stdio, send a Prepare Message to each subordinate
    // with the transaction ID and a unique message ID.
    // HACK: We move the xids HashSet into a map operator as flow state.
    // Should be done with a groupby.
    let mut xids = HashSet::new();
    hf.add_subgraph(
        "phase 1 init",
        text_out
            .flatten()
            .filter_map(move |xidstr| {
                match xidstr.trim().parse::<u16>() {
                    Ok(the_xid) => {
                        // `insert` returns false when the ID was already present.
                        if xids.insert(the_xid) {
                            Some(CoordMsg {
                                xid: the_xid,
                                mid: 1, // first message for this transaction
                                mtype: MsgType::Prepare,
                            })
                        } else {
                            println!("Transaction ID {} already used", the_xid);
                            None
                        }
                    }
                    // Lines that are not a valid u16 are silently ignored.
                    Err(_) => None,
                }
            })
            .cross_join(subordinates_out_tee_1_pull.flatten())
            .map(|(msg, addr)| Some((addr, msg)))
            .pull_to_push()
            .push_to(p1_send_push),
    );

    // Phase 1 Response and Phase 2 Request:
    // Receive and count subordinate responses until we have them all.
    // Once we have them all, assess the unanimous vote condition for commit and
    // send either a Commit or Abort message.
    // HACK: We move the vote_ctr((xid, mtype), count) HashMap into a map operator as flow state.
    // Should be done with a groupby.
    let mut vote_ctr: HashMap<(u16, MsgType), usize> = HashMap::new();
    hf.add_subgraph(
        "collect votes and send command",
        p1_recv_pull
            .flatten()
            .map(move |msg| {
                println!("{:?}", msg);
                msg
            })
            .filter_map(move |msg| {
                *vote_ctr.entry((msg.xid, msg.mtype)).or_insert(0) += 1;
                let commits = vote_ctr
                    .get(&(msg.xid, MsgType::Commit))
                    .copied()
                    .unwrap_or(0);
                let aborts = vote_ctr
                    .get(&(msg.xid, MsgType::Abort))
                    .copied()
                    .unwrap_or(0);
                // Decide only once a vote has arrived from every subordinate.
                if commits + aborts == n_subordinates {
                    Some(CoordMsg {
                        xid: msg.xid,
                        mid: msg.mid + 1,
                        // Abort if any subordinate voted to Abort
                        mtype: if aborts > 0 {
                            MsgType::Abort
                        } else {
                            MsgType::Commit
                        },
                    })
                } else {
                    None
                }
            })
            .map(|msg| {
                println!(
                    "All votes in for xid {:?}, sending {:?}",
                    msg.xid, msg.mtype
                );
                println!("Logging {:?}", msg);
                msg
            })
            .cross_join(subordinates_out_tee_2_pull.flatten())
            .map(|(msg, addr)| Some((addr, msg)))
            .pull_to_push()
            .push_to(p2_send_push),
    );

    // Phase 2 Response: collect subordinate acks until we have them all
    // Then write to local log and send End message to all subordinates.
    let mut ack_ctr: HashMap<(u16, MsgType), usize> = HashMap::new();
    hf.add_subgraph(
        "collect acks and end",
        p2_recv_pull
            .flatten()
            .map(|msg| {
                println!("{:?}", msg);
                msg
            })
            .filter_map(move |msg| {
                let count = ack_ctr.entry((msg.xid, msg.mtype)).or_insert(0);
                *count += 1;
                // Fire exactly once per transaction: only on the AckP2 that completes
                // the set. This handoff also carries every other non-vote response
                // type, and those must not re-trigger the End broadcast after the
                // acks are already complete.
                if msg.mtype == MsgType::AckP2 && *count == n_subordinates {
                    Some(msg)
                } else {
                    None
                }
            })
            .map(|smsg| {
                let cmsg = CoordMsg {
                    xid: smsg.xid,
                    mid: smsg.mid + 1,
                    mtype: MsgType::End,
                };
                println!("Logging {:?}", cmsg);
                cmsg
            })
            .cross_join(subordinates_out_tee_3_pull.flatten())
            .map(|(msg, addr)| Some((addr, msg)))
            .pull_to_push()
            .push_to(p2_send_end_push),
    );

    // start the data flowing.
    let mut hf = hf.build();
    // first populate the static subordinates into the flow
    subordinates
        .into_iter()
        .map(Some)
        .for_each(|x| subordinates_in.give(x));
    subordinates_in.flush();
    hf.run_async().await.unwrap();
}
68 changes: 68 additions & 0 deletions hydroflow/examples/two_pc/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use clap::{ArgEnum, Parser};
use coordinator::run_coordinator;
use hydroflow::tokio;
use serde::Deserialize;
use subordinate::run_subordinate;

use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;

mod coordinator;
mod protocol;
mod subordinate;

/// This is a remedial 2PC implementation.
// NOTE(review): the line above reads like a description of the whole program and may
// have been intended for `Opts` rather than this enum — confirm.
//
// The part of the protocol this process plays, selected with `--role`.
#[derive(Clone, ArgEnum, Debug)]
enum Role {
    // `main` dispatches this role to `run_coordinator`.
    Coordinator,
    // `main` dispatches this role to `run_subordinate`.
    Subordinate,
}

// Command-line options; `main` obtains them with `Opts::parse()`.
#[derive(Parser, Debug)]
struct Opts {
    // Path of the JSON members file that `main` hands to `read_addresses_from_file`.
    #[clap(long)]
    path: String,
    // Whether this process runs as the coordinator or as a subordinate.
    #[clap(arg_enum, long)]
    role: Role,
    // Port number; the coordinator passes it to `inbound_tcp_vertex_port` to receive responses.
    #[clap(long)]
    port: u16,
    // Address given on the command line for this process.
    // NOTE(review): not read by the coordinator code; presumably used by the subordinate — confirm.
    #[clap(long)]
    addr: String,
}

/// Cluster membership as deserialized from the JSON members file.
#[derive(Deserialize, Debug)]
struct Addresses {
    /// Address of the coordinator (e.g. `"localhost:12346"` in the bundled `members.json`).
    coordinator: String,
    /// Addresses of the subordinates, one string per subordinate.
    subordinates: Vec<String>,
}

/// Loads the cluster membership (`Addresses`) from the JSON file at `path`.
///
/// # Errors
///
/// Returns the underlying error if the file cannot be opened, or if its contents
/// cannot be deserialized into `Addresses`.
fn read_addresses_from_file<P: AsRef<Path>>(path: P) -> Result<Addresses, Box<dyn Error>> {
    // Wrap the read-only file handle in a buffer for the JSON parser to read from.
    let buffered = BufReader::new(File::open(path)?);
    let addresses: Addresses = serde_json::from_reader(buffered)?;
    Ok(addresses)
}

#[tokio::main]
async fn main() {
let opts = Opts::parse();
let path = Path::new(&opts.path);
let subordinates = read_addresses_from_file(path).unwrap().subordinates;
let coordinator = read_addresses_from_file(path).unwrap().coordinator;
match opts.role {
Role::Coordinator => {
run_coordinator(opts, subordinates).await;
}
Role::Subordinate => {
run_subordinate(opts, coordinator).await;
}
}
}
8 changes: 8 additions & 0 deletions hydroflow/examples/two_pc/members.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"coordinator": "localhost:12346",
"subordinates": [
"localhost:12347",
"localhost:12348",
"localhost:12349"
]
}
27 changes: 27 additions & 0 deletions hydroflow/examples/two_pc/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use serde::{Deserialize, Serialize};

/// Kind of a protocol message, shared by coordinator messages and subordinate responses.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash, Copy)]
pub enum MsgType {
    /// Phase 1 request sent by the coordinator for a new transaction.
    Prepare,
    /// A subordinate's vote to commit, and also the coordinator's phase 2 decision
    /// when no subordinate voted to abort.
    Commit,
    /// A subordinate's vote to abort, and also the coordinator's phase 2 decision
    /// when at least one subordinate voted to abort.
    Abort,
    /// Subordinate response counted by the coordinator after its phase 2 decision.
    AckP2,
    /// Sent by the coordinator once all `AckP2` responses for a transaction are in.
    End,
    /// NOTE(review): not produced by the coordinator; presumably a subordinate's reply to `End` — confirm.
    Ended,
    /// NOTE(review): not produced by the coordinator; presumably signals an error — confirm.
    Err,
}

/// Message sent from the coordinator to a subordinate.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
pub struct CoordMsg {
    /// Transaction ID this message belongs to.
    pub xid: u16,
    /// Message ID within the transaction: the coordinator uses 1 for the first message
    /// and the triggering response's `mid + 1` for each later one.
    pub mid: u16,
    /// What this message asks or announces.
    pub mtype: MsgType,
}
/// Member Response
///
/// Message received by the coordinator from a subordinate.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
pub struct SubordResponse {
    /// Transaction ID this response belongs to.
    pub xid: u16,
    /// Message ID of this response; the coordinator replies with `mid + 1`.
    pub mid: u16,
    /// NOTE(review): presumably the address of the responding subordinate; not read by the coordinator — confirm.
    pub addr: String,
    /// Kind of response (the coordinator treats `Commit`/`Abort` as votes and counts `AckP2` as acks).
    pub mtype: MsgType,
}
Loading

0 comments on commit 54bafa4

Please sign in to comment.