Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent log #10

Merged
merged 83 commits into from
Apr 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
f5aebf0
Start saving every incoming record to a log in Base types.
jmftrindade Feb 28, 2017
9dfeef4
Address comments on persistent-log code.
jmftrindade Mar 1, 2017
899255b
Address a few more comments.
jmftrindade Mar 1, 2017
8e145d9
Nits.
jmftrindade Mar 1, 2017
63df9d5
Configurable durability levels for base nodes
ms705 Mar 5, 2017
90e34cd
Set base durability levels correctly in tests
ms705 Mar 5, 2017
7dc6059
Use buf_redux BufWriter with 4k buffer and WhenFull strategy.
jmftrindade Mar 6, 2017
989b3ed
cherrypick commit c04e67f55b657b171da42287e005584e65f7d268
jonhoo Mar 8, 2017
7f53514
date on durable log filename, and stub out where to save global addre…
jmftrindade Mar 13, 2017
7d54e7a
delete log file on Base Drop
jmftrindade Mar 13, 2017
bcd716c
add tiny unit tests to Base to help with investigating why Base is no…
jmftrindade Mar 13, 2017
595c108
Start saving every incoming record to a log in Base types.
jmftrindade Feb 28, 2017
20ebe28
Address comments on persistent-log code.
jmftrindade Mar 1, 2017
1dac639
Address a few more comments.
jmftrindade Mar 1, 2017
0b9ca62
Nits.
jmftrindade Mar 1, 2017
ef4ad8e
Configurable durability levels for base nodes
ms705 Mar 5, 2017
ed5e44f
Set base durability levels correctly in tests
ms705 Mar 5, 2017
f7ccf34
Use buf_redux BufWriter with 4k buffer and WhenFull strategy.
jmftrindade Mar 6, 2017
0e86c5f
cherrypick commit c04e67f55b657b171da42287e005584e65f7d268
jonhoo Mar 8, 2017
f329d2c
date on durable log filename, and stub out where to save global addre…
jmftrindade Mar 13, 2017
495f26c
delete log file on Base Drop
jmftrindade Mar 13, 2017
7fff86c
add tiny unit tests to Base to help with investigating why Base is no…
jmftrindade Mar 13, 2017
015fb1a
rebase stuff
jmftrindade Mar 15, 2017
71aace5
timekeeper on Mac OS X
jmftrindade Mar 15, 2017
d206961
rebase
jmftrindade Mar 15, 2017
63d8264
rebase master
jmftrindade Mar 15, 2017
0af6305
custom Drop impl for Domain
jmftrindade Mar 17, 2017
e341076
merge master into branch
jmftrindade Mar 17, 2017
6c96dd0
manual merge conflict resolution: remove incorrectly added dupe impl …
jmftrindade Mar 17, 2017
40fe135
impl `Drop` does not work with `[cfg(test)]`
ms705 Mar 19, 2017
08f18c4
Change on_input api to optionally return Records.
jmftrindade Mar 20, 2017
dab2110
Add force-write for buffered durability (WIP).
jmftrindade Mar 21, 2017
fc3067d
Remove unnecessary code dupe. Still have potentially unnecessary clo…
jmftrindade Mar 22, 2017
dbeed9e
Use BaseDurabilityLevel::None in integration tests.
jmftrindade Mar 22, 2017
aeb9b25
Add tests for sync immediately and buffered durability levels.
jmftrindade Mar 22, 2017
1f97159
Add tests for sync_immediately and buffered durability levels.
jmftrindade Mar 22, 2017
98b4c16
Correctly deal with column aliases
ms705 Mar 21, 2017
113ef03
Merge master.
jmftrindade Mar 22, 2017
c9472d4
Merge branch 'master' into persistent-log to pickup nom_sql fixes.
jmftrindade Mar 22, 2017
348fb38
Return Records instead of Option<Records> from MockGraph::one in ops …
jmftrindade Mar 22, 2017
c928130
(nit) unused var on buffered durability test.
jmftrindade Mar 22, 2017
5b0cead
Merge branch 'master' into persistent-log
jmftrindade Mar 22, 2017
7c5b204
Add base log cleanup on drop.
jmftrindade Mar 24, 2017
6e6e2c6
Merge branch master into persistent-log
jmftrindade Mar 24, 2017
4c69bc6
Add interval-based flush to Base durable log.
jmftrindade Mar 25, 2017
1726fe6
Increase `SETTLE_TIME` for durable log tests
ms705 Mar 25, 2017
d379789
Add flush method to Base nodes.
jmftrindade Apr 3, 2017
556ea74
Merge branch 'persistent-log' of github.com:mit-pdos/distributary int…
jmftrindade Apr 3, 2017
f73884a
Merge branch 'master' into persistent-log
jmftrindade Apr 3, 2017
71c99fd
Manually merged files from merge branch master into persistent-log.
jmftrindade Apr 3, 2017
53810ef
Use None as default base durability level.
jmftrindade Apr 4, 2017
e6f97e0
Merge branch 'master' into persistent-log
jmftrindade Apr 4, 2017
f27031b
Merge branch 'master' into persistent-log
jmftrindade Apr 6, 2017
0e5e49d
Merge branch 'master' into persistent-log
jmftrindade Apr 11, 2017
fdef1ef
Cleanup before merge: remove unused set_global_address method, and ca…
jmftrindade Apr 11, 2017
a8bce37
Remove unnecessary files from previous merges.
jmftrindade Apr 11, 2017
53661ea
Use optional durability, and provide a new_durable ctor on Base.
jmftrindade Apr 11, 2017
7131374
Revert formatting and otherwise silly changes.
jmftrindade Apr 11, 2017
5c74fdf
Revert another change, now that we have Base::new_durable.
jmftrindade Apr 11, 2017
ae90dc4
Revert formatting change from src/ops/mod.rs
jmftrindade Apr 11, 2017
0a0a48d
Address more comments.
jmftrindade Apr 12, 2017
fcb06ae
Address a few more comments.
jmftrindade Apr 12, 2017
9391c56
Address more comments: remove excessive cloning, and return None inst…
jmftrindade Apr 12, 2017
40655f2
Return Records instead of Option<Records> from Base::on_input.
jmftrindade Apr 12, 2017
191568e
Remove cloning from Base::flush.
jmftrindade Apr 13, 2017
c7c2fd3
Revert timekeeper change from Cargo.toml
jmftrindade Apr 13, 2017
ab6db4c
Avoid path unwrap on delete_durable_log.
jmftrindade Apr 13, 2017
5914147
Remove unnecessary as_path() calls.
jmftrindade Apr 13, 2017
8d08399
Merge branch 'master' into persistent-log
jonhoo Apr 13, 2017
94c5c20
Avoid excessive Cargo.lock changes
jonhoo Apr 13, 2017
6c36373
target should not be a symlink to itself
fintelia Apr 13, 2017
8f06306
Merge with master; use builders for with_key and with_durability.
jmftrindade Apr 13, 2017
574ac31
Merge branch 'master' into persistent-log
Apr 13, 2017
3e7fd41
Merge lock.
jmftrindade Apr 14, 2017
fcd59a6
Updates to bank benchmark to try and measure Buffered durability late…
jmftrindade Apr 14, 2017
9e70207
Buffered -> SyncImmediately on bank benchmark.
jmftrindade Apr 14, 2017
aa12b55
Merge branch 'master' into persistent-log
jmftrindade Apr 14, 2017
eda1d3c
Transactional Base nodes use SyncImmediately durability.
jmftrindade Apr 14, 2017
18ec1aa
(nit) tab -> space
jmftrindade Apr 14, 2017
36c449a
Merge branch 'master' into persistent-log
jmftrindade Apr 16, 2017
9a65e04
Update piazza benchmark.
jmftrindade Apr 16, 2017
921302e
Remove deprecated transactional_maintain from tests (why did the merg…
jmftrindade Apr 16, 2017
3f41330
Use 1000 accounts and single client-thread on bank benchmark, so we g…
jmftrindade Apr 16, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ web = ["rustful", "rustc-serialize"]
b_memcached = ["memcached-rs"]
b_mysql = ["mysql"]
b_mssql = ["futures", "futures-state-stream", "tiberius", "tokio-core"]
b_netsoup = ["futures", "tokio-core", "tarpc", "tarpc-plugins", "serde", "serde_derive"]
b_netsoup = ["futures", "tokio-core", "tarpc", "tarpc-plugins"]
b_hybrid = ["mysql", "memcached-rs"]
default = ["web", "b_netsoup", "b_memcached", "b_mysql", "b_mssql", "b_hybrid"]
profiling = ["timekeeper/default"]
Expand All @@ -20,6 +20,8 @@ itertools = "0.5"
petgraph = "0.4"
regex = "0.1"
fnv = "1.0"
serde = "0.9"
serde_derive = "0.9"
slog = "1.5.2"
#slog = { version = "1.5.2", features = ["max_level_trace", "release_max_level_debug"] }
slog-term = "1.5.0"
Expand Down Expand Up @@ -51,12 +53,16 @@ futures = { version ="0.1.9", optional = true }
tokio-core = { version = "0.1", optional = true }
tarpc = {git="https://github.com/ms705/tarpc.git", branch = "hack", optional = true}
tarpc-plugins = { git = "https://github.com/ms705/tarpc", branch = "hack", optional = true }
serde = { version = "0.9", optional = true }
serde_derive = { version = "0.9", optional = true }

# for web
rustc-serialize = { version = "0.3", optional = true }

# durable log
snowflake = "1.2.0"
serde_json = "0.9.8"
buf_redux = "0.6.1"
time = "0.1"

[dependencies.rustful]
version = "0.9"
default-features = false
Expand Down
14 changes: 11 additions & 3 deletions benchmarks/bank/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time;

use std::collections::HashMap;

use distributary::{Blender, Base, Aggregation, Join, JoinType, Datas, DataType, Token, Mutator};
use distributary::{Blender, Base, BaseDurabilityLevel, Aggregation, Join, JoinType, Datas, DataType, Token, Mutator};

use rand::Rng;

Expand Down Expand Up @@ -58,9 +58,10 @@ pub fn setup(num_putters: usize) -> Box<Bank> {
let mut mig = g.start_migration();

// add transfers base table
let d = BaseDurabilityLevel::SyncImmediately; // Buffered makes assert on getter fail.
transfers = mig.add_transactional_base("transfers",
&["src_acct", "dst_acct", "amount"],
Base::default());
Base::default().with_durability(d));

// add all debits
debits = mig.add_ingredient("debits",
Expand Down Expand Up @@ -317,10 +318,13 @@ fn client(i: usize,
let rl: u64 = read_latencies.iter().sum();
let wl: u64 = write_latencies.iter().sum();
let sl: u64 = settle_latencies.iter().sum();
let wsl: u64 = write_start_to_txn_end_latencies.iter().sum();

let n = write_latencies.len() as f64;
println!("read latency: {:.3} μs", rl as f64 / n * 0.001);
println!("write latency: {:.3} μs", wl as f64 / n * 0.001);
println!("settle latency: {:.3} μs", sl as f64 / n * 0.001);
println!("write + settle latency: {:.3} μs", wsl as f64 / n * 0.001);

let mut latencies_hist = Histogram::<i64>::new_with_bounds(10, 10000000, 4).unwrap();
for sample_nanos in write_start_to_txn_end_latencies {
Expand Down Expand Up @@ -417,7 +421,7 @@ fn main() {
};

// let system settle
// thread::sleep(time::Duration::new(1, 0));
// thread::sleep(time::Duration::new(2, 0));
let start = time::Instant::now();

// benchmark
Expand All @@ -432,6 +436,10 @@ fn main() {

if i == 0 {
populate(naccounts, &mut transfers_put);

//println!("Let those accounts settle...");
//thread::sleep(time::Duration::new(2, 0));
//println!("Done!");
}

thread::Builder::new()
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/piazza/piazza.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Piazza {
// add a post base table
post = mig.add_ingredient("post",
&["pid", "cid", "author", "content"],
Base::with_key(vec![1], vec![]));
Base::default().with_key(vec![1]));

// add a class base table
class = mig.add_ingredient("class", &["cid", "classname"], Base::default());
Expand Down
7 changes: 3 additions & 4 deletions src/flow/core/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use std::fmt;
///
/// Having this be an enum allows for our code to be agnostic about the types of user data except
/// when type information is specifically necessary.
#[derive(Eq, PartialOrd, Ord, Hash, Debug, Clone)]
#[cfg_attr(feature="b_netsoup", derive(Serialize, Deserialize))]
#[derive(Eq, PartialOrd, Ord, Hash, Debug, Clone, Serialize, Deserialize)]
pub enum DataType {
/// An empty value.
None,
Expand Down Expand Up @@ -195,7 +194,7 @@ impl fmt::Display for DataType {
}

/// A record is a single positive or negative data record with an associated time stamp.
#[derive(Clone, PartialEq, Eq, Debug)]
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub enum Record {
Positive(sync::Arc<Vec<DataType>>),
Negative(sync::Arc<Vec<DataType>>),
Expand Down Expand Up @@ -321,7 +320,7 @@ impl<'a> IntoIterator for &'a Records {
/// Represents a set of records returned from a query.
pub type Datas = Vec<Vec<DataType>>;

#[derive(Clone, Default, PartialEq, Debug)]
#[derive(Clone, Default, PartialEq, Debug, Serialize, Deserialize)]
pub struct Records(Vec<Record>);

impl Deref for Records {
Expand Down
7 changes: 0 additions & 7 deletions src/flow/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1478,10 +1478,3 @@ impl Domain {
.unwrap()
}
}

#[cfg(test)]
impl Drop for Domain {
fn drop(&mut self) {
//println!("Dropping Domain!")
}
}
11 changes: 4 additions & 7 deletions src/flow/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use petgraph;
use petgraph::graph::NodeIndex;
use checktable;
use ops::base::Base;
use ops::base::{Base, BaseDurabilityLevel};
use vec_map::VecMap;

use std::sync::mpsc;
Expand Down Expand Up @@ -593,7 +593,9 @@ impl<'a> Migration<'a> {
S2: ToString,
FS: IntoIterator<Item = S2>
{
let mut i: node::Type = b.into();
// transactions require a base node with SyncImmediately durability
let d = BaseDurabilityLevel::SyncImmediately;
let mut i: node::Type = b.with_durability(d).into();
i.on_connected(&self.mainline.ingredients);

// add to the graph
Expand Down Expand Up @@ -1109,18 +1111,13 @@ impl<'a> Migration<'a> {

impl Drop for Blender {
fn drop(&mut self) {
//println!("Blender started dropping.");

for (_, tx) in &mut self.txs {
// don't unwrap, because given domain may already have terminated
drop(tx.send(payload::Packet::Quit));
}
for d in self.domains.drain(..) {
//println!("Waiting for domain thread to join.");
d.join().unwrap();
}

//println!("Blender is done dropping.")
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ extern crate vec_map;
#[cfg(feature="b_netsoup")]
extern crate tokio_core;

extern crate buf_redux;
extern crate serde_json;
extern crate snowflake;
extern crate time;

mod backlog;
mod checktable;
mod flow;
Expand All @@ -371,7 +376,7 @@ pub use flow::{Blender, Migration, Mutator};
pub use flow::core::{NodeAddress, DataType, Datas};
pub use flow::node::StreamUpdate;
pub use flow::domain::Index;
pub use ops::base::Base;
pub use ops::base::{Base, BaseDurabilityLevel};
pub use ops::grouped::aggregate::{Aggregator, Aggregation};
pub use ops::grouped::concat::{GroupConcat, TextComponent};
pub use ops::grouped::extremum::{Extremum, ExtremumOperator};
Expand Down
2 changes: 1 addition & 1 deletion src/mir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ fn make_base_node(name: &str,
.collect();
mig.add_ingredient(name,
column_names.as_slice(),
ops::base::Base::with_key(pkey_column_ids, vec![]))
ops::base::Base::new(vec![]).with_key(pkey_column_ids))
} else {
mig.add_ingredient(name, column_names.as_slice(), ops::base::Base::new(vec![]))
};
Expand Down
Loading