diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml
index ed848c2c474..c2b11aad581 100644
--- a/hydroflow/Cargo.toml
+++ b/hydroflow/Cargo.toml
@@ -22,6 +22,9 @@ name = "graph_reachability"
 [[example]]
 name = "three_clique"
 
+[[example]]
+name = "two_pc"
+
 [dependencies]
 bincode = "1.3"
 byteorder = "1.4.3"
diff --git a/hydroflow/examples/two_pc/README.md b/hydroflow/examples/two_pc/README.md
new file mode 100644
index 00000000000..cc5fae71373
--- /dev/null
+++ b/hydroflow/examples/two_pc/README.md
@@ -0,0 +1,31 @@
+## Two Phase Commit
+This is a remedial 2PC implementation.
+
+Design limitations:
+- No database logging (just log statements via println)
+- No distinction between forced and non-forced logs, no presumed-commit/abort optimizations
+- No recovery manager implementation (yet)
+- Subordinates decide randomly whether to vote commit or abort
+
+Temporary implementation limitations:
+- In the current absence of groupby, the size of the subordinate set is hardcoded inline
+- In the current absence of groupby, counting is done by moving Rust HashMaps and HashSets into map operators
+
+### To run the code
+Look in the file `members.json` to find the addresses of the coordinator and subordinates.
+For the coordinator, launch a process on the node with a matching IP address as follows.
+Here we assume the coordinator's IP address is `localhost` and port `12346` is free:
+```
+cargo run --example two_pc -- --path hydroflow/examples/two_pc/members.json --role coordinator --port 12346 --addr localhost
+```
+
+Now, for each subordinate, launch a process on the node with the matching IP address as follows.
+Here we assume the subordinate's IP address is `localhost` and port `12349` is free:
+```
+cargo run --example two_pc -- --path hydroflow/examples/two_pc/members.json --role subordinate --addr localhost --port 12349
+```
+
+Now, in the coordinator process you can type an integer at `stdin`. Each integer you type is treated as a transaction ID,
+and a two-phase commit is run for that transaction. Votes to commit or abort are randomized.
+
+You should see logging information on screen at both the coordinator and the subordinates.
\ No newline at end of file
diff --git a/hydroflow/examples/two_pc/coordinator.rs b/hydroflow/examples/two_pc/coordinator.rs
new file mode 100644
index 00000000000..fbc0eabdef2
--- /dev/null
+++ b/hydroflow/examples/two_pc/coordinator.rs
@@ -0,0 +1,217 @@
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+use crate::Opts;
+
+use crate::protocol::{CoordMsg, MsgType, SubordResponse};
+use hydroflow::builder::prelude::*;
+use hydroflow::scheduled::handoff::VecHandoff;
+
+pub(crate) async fn run_coordinator(opts: Opts, subordinates: Vec<String>) {
+    let mut hf = HydroflowBuilder::default();
+
+    // We provide a command line for users to type a Transaction ID (integer) to commit.
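+    //
+    // Message flow, per transaction (message types are defined in protocol.rs):
+    //   1. coordinator -> subordinates: Prepare
+    //   2. subordinates -> coordinator: Commit or Abort votes
+    //   3. coordinator -> subordinates: Commit (if the vote was unanimous) or Abort
+    //   4. subordinates -> coordinator: AckP2
+    //   5. coordinator -> subordinates: End; subordinates log Ended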
+    // setup stdio input handler
+    let text_out = {
+        use futures::stream::StreamExt;
+        use tokio::io::AsyncBufReadExt;
+        let reader = tokio::io::BufReader::new(tokio::io::stdin());
+        let lines = tokio_stream::wrappers::LinesStream::new(reader.lines())
+            .map(|result| Some(result.expect("Failed to read stdin as UTF-8.")));
+        hf.add_input_from_stream::<_, _, VecHandoff<String>, _>("stdin text", lines)
+    };
+
+    // setup message send/recv ports
+    let msg_recv = hf
+        .hydroflow
+        .inbound_tcp_vertex_port::<SubordResponse>(opts.port)
+        .await;
+    let msg_recv = hf.wrap_input(msg_recv);
+    let msg_send = hf.hydroflow.outbound_tcp_vertex::<CoordMsg>().await;
+    let msg_send = hf.wrap_output(msg_send);
+
+    // setup bootstrap data: subordinates list
+    let (subordinates_in, subordinates_out) =
+        hf.add_channel_input::<_, Option<String>, VecHandoff<_>>("subordinates");
+
+    // Three separate flows have to pull from subordinates; set up a handoff for each.
+    let (subordinates_out_tee_1_push, subordinates_out_tee_1_pull) =
+        hf.make_edge::<_, VecHandoff<String>, _>("subordinates_out_tee_1");
+    let (subordinates_out_tee_2_push, subordinates_out_tee_2_pull) =
+        hf.make_edge::<_, VecHandoff<String>, _>("subordinates_out_tee_2");
+    let (subordinates_out_tee_3_push, subordinates_out_tee_3_pull) =
+        hf.make_edge::<_, VecHandoff<String>, _>("subordinates_out_tee_3");
+
+    // create a flow to populate the subordinate handoffs
+    hf.add_subgraph(
+        "fetch subordinates",
+        subordinates_out.flatten().pull_to_push().map(Some).tee(
+            subordinates_out_tee_1_push,
+            hf.start_tee()
+                .tee(subordinates_out_tee_2_push, subordinates_out_tee_3_push),
+        ),
+    );
+
+    // Demultiplex received messages into phase1 and phase2 handoffs
+    let (p1_recv_push, p1_recv_pull) = hf.make_edge::<_, VecHandoff<SubordResponse>, _>("p1");
+    let (p2_recv_push, p2_recv_pull) = hf.make_edge::<_, VecHandoff<SubordResponse>, _>("p2");
+
+    // Create a flow to partition the received messages and populate the handoffs
+    hf.add_subgraph(
+        "demux",
+        msg_recv.flatten().pull_to_push().partition(
+            |m| m.mtype == MsgType::Commit || m.mtype == MsgType::Abort,
+            hf.start_tee().map(Some).push_to(p1_recv_push),
+            hf.start_tee().map(Some).push_to(p2_recv_push),
+        ),
+    );
+
+    // Multiplex messages to send
+    let (p1_send_push, p1_send_pull) =
+        hf.make_edge::<_, VecHandoff<(String, CoordMsg)>, _>("p1 send");
+    let (p2_send_push, p2_send_pull) =
+        hf.make_edge::<_, VecHandoff<(String, CoordMsg)>, _>("p2 send");
+    let (p2_send_end_push, p2_send_end_pull) =
+        hf.make_edge::<_, VecHandoff<(String, CoordMsg)>, _>("p2 send end");
+
+    // Create a flow to union together the messages and populate the network with them
+    hf.add_subgraph(
+        "mux",
+        p1_send_pull
+            .chain(p2_send_pull)
+            .chain(p2_send_end_pull)
+            .pull_to_push()
+            .push_to(msg_send),
+    );
+
+    // Phase 1 request flow:
+    // Given a transaction commit request from stdio, send a Prepare message to each subordinate
+    // with the transaction ID and a unique message ID.
+    // HACK: We move the xids HashSet into a map operator as flow state.
+    // Should be done with a groupby.
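+    // The `move` closure below takes ownership of `xids`, so the set persists
+    // across ticks as operator-local state: each transaction ID is admitted
+    // only the first time it is typed.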
+    let mut xids = HashSet::new();
+    hf.add_subgraph(
+        "phase 1 init",
+        text_out
+            .flatten()
+            .filter_map(move |xidstr| {
+                match xidstr.trim().parse::<u16>() {
+                    Ok(the_xid) => {
+                        if xids.contains(&the_xid) {
+                            println!("Transaction ID {} already used", the_xid);
+                            None
+                        } else {
+                            xids.insert(the_xid);
+                            Some(CoordMsg {
+                                xid: the_xid,
+                                mid: 1, // first message for this transaction
+                                mtype: MsgType::Prepare,
+                            })
+                        }
+                    }
+                    Err(_) => None,
+                }
+            })
+            .cross_join(subordinates_out_tee_1_pull.flatten())
+            .map(|(msg, addr)| Some((addr, msg)))
+            .pull_to_push()
+            .push_to(p1_send_push),
+    );
+
+    // Phase 1 response and phase 2 request:
+    // Receive and count subordinate responses until we have them all.
+    // Once we have them all, check the unanimous-vote condition for commit and
+    // send either a Commit or Abort message.
+    // HACK: We move the vote_ctr HashMap ((xid, mtype) -> count) into a map operator as flow state.
+    // Should be done with a groupby.
+    let mut vote_ctr = HashMap::new();
+    hf.add_subgraph(
+        "collect votes and send command",
+        p1_recv_pull
+            .flatten()
+            .map(|msg| {
+                println!("{:?}", msg);
+                msg
+            })
+            .filter_map(move |msg| {
+                *vote_ctr.entry((msg.xid, msg.mtype)).or_insert(0) += 1;
+                // HACK: the constant 3 here is the length of the subordinates list!
+                // Should be computed on bootstrap tick and joined in here.
+                if vote_ctr.get(&(msg.xid, MsgType::Commit)).unwrap_or(&0)
+                    + vote_ctr.get(&(msg.xid, MsgType::Abort)).unwrap_or(&0)
+                    == 3
+                {
+                    Some(CoordMsg {
+                        xid: msg.xid,
+                        mid: msg.mid + 1,
+                        // Abort if any subordinate voted to Abort
+                        mtype: if vote_ctr.get(&(msg.xid, MsgType::Abort)).unwrap_or(&0) > &0 {
+                            MsgType::Abort
+                        } else {
+                            MsgType::Commit
+                        },
+                    })
+                } else {
+                    None
+                }
+            })
+            .map(|msg| {
+                println!(
+                    "All votes in for xid {:?}, sending {:?}",
+                    msg.xid, msg.mtype
+                );
+                println!("Logging {:?}", msg);
+                msg
+            })
+            .cross_join(subordinates_out_tee_2_pull.flatten())
+            .map(|(msg, addr)| Some((addr, msg)))
+            .pull_to_push()
+            .push_to(p2_send_push),
+    );
+
+    // Phase 2 response: collect subordinate acks until we have them all.
+    // Then write to the local log and send an End message to all subordinates.
+    let mut ack_ctr = HashMap::new();
+    hf.add_subgraph(
+        "collect acks and end",
+        p2_recv_pull
+            .flatten()
+            .map(|msg| {
+                println!("{:?}", msg);
+                msg
+            })
+            .filter_map(move |msg| {
+                *ack_ctr.entry((msg.xid, msg.mtype)).or_insert(0) += 1;
+                // HACK: again, the constant 3 is the hardcoded length of the subordinates list.
+                if ack_ctr.get(&(msg.xid, MsgType::AckP2)).unwrap_or(&0) == &3 {
+                    Some(msg)
+                } else {
+                    None
+                }
+            })
+            .map(|smsg| {
+                let cmsg = CoordMsg {
+                    xid: smsg.xid,
+                    mid: smsg.mid + 1,
+                    mtype: MsgType::End,
+                };
+                println!("Logging {:?}", cmsg);
+                cmsg
+            })
+            .cross_join(subordinates_out_tee_3_pull.flatten())
+            .map(|(msg, addr)| Some((addr, msg)))
+            .pull_to_push()
+            .push_to(p2_send_end_push),
+    );
+
+    // start the data flowing.
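+    // Build the graph, then seed (and flush) the subordinates channel before
+    // running, so the cross_joins above have the membership list available;
+    // only then hand control to the async scheduler.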
+    let mut hf = hf.build();
+    // first populate the static subordinates into the flow
+    subordinates
+        .into_iter()
+        .map(Some)
+        .for_each(|x| subordinates_in.give(x));
+    subordinates_in.flush();
+    hf.run_async().await.unwrap();
+}
diff --git a/hydroflow/examples/two_pc/main.rs b/hydroflow/examples/two_pc/main.rs
new file mode 100644
index 00000000000..2f2d4ee30f3
--- /dev/null
+++ b/hydroflow/examples/two_pc/main.rs
@@ -0,0 +1,68 @@
+use clap::{ArgEnum, Parser};
+use coordinator::run_coordinator;
+use hydroflow::tokio;
+use serde::Deserialize;
+use subordinate::run_subordinate;
+
+use std::error::Error;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::Path;
+
+mod coordinator;
+mod protocol;
+mod subordinate;
+
+/// This is a remedial 2PC implementation.
+
+#[derive(Clone, ArgEnum, Debug)]
+enum Role {
+    Coordinator,
+    Subordinate,
+}
+
+#[derive(Parser, Debug)]
+struct Opts {
+    #[clap(long)]
+    path: String,
+    #[clap(arg_enum, long)]
+    role: Role,
+    #[clap(long)]
+    port: u16,
+    #[clap(long)]
+    addr: String,
+}
+
+#[derive(Deserialize, Debug)]
+struct Addresses {
+    coordinator: String,
+    subordinates: Vec<String>,
+}
+
+fn read_addresses_from_file<P: AsRef<Path>>(path: P) -> Result<Addresses, Box<dyn Error>> {
+    // Open the file in read-only mode with a buffered reader.
+    let file = File::open(path)?;
+    let reader = BufReader::new(file);
+
+    // Read the JSON contents of the file as an instance of `Addresses` and return it.
+    let u = serde_json::from_reader(reader)?;
+    Ok(u)
+}
+
+#[tokio::main]
+async fn main() {
+    let opts = Opts::parse();
+    let path = Path::new(&opts.path);
+    // Read the membership file once; each role uses the field it needs.
+    let addresses = read_addresses_from_file(path).unwrap();
+    match opts.role {
+        Role::Coordinator => {
+            run_coordinator(opts, addresses.subordinates).await;
+        }
+        Role::Subordinate => {
+            run_subordinate(opts, addresses.coordinator).await;
+        }
+    }
+}
diff --git a/hydroflow/examples/two_pc/members.json b/hydroflow/examples/two_pc/members.json
new file mode 100644
index 00000000000..9aee3a470f8
--- /dev/null
+++ b/hydroflow/examples/two_pc/members.json
@@ -0,0 +1,8 @@
+{
+    "coordinator": "localhost:12346",
+    "subordinates": [
+        "localhost:12347",
+        "localhost:12348",
+        "localhost:12349"
+    ]
+}
\ No newline at end of file
diff --git a/hydroflow/examples/two_pc/protocol.rs b/hydroflow/examples/two_pc/protocol.rs
new file mode 100644
index 00000000000..12c8b18fd4d
--- /dev/null
+++ b/hydroflow/examples/two_pc/protocol.rs
@@ -0,0 +1,27 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash, Copy)]
+pub enum MsgType {
+    Prepare,
+    Commit,
+    Abort,
+    AckP2,
+    End,
+    Ended,
+    Err,
+}
+
+/// Coordinator-to-subordinate message
+#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
+pub struct CoordMsg {
+    pub xid: u16,
+    pub mid: u16,
+    pub mtype: MsgType,
+}
+
+/// Subordinate-to-coordinator response
+#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
+pub struct SubordResponse {
+    pub xid: u16,
+    pub mid: u16,
+    pub addr: String,
+    pub mtype: MsgType,
+}
diff --git a/hydroflow/examples/two_pc/subordinate.rs b/hydroflow/examples/two_pc/subordinate.rs
new file mode 100644
index 00000000000..2043409d83e
--- /dev/null
+++ b/hydroflow/examples/two_pc/subordinate.rs
@@ -0,0 +1,143 @@
+use crate::protocol::{CoordMsg, MsgType, SubordResponse};
+use crate::Opts;
+use hydroflow::builder::prelude::*;
+use hydroflow::scheduled::handoff::VecHandoff;
+
+use rand::Rng;
+
+fn decide(odds: u8) -> bool {
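+    // `gen_range(0..100)` draws uniformly from 0..=99, so this returns true
+    // (vote commit) with probability (odds + 1)/100 -- e.g. decide(67) votes
+    // to commit roughly 68% of the time.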
+    let mut rng = rand::thread_rng();
+    rng.gen_range(0..100) <= odds
+}
+
+pub(crate) async fn run_subordinate(opts: Opts, coordinator: String) {
+    let mut hf = HydroflowBuilder::default();
+
+    // setup message send/recv ports
+    let msg_recv = hf
+        .hydroflow
+        .inbound_tcp_vertex_port::<CoordMsg>(opts.port)
+        .await;
+    let msg_recv = hf.wrap_input(msg_recv);
+    let msg_send = hf.hydroflow.outbound_tcp_vertex::<SubordResponse>().await;
+    let msg_send = hf.wrap_output(msg_send);
+
+    // Demultiplex received messages into phase1, phase2, and end-of-phase2 flows
+    let (p1_recv_push, p1_recv_pull) = hf.make_edge::<_, VecHandoff<CoordMsg>, _>("p1 recv");
+    let (p2_recv_push, p2_recv_pull) = hf.make_edge::<_, VecHandoff<CoordMsg>, _>("p2 recv");
+    let (p2_end_recv_push, p2_end_recv_pull) =
+        hf.make_edge::<_, VecHandoff<CoordMsg>, _>("p2 end recv");
+    hf.add_subgraph(
+        "demux",
+        msg_recv.flatten().pull_to_push().partition(
+            |m| m.mtype == MsgType::Prepare,
+            hf.start_tee().map(Some).push_to(p1_recv_push),
+            hf.start_tee().partition(
+                |m| m.mtype == MsgType::End,
+                hf.start_tee().map(Some).push_to(p2_end_recv_push),
+                hf.start_tee().map(Some).push_to(p2_recv_push),
+            ),
+        ),
+    );
+
+    // Multiplex messages to send
+    let (p1_send_push, p1_send_pull) =
+        hf.make_edge::<_, VecHandoff<(String, SubordResponse)>, _>("p1 send");
+    let (p2_resp_push, p2_resp_pull) =
+        hf.make_edge::<_, VecHandoff<(String, SubordResponse)>, _>("p2 respond");
+    hf.add_subgraph(
+        "mux",
+        p1_send_pull
+            .chain(p2_resp_pull)
+            .pull_to_push()
+            .push_to(msg_send),
+    );
+
+    // Set up addressing. There has to be a nicer way to reuse these strings!
+    let coord_addr = coordinator.clone();
+    let coord_addr_2 = coordinator.clone();
+    let my_addr = format!("{}:{}", opts.addr, opts.port);
+    let my_addr_2 = my_addr.clone();
+    let my_addr_3 = my_addr.clone();
+
+    // Phase 1 (Prepare) request handling:
+    // flip a coin to decide whether to commit or abort,
+    // then send a phase 1 response to the coordinator.
+    hf.add_subgraph(
+        "p1 request handler",
+        p1_recv_pull
+            .flatten()
+            .map(move |msg| {
+                println!("Xid {:?}: got a {:?}", msg.xid, msg.mtype);
+                let ret = SubordResponse {
+                    xid: msg.xid,
+                    mid: msg.mid + 1,
+                    addr: my_addr.clone(),
+                    mtype: match msg.mtype {
+                        MsgType::Prepare if decide(67) => MsgType::Commit,
+                        MsgType::Prepare => MsgType::Abort,
+                        _ => MsgType::Err,
+                    },
+                };
+                println!("Xid {:?}: returned a {:?}", ret.xid, ret.mtype);
+                Some((coord_addr.clone(), ret))
+            })
+            .pull_to_push()
+            .push_to(p1_send_push),
+    );
+
+    // Phase 2 (Commit/Abort) request handling:
+    // log the decision, then respond with an AckP2.
+    hf.add_subgraph(
+        "p2 command handler",
+        p2_recv_pull
+            .flatten()
+            .map(move |msg| {
+                println!("Xid {:?}: got a {:?}", msg.xid, msg.mtype);
+                let ret = SubordResponse {
+                    xid: msg.xid,
+                    mid: msg.mid + 1,
+                    addr: my_addr_2.clone(),
+                    mtype: match msg.mtype {
+                        MsgType::Abort | MsgType::Commit => MsgType::AckP2,
+                        _ => MsgType::Err,
+                    },
+                };
+                println!("Xid {:?}: returned a {:?}", ret.xid, ret.mtype);
+                Some((coord_addr_2.clone(), ret))
+            })
+            .pull_to_push()
+            .push_to(p2_resp_push),
+    );
+
+    // Phase 3 (End) request handling:
+    // construct an Ended response, which would allow the coordinator to GC the
+    // transaction. For now the response is only logged locally, not sent back.
+    hf.add_subgraph(
+        "p2 end handler",
+        p2_end_recv_pull
+            .flatten()
+            .map(move |msg| {
+                println!("Xid {:?}: got a {:?}", msg.xid, msg.mtype);
+                let ret = SubordResponse {
+                    xid: msg.xid,
+                    mid: msg.mid + 1,
+                    addr: my_addr_3.clone(),
+                    mtype: match msg.mtype {
+                        MsgType::End => MsgType::Ended,
+                        _ => MsgType::Err,
+                    },
+                };
+                println!("Xid {:?}: returned a {:?}", ret.xid, ret.mtype);
+                Some(ret)
+            })
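+            // Note: unlike the phase 1 and phase 2 handlers, this flow does not
+            // push to the network; the Ended response is only printed below.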
+            .pull_to_push()
+            .for_each(|m| println!("Logging final message {:?}", m.unwrap())),
+    );
+
+    let mut hf = hf.build();
+    println!("Opening on port {}", opts.port);
+    hf.run_async().await.unwrap();
+}