Persistent log #10

Merged
merged 83 commits into from
Apr 16, 2017
Changes from 56 commits
f5aebf0
Start saving every incoming record to a log in Base types.
jmftrindade Feb 28, 2017
9dfeef4
Address comments on persistent-log code.
jmftrindade Mar 1, 2017
899255b
Address a few more comments.
jmftrindade Mar 1, 2017
8e145d9
Nits.
jmftrindade Mar 1, 2017
63df9d5
Configurable durability levels for base nodes
ms705 Mar 5, 2017
90e34cd
Set base durability levels correctly in tests
ms705 Mar 5, 2017
7dc6059
Use buf_redux BufWriter with 4k buffer and WhenFull strategy.
jmftrindade Mar 6, 2017
989b3ed
cherrypick commit c04e67f55b657b171da42287e005584e65f7d268
jonhoo Mar 8, 2017
7f53514
date on durable log filename, and stub out where to save global addre…
jmftrindade Mar 13, 2017
7d54e7a
delete log file on Base Drop
jmftrindade Mar 13, 2017
bcd716c
add tiny unit tests to Base to help with investigating why Base is no…
jmftrindade Mar 13, 2017
595c108
Start saving every incoming record to a log in Base types.
jmftrindade Feb 28, 2017
20ebe28
Address comments on persistent-log code.
jmftrindade Mar 1, 2017
1dac639
Address a few more comments.
jmftrindade Mar 1, 2017
0b9ca62
Nits.
jmftrindade Mar 1, 2017
ef4ad8e
Configurable durability levels for base nodes
ms705 Mar 5, 2017
ed5e44f
Set base durability levels correctly in tests
ms705 Mar 5, 2017
f7ccf34
Use buf_redux BufWriter with 4k buffer and WhenFull strategy.
jmftrindade Mar 6, 2017
0e86c5f
cherrypick commit c04e67f55b657b171da42287e005584e65f7d268
jonhoo Mar 8, 2017
f329d2c
date on durable log filename, and stub out where to save global addre…
jmftrindade Mar 13, 2017
495f26c
delete log file on Base Drop
jmftrindade Mar 13, 2017
7fff86c
add tiny unit tests to Base to help with investigating why Base is no…
jmftrindade Mar 13, 2017
015fb1a
rebase stuff
jmftrindade Mar 15, 2017
71aace5
timekeeper on Mac OS X
jmftrindade Mar 15, 2017
d206961
rebase
jmftrindade Mar 15, 2017
63d8264
rebase master
jmftrindade Mar 15, 2017
0af6305
custom Drop impl for Domain
jmftrindade Mar 17, 2017
e341076
merge master into branch
jmftrindade Mar 17, 2017
6c96dd0
manual merge conflict resolution: remove incorrectly added dupe impl …
jmftrindade Mar 17, 2017
40fe135
impl `Drop` does not work with `[cfg(test)]`
ms705 Mar 19, 2017
08f18c4
Change on_input api to optionally return Records.
jmftrindade Mar 20, 2017
dab2110
Add force-write for buffered durability (WIP).
jmftrindade Mar 21, 2017
fc3067d
Remove unnecessary code dupe. Still have potentially unnecessary clo…
jmftrindade Mar 22, 2017
dbeed9e
Use BaseDurabilityLevel::None in integration tests.
jmftrindade Mar 22, 2017
aeb9b25
Add tests for sync immediately and buffered durability levels.
jmftrindade Mar 22, 2017
1f97159
Add tests for sync_immediately and buffered durability levels.
jmftrindade Mar 22, 2017
98b4c16
Correctly deal with column aliases
ms705 Mar 21, 2017
113ef03
Merge master.
jmftrindade Mar 22, 2017
c9472d4
Merge branch 'master' into persistent-log to pickup nom_sql fixes.
jmftrindade Mar 22, 2017
348fb38
Return Records instead of Option<Records> from MockGraph::one in ops …
jmftrindade Mar 22, 2017
c928130
(nit) unused var on buffered durability test.
jmftrindade Mar 22, 2017
5b0cead
Merge branch 'master' into persistent-log
jmftrindade Mar 22, 2017
7c5b204
Add base log cleanup on drop.
jmftrindade Mar 24, 2017
6e6e2c6
Merge branch master into persistent-log
jmftrindade Mar 24, 2017
4c69bc6
Add interval-based flush to Base durable log.
jmftrindade Mar 25, 2017
1726fe6
Increase `SETTLE_TIME` for durable log tests
ms705 Mar 25, 2017
d379789
Add flush method to Base nodes.
jmftrindade Apr 3, 2017
556ea74
Merge branch 'persistent-log' of github.com:mit-pdos/distributary int…
jmftrindade Apr 3, 2017
f73884a
Merge branch 'master' into persistent-log
jmftrindade Apr 3, 2017
71c99fd
Manually merged files from merge branch master into persistent-log.
jmftrindade Apr 3, 2017
53810ef
Use None as default base durability level.
jmftrindade Apr 4, 2017
e6f97e0
Merge branch 'master' into persistent-log
jmftrindade Apr 4, 2017
f27031b
Merge branch 'master' into persistent-log
jmftrindade Apr 6, 2017
0e5e49d
Merge branch 'master' into persistent-log
jmftrindade Apr 11, 2017
fdef1ef
Cleanup before merge: remove unused set_global_address method, and ca…
jmftrindade Apr 11, 2017
a8bce37
Remove unnecessary files from previous merges.
jmftrindade Apr 11, 2017
53661ea
Use optional durability, and provide a new_durable ctor on Base.
jmftrindade Apr 11, 2017
7131374
Revert formatting and otherwise silly changes.
jmftrindade Apr 11, 2017
5c74fdf
Revert another change, now that we have Base::new_durable.
jmftrindade Apr 11, 2017
ae90dc4
Revert formatting change from src/ops/mod.rs
jmftrindade Apr 11, 2017
0a0a48d
Address more comments.
jmftrindade Apr 12, 2017
fcb06ae
Address a few more comments.
jmftrindade Apr 12, 2017
9391c56
Address more comments: remove excessive cloning, and return None inst…
jmftrindade Apr 12, 2017
40655f2
Return Records instead of Option<Records> from Base::on_input.
jmftrindade Apr 12, 2017
191568e
Remove cloning from Base::flush.
jmftrindade Apr 13, 2017
c7c2fd3
Revert timekeeper change from Cargo.toml
jmftrindade Apr 13, 2017
ab6db4c
Avoid path unwrap on delete_durable_log.
jmftrindade Apr 13, 2017
5914147
Remove unnecessary as_path() calls.
jmftrindade Apr 13, 2017
8d08399
Merge branch 'master' into persistent-log
jonhoo Apr 13, 2017
94c5c20
Avoid excessive Cargo.lock changes
jonhoo Apr 13, 2017
6c36373
target should not be a symlink to itself
fintelia Apr 13, 2017
8f06306
Merge with master; use builders for with_key and with_durability.
jmftrindade Apr 13, 2017
574ac31
Merge branch 'master' into persistent-log
Apr 13, 2017
3e7fd41
Merge lock.
jmftrindade Apr 14, 2017
fcd59a6
Updates to bank benchmark to try and measure Buffered durability late…
jmftrindade Apr 14, 2017
9e70207
Buffered -> SyncImmediately on bank benchmark.
jmftrindade Apr 14, 2017
aa12b55
Merge branch 'master' into persistent-log
jmftrindade Apr 14, 2017
eda1d3c
Transactional Base nodes use SyncImmediately durability.
jmftrindade Apr 14, 2017
18ec1aa
(nit) tab -> space
jmftrindade Apr 14, 2017
36c449a
Merge branch 'master' into persistent-log
jmftrindade Apr 16, 2017
9a65e04
Update piazza benchmark.
jmftrindade Apr 16, 2017
921302e
Remove deprecated transactional_maintain from tests (why did the merg…
jmftrindade Apr 16, 2017
3f41330
Use 1000 accounts and single client-thread on bank benchmark, so we g…
jmftrindade Apr 16, 2017
88 changes: 69 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
@@ -9,9 +9,9 @@ b_memcached = ["memcached-rs"]
b_postgresql = ["postgres"]
b_mysql = ["mysql"]
b_mssql = ["futures", "futures-state-stream", "tiberius", "tokio-core"]
- b_netsoup = ["futures", "tokio-core", "tarpc", "tarpc-plugins", "serde", "serde_derive"]
+ b_netsoup = ["futures", "tokio-core", "tarpc", "tarpc-plugins"]
b_hybrid = ["mysql", "memcached-rs"]
- default = ["web", "b_netsoup", "b_memcached", "b_mysql", "b_mssql", "b_hybrid"]
+ default = ["web", "b_netsoup", "b_memcached", "b_mysql", "b_mssql", "b_hybrid", "serde", "serde_derive"]
Contributor: At this point you can probably just make serde and serde_derive be non-optional dependencies, and then not list them under default.

Member Author: Done

profiling = ["timekeeper/default"]
binaries = ["default"]

@@ -59,6 +59,12 @@ serde_derive = { version = "0.9", optional = true }
# for web
rustc-serialize = { version = "0.3", optional = true }

+ # durable log
+ snowflake = "1.2.0"
+ serde_json = "0.9.8"
+ buf_redux = "0.6.1"
+ time = "0.1"
+
[dependencies.rustful]
version = "0.9"
default-features = false
4 changes: 2 additions & 2 deletions benchmarks/piazza/piazza.rs
@@ -10,7 +10,7 @@ use std::fs::{OpenOptions, File};
use std::io::Write;
use slog::DrainExt;

- use distributary::{DataType, Join, JoinType, Blender, Base, NodeAddress, Filter, Mutator, Index};
+ use distributary::{DataType, Join, JoinType, Blender, Base, BaseDurabilityLevel, NodeAddress, Filter, Mutator, Index};

pub struct Piazza {
pub soup: Blender,
@@ -53,7 +53,7 @@ impl Piazza {
// add a post base table
post = mig.add_ingredient("post",
&["pid", "cid", "author", "content"],
- Base::new(vec![1]));
+ Base::new(vec![1], BaseDurabilityLevel::None));

// add a class base table
class = mig.add_ingredient("class", &["cid", "classname"], Base::default());
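
The hunk above only shows `BaseDurabilityLevel::None` being passed to `Base::new`; the commit messages also name `Buffered` and `SyncImmediately`, and a buffered writer with a 4k buffer. As a rough illustration of how the three levels might differ on the write path, here is a standalone sketch. It is not the PR's code: `DurableLog`, `open`, `append`, and `flush` are hypothetical names, the standard library's `BufWriter` stands in for `buf_redux`, and plain text lines stand in for `serde_json` output.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Durability levels as named in this PR's commit messages.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BaseDurabilityLevel {
    /// Keep nothing on disk.
    None,
    /// Buffer writes in memory; reach the disk when the buffer fills or on flush.
    Buffered,
    /// Flush and sync every record before returning.
    SyncImmediately,
}

/// Hypothetical stand-in for the log owned by a `Base` node (assumption).
pub struct DurableLog {
    level: BaseDurabilityLevel,
    writer: Option<BufWriter<File>>,
}

impl DurableLog {
    pub fn open(path: &Path, level: BaseDurabilityLevel) -> io::Result<DurableLog> {
        let writer = match level {
            // No durability: never create a file at all.
            BaseDurabilityLevel::None => None,
            _ => {
                let file = OpenOptions::new().create(true).append(true).open(path)?;
                // 4 KiB buffer, mirroring the "4k buffer" commit message.
                Some(BufWriter::with_capacity(4096, file))
            }
        };
        Ok(DurableLog { level, writer })
    }

    /// Append one record, written here as a single text line.
    pub fn append(&mut self, line: &str) -> io::Result<()> {
        if let Some(w) = self.writer.as_mut() {
            w.write_all(line.as_bytes())?;
            w.write_all(b"\n")?;
            if self.level == BaseDurabilityLevel::SyncImmediately {
                w.flush()?;
                w.get_ref().sync_data()?;
            }
        }
        Ok(())
    }

    /// Force any buffered records out to disk.
    pub fn flush(&mut self) -> io::Result<()> {
        if let Some(w) = self.writer.as_mut() {
            w.flush()?;
            w.get_ref().sync_data()?;
        }
        Ok(())
    }
}
```

With `Buffered`, a small record stays in memory until `flush` is called or the buffer fills, which is the trade-off the later benchmark commits appear to be measuring.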
7 changes: 3 additions & 4 deletions src/flow/core/data.rs
@@ -11,8 +11,7 @@ use std::fmt;
///
/// Having this be an enum allows for our code to be agnostic about the types of user data except
/// when type information is specifically necessary.
- #[derive(Eq, PartialOrd, Ord, Hash, Debug, Clone)]
- #[cfg_attr(feature="b_netsoup", derive(Serialize, Deserialize))]
+ #[derive(Eq, PartialOrd, Ord, Hash, Debug, Clone, Serialize, Deserialize)]
pub enum DataType {
/// An empty value.
None,
@@ -184,7 +183,7 @@ impl fmt::Display for DataType {
}

/// A record is a single positive or negative data record with an associated time stamp.
- #[derive(Clone, PartialEq, Eq, Debug)]
+ #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub enum Record {
Positive(sync::Arc<Vec<DataType>>),
Negative(sync::Arc<Vec<DataType>>),
@@ -293,7 +292,7 @@ impl IntoIterator for Records {
/// Represents a set of records returned from a query.
pub type Datas = Vec<Vec<DataType>>;

- #[derive(Clone, Default, PartialEq, Debug)]
+ #[derive(Clone, Default, PartialEq, Debug, Serialize, Deserialize)]
pub struct Records(Vec<Record>);

impl Deref for Records {
2 changes: 1 addition & 1 deletion src/flow/core/processing.rs
@@ -84,7 +84,7 @@ pub trait Ingredient
data: prelude::Records,
domain: &prelude::DomainNodes,
states: &prelude::StateMap)
- -> prelude::Records;
+ -> Option<prelude::Records>;
Contributor: Agh, that's going to be annoying to incorporate into partial-materialization, but I guess it's necessary here.


fn can_query_through(&self) -> bool {
false
7 changes: 0 additions & 7 deletions src/flow/domain/mod.rs
@@ -871,10 +871,3 @@ impl Domain {
.unwrap()
}
}
-
- #[cfg(test)]
- impl Drop for Domain {
- fn drop(&mut self) {
- //println!("Dropping Domain!")
- }
- }
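
The hunk above removes a test-only, no-op `Drop` for `Domain`. Per the commit list, the cleanup that matters lives on `Base` instead ("delete log file on Base Drop", "Avoid path unwrap on delete_durable_log"). The following is only a guess at the general shape of such a cleanup: the `durable_log_path` field and the `new_durable` signature are assumptions for illustration, not the PR's actual definitions.

```rust
use std::fs::{self, File};
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for a `Base` node that may own a durable log file.
pub struct Base {
    // `None` means the node was created without durability (assumption).
    durable_log_path: Option<PathBuf>,
}

impl Base {
    /// Constructor name taken from the commit messages; the signature is assumed.
    pub fn new_durable(path: &Path) -> io::Result<Base> {
        File::create(path)?;
        Ok(Base { durable_log_path: Some(path.to_path_buf()) })
    }
}

impl Drop for Base {
    fn drop(&mut self) {
        // No unwrap: a node without a log simply has nothing to delete.
        if let Some(path) = self.durable_log_path.take() {
            // Ignore the result; the file may already be gone.
            let _ = fs::remove_file(path);
        }
    }
}
```

Matching on the `Option` rather than unwrapping keeps `drop` from panicking for non-durable nodes.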
2 changes: 1 addition & 1 deletion src/flow/domain/single.rs
@@ -161,7 +161,7 @@ impl NodeDescriptor {
}
flow::node::Type::Internal(ref mut i) => {
let from = m.link().src;
- m.map_data(|data| i.on_input(from, data, nodes, state));
+ m.map_data(|data| i.on_input(from, data, nodes, state).unwrap());
Contributor: Why is this unwrap() okay? Can't a Base be processed?

Member Author: Base::on_input is returning Option now, hence the unwrap(). Should we have a match here instead?

Contributor: Well, unwrap indicates that you believe on_input can never return None in this context. Why is that?

Member Author: Because ::on_input never actually returned None, only Some(empty_records_vector) when buffered writes weren't propagated downstream. Updated.

Contributor: Hmm, if it never returns None, why does it need to return an Option?

Member Author: Resolved

materialize(m.data(), state.get_mut(&addr));
m
}
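
The thread above ends with `on_input` going back to returning `Records` directly (see the later commit "Return Records instead of Option<Records> from Base::on_input"). A minimal sketch of that idea follows, assuming a much simplified `Records` and a hypothetical `BufferedBase` that holds writes back until a capacity is reached. Neither type matches distributary's real definitions.

```rust
/// Simplified, hypothetical stand-in for distributary's `Records`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Records(pub Vec<String>);

impl Records {
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

/// Hypothetical base node that buffers input before releasing it downstream.
pub struct BufferedBase {
    pending: Vec<String>,
    capacity: usize,
}

impl BufferedBase {
    pub fn new(capacity: usize) -> BufferedBase {
        BufferedBase { pending: Vec::new(), capacity }
    }

    /// Returns the records to forward downstream. While input is still being
    /// buffered this is an empty `Records`, so callers need neither `Option`
    /// nor `unwrap()`.
    pub fn on_input(&mut self, data: Records) -> Records {
        self.pending.extend(data.0);
        if self.pending.len() >= self.capacity {
            // Hand over everything buffered so far and start a fresh buffer.
            Records(std::mem::replace(&mut self.pending, Vec::new()))
        } else {
            Records::default()
        }
    }
}
```

An empty `Records` already expresses "nothing to propagate yet", which is why the extra `Option` layer carried no information.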
7 changes: 1 addition & 6 deletions src/flow/mod.rs
@@ -463,7 +463,7 @@ impl<'a> Migration<'a> {
self.mainline.ingredients.add_edge(*parent.as_global(), ni, false);
}
}
- // and tell the caller its id
+
Contributor: Why?

Member Author: Reverted

ni.into()
}

@@ -889,18 +889,13 @@ impl<'a> Migration<'a> {

impl Drop for Blender {
fn drop(&mut self) {
- //println!("Blender started dropping.");
-
for (_, tx) in &mut self.txs {
// don't unwrap, because given domain may already have terminated
drop(tx.send(payload::Packet::Quit));
}
for d in self.domains.drain(..) {
- //println!("Waiting for domain thread to join.");
d.join().unwrap();
}
-
- //println!("Blender is done dropping.")
}
}
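
The shutdown sequence in `Drop for Blender` (send `Quit` to every domain, ignore send failures, then join each thread) can be tried in isolation. The sketch below is a self-contained approximation using only the standard library: `Packet` here has just two variants, `txs` is a `Vec` rather than the map the real code iterates, and `with_domains` and the `finished` counter are hypothetical additions to make the behaviour observable.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical, reduced stand-in for `payload::Packet`.
pub enum Packet {
    Work(u64),
    Quit,
}

pub struct Blender {
    txs: Vec<Sender<Packet>>,
    domains: Vec<JoinHandle<()>>,
}

impl Blender {
    /// Spawn `n` domain threads; each bumps `finished` just before it exits.
    pub fn with_domains(n: usize, finished: Arc<AtomicUsize>) -> Blender {
        let mut txs = Vec::new();
        let mut domains = Vec::new();
        for _ in 0..n {
            let (tx, rx) = channel::<Packet>();
            let finished = finished.clone();
            domains.push(thread::spawn(move || {
                for packet in rx {
                    if let Packet::Quit = packet {
                        break;
                    }
                }
                finished.fetch_add(1, Ordering::SeqCst);
            }));
            txs.push(tx);
        }
        Blender { txs, domains }
    }
}

impl Drop for Blender {
    fn drop(&mut self) {
        for tx in &self.txs {
            // Don't unwrap: a send error only means that domain already exited.
            drop(tx.send(Packet::Quit));
        }
        for d in self.domains.drain(..) {
            d.join().unwrap();
        }
    }
}
```

Because `drop` joins every thread, all domains have finished by the time the `Blender` value is gone, even if one of them quit earlier on its own.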

3 changes: 3 additions & 0 deletions src/flow/payload.rs
@@ -74,6 +74,9 @@ pub enum Packet {
parents: Vec<LocalNodeIndex>,
},

+ /// Instruct base nodes to flush any buffered writes.
Contributor: Can we cut this entirely for now?

Member Author: Done.

+ //FlushBufferedWrites,
+
/// Set up a fresh, empty state for a node, indexed by a particular column.
///
/// This is done in preparation of a subsequent state replay.
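
With the `FlushBufferedWrites` packet cut, the commit list points to two other ways buffered writes get out: "Add interval-based flush to Base durable log" and "Add flush method to Base nodes". The sketch below shows one way an interval check and an explicit flush could sit together. All names are hypothetical, and an in-memory `flushed` vector stands in for the log file, so this says nothing about how the PR actually implements it.

```rust
use std::time::{Duration, Instant};

/// Hypothetical buffer that flushes when enough time has passed since the last flush.
pub struct IntervalBuffer {
    pending: Vec<String>,
    /// Stand-in for records that have reached the durable log.
    flushed: Vec<String>,
    interval: Duration,
    last_flush: Instant,
}

impl IntervalBuffer {
    pub fn new(interval: Duration) -> IntervalBuffer {
        IntervalBuffer {
            pending: Vec::new(),
            flushed: Vec::new(),
            interval,
            last_flush: Instant::now(),
        }
    }

    /// Buffer one record; returns true if this write triggered a flush.
    pub fn write(&mut self, record: &str) -> bool {
        self.pending.push(record.to_string());
        if self.last_flush.elapsed() >= self.interval {
            self.flush();
            true
        } else {
            false
        }
    }

    /// Explicit flush, usable regardless of how much time has elapsed.
    pub fn flush(&mut self) {
        self.flushed.append(&mut self.pending);
        self.last_flush = Instant::now();
    }

    pub fn flushed(&self) -> &[String] {
        &self.flushed
    }
}
```

Note that in this sketch the interval is only checked on writes, so a quiet node would keep its last records buffered until someone calls `flush` explicitly.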
10 changes: 8 additions & 2 deletions src/lib.rs
@@ -175,7 +175,8 @@
//! # let mut g = Blender::new();
//! # let article = {
//! # let mut mig = g.start_migration();
- //! # let article = mig.add_ingredient("article", &["id", "title"], Base::default());
+ //! # let article = mig.add_ingredient("article", &["id", "title"],
+ //! # Base::default());
Contributor: Why?

Member Author: Reverted

//! # mig.commit();
//! # article
//! # };
@@ -356,6 +357,11 @@ extern crate futures;
#[cfg(feature="b_netsoup")]
extern crate tokio_core;

+ extern crate buf_redux;
+ extern crate serde_json;
+ extern crate snowflake;
+ extern crate time;
+
mod backlog;
mod checktable;
mod flow;
@@ -369,7 +375,7 @@ pub use flow::{Blender, Migration, Mutator};
pub use flow::core::{NodeAddress, DataType, Datas};
pub use flow::node::StreamUpdate;
pub use flow::domain::Index;
- pub use ops::base::Base;
+ pub use ops::base::{Base, BaseDurabilityLevel};
pub use ops::grouped::aggregate::{Aggregator, Aggregation};
pub use ops::grouped::concat::{GroupConcat, TextComponent};
pub use ops::grouped::extremum::{Extremum, ExtremumOperator};
3 changes: 2 additions & 1 deletion src/mir/mod.rs
@@ -772,7 +772,8 @@ fn make_base_node(name: &str,
.collect();
mig.add_ingredient(name,
column_names.as_slice(),
- ops::base::Base::new(pkey_column_ids))
+ ops::base::Base::new(pkey_column_ids,
+ ops::base::BaseDurabilityLevel::None))
} else {
mig.add_ingredient(name, column_names.as_slice(), ops::base::Base::default())
};