Join from an existing snapshot #1532

Merged
merged 69 commits into from
Aug 27, 2020
Commits
7d8a619
Champ correct size
Jul 16, 2020
6dd9e0b
Snapshot from raft
Jul 17, 2020
2b283c2
Generate snapshots and store to disk
Jul 17, 2020
a3705e2
Snapshot protocol WIP
Jul 20, 2020
af0a2e2
Merge remote-tracking branch 'upstream/master' into generate_snapshot
Aug 13, 2020
0b020c9
ledger max chunk -> ledger min chunk
Aug 14, 2020
2158750
Snapshots are written to disk
Aug 14, 2020
12e0158
Snapshotter returns snapshot version to Raft
Aug 14, 2020
09428ee
Fix unit tests
Aug 14, 2020
0b965de
Merge remote-tracking branch 'upstream/master' into generate_snapshot
Aug 14, 2020
bcc8b31
Format
Aug 14, 2020
9a8aa43
black
Aug 14, 2020
f4683e0
SnaPshotter
Aug 18, 2020
432675c
Unsigned idx
Aug 18, 2020
e2d519c
Merge branch 'master' into generate_snapshot
jumaffre Aug 18, 2020
33b4cb1
snapshot_min_tx -> snapshot_max_tx
Aug 18, 2020
40d0712
Merge branch 'generate_snapshot' of github.com:jumaffre/CCF into gene…
Aug 18, 2020
8920fe2
Remove type of Tmsg when adding task
Aug 18, 2020
07e3b76
And the other half...
Aug 18, 2020
a6acae5
Merge branch 'generate_snapshot' into async_snapshot_generation
Aug 18, 2020
7e24cc2
Snapshot generation is async
Aug 18, 2020
143f3a8
Commit snapshot evidence
Aug 19, 2020
ec87487
Merge remote-tracking branch 'upstream/master' into async_snapshot_ge…
Aug 19, 2020
a07e6b0
Add snapshot idx to evidence table
Aug 19, 2020
e33694c
Split snapshot generation and serialisation
Aug 19, 2020
426b580
Format
Aug 19, 2020
e83b546
Merge branch 'master' into async_snapshot_generation
jumaffre Aug 19, 2020
e2332e3
Actually remove last_snapshot_idx from Raft
Aug 19, 2020
ea7dcef
snapsot evidence singular
Aug 19, 2020
bc9b328
Merge branch 'async_snapshot_generation' of github.com:jumaffre/CCF i…
Aug 19, 2020
b6ab777
Merge branch 'master' into async_snapshot_generation
jumaffre Aug 19, 2020
21908b2
Format
Aug 19, 2020
8d743a1
Merge branch 'async_snapshot_generation' of github.com:jumaffre/CCF i…
Aug 19, 2020
0ddfdfc
WIP
Aug 19, 2020
f731498
Support for local hooks from snapshot
Aug 19, 2020
b0bb32f
Add unit test for deletion
Aug 19, 2020
ff8666a
Merge remote-tracking branch 'upstream/master' into snapshot_hooks
Aug 19, 2020
b7175e1
Global hooks too
Aug 19, 2020
271a679
Merge branch 'master' into snapshot_hooks
jumaffre Aug 19, 2020
2a5b51c
Merge branch 'master' into snapshot_hooks
jumaffre Aug 20, 2020
7407333
Merge branch 'master' into snapshot_hooks
jumaffre Aug 20, 2020
e13693c
Merge branch 'master' into snapshot_hooks
jumaffre Aug 21, 2020
e12e5d4
WIP python infra
Aug 24, 2020
e4e9c3d
Merge remote-tracking branch 'upstream/master' into join_from_snapshot
Aug 24, 2020
43c60c8
e2e infra works
Aug 24, 2020
ef23253
Works with hack around term history
Aug 24, 2020
4d5a84c
Fix to have same ledgers on all nodes
Aug 24, 2020
ecaf307
Fix term history bug
Aug 25, 2020
d436dc6
WIP
Aug 25, 2020
e8da9d8
Set the commit idx when restoring snapshot
Aug 25, 2020
b0fba41
Cleanup snapshot application on join
Aug 25, 2020
a36c144
Add test suite for snapshots
Aug 25, 2020
359f418
Cleanup before PR
Aug 25, 2020
c8df8f3
Format
Aug 26, 2020
56ce322
Merge branch 'master' into join_from_snapshot
jumaffre Aug 26, 2020
5905657
VERBOSE LOGGING (to revert)
Aug 26, 2020
a9a2eeb
Merge branch 'join_from_snapshot' of github.com:jumaffre/CCF into joi…
Aug 26, 2020
87594cc
Fix dangling reference issue
Aug 26, 2020
0d55f1e
Fix raft unit test (view)
Aug 26, 2020
9e53fce
Merge branch 'master' into join_from_snapshot
jumaffre Aug 26, 2020
100e21b
Quiet
Aug 26, 2020
b709168
Merge branch 'join_from_snapshot' of github.com:jumaffre/CCF into joi…
Aug 26, 2020
4be1eed
Merge branch 'master' into join_from_snapshot
jumaffre Aug 26, 2020
7a528e4
Update src/host/main.cpp
jumaffre Aug 27, 2020
a723675
Update src/host/snapshot.h
jumaffre Aug 27, 2020
fc7690e
Merge remote-tracking branch 'upstream/master' into join_from_snapshot
Aug 27, 2020
7296be7
Oops
Aug 27, 2020
28ec74f
Format
Aug 27, 2020
4a6aabe
Fixup
Aug 27, 2020
20 changes: 19 additions & 1 deletion CMakeLists.txt
@@ -535,6 +535,23 @@ if(BUILD_TESTS)
4000
)

add_e2e_test(
NAME snapshots_test_suite
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
CONSENSUS raft
LABEL suite
ADDITIONAL_ARGS
--test-duration
150
--enforce-reqs
--test-suite
snapshots
--raft-election-timeout
4000
--snapshot-tx-interval
5
)

add_e2e_test(
NAME full_test_suite
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
@@ -571,13 +588,14 @@ if(BUILD_TESTS)
NAME reconfiguration_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py
CONSENSUS raft
ADDITIONAL_ARGS --raft-election-timeout 4000
Review comment (Contributor Author):
Lowering the raft election timeout here (default is 100s) as the reconfiguration test now also waits for a new election to complete.

)

add_e2e_test(
NAME reconfiguration_snapshot_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py
CONSENSUS raft
ADDITIONAL_ARGS --snapshot-tx-interval 10
ADDITIONAL_ARGS --snapshot-tx-interval 10 --raft-election-timeout 4000
)

add_e2e_test(
10 changes: 10 additions & 0 deletions src/consensus/ledger_enclave.h
@@ -114,5 +114,15 @@ namespace consensus
{
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_commit, to_host, idx);
}

/**
* Initialise ledger at a given index (e.g. after a snapshot)
*
* @param idx Index to start ledger from
*/
void init(Index idx)
{
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_init, to_host, idx);
}
};
}
2 changes: 2 additions & 0 deletions src/consensus/ledger_enclave_types.h
@@ -28,6 +28,7 @@ namespace consensus
DEFINE_RINGBUFFER_MSG_TYPE(ledger_append),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_truncate),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_commit),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_init),

/// Create a new snapshot. Enclave -> Host
DEFINE_RINGBUFFER_MSG_TYPE(ledger_snapshot),
@@ -45,6 +46,7 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_no_entry,
consensus::Index,
consensus::LedgerRequestPurpose);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_init, consensus::Index);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_append,
bool /* committable */,
23 changes: 22 additions & 1 deletion src/consensus/raft/raft.h
@@ -28,7 +28,7 @@ namespace raft
std::vector<Index> terms;

public:
static constexpr Term InvalidTerm = 0;
static constexpr Term InvalidTerm = ccf::VIEW_UNKNOWN;
jumaffre marked this conversation as resolved.

void initialise(const std::vector<Index>& terms_)
{
@@ -56,7 +56,9 @@ namespace raft
}

for (int64_t i = terms.size(); i < term; ++i)
{
terms.push_back(idx);
}
LOG_DEBUG_FMT("Resulting terms: {}", fmt::join(terms, ", "));
}

@@ -66,7 +68,9 @@ namespace raft

// Indices before the index of the first term are unknown
if (it == terms.begin())
{
return InvalidTerm;
}

return (it - terms.begin());
}
@@ -271,6 +275,23 @@ namespace raft
become_leader();
}

void init_as_follower(Index index, Term term)
{
// This should only be called when the node resumes from a snapshot and
// before it has received any append entries.
std::lock_guard<SpinLock> guard(lock);

last_idx = index;
commit_idx = index;

term_history.update(index, term);

ledger->init(index);
snapshotter->set_last_snapshot_idx(index);

become_follower(term);
}

Index get_last_idx()
{
return last_idx;
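
Note on `init_as_follower`: a minimal sketch of how the term history behaves for a node that resumes from a snapshot. It is an illustration only, not the code in `raft.h`. The lookup via `std::upper_bound` is an assumption (the line defining `it` falls outside the hunk), `TermHistorySketch` and `kInvalidTerm` are hypothetical names, and `kInvalidTerm = 0` is a placeholder because the value of `ccf::VIEW_UNKNOWN` is not shown in this diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

using Index = int64_t;
using Term = int64_t;

// Placeholder sentinel for this sketch only; the PR uses ccf::VIEW_UNKNOWN,
// whose actual value is not visible in the diff.
constexpr Term kInvalidTerm = 0;

class TermHistorySketch
{
  // terms[t - 1] holds the first index at which term t is known to apply.
  std::vector<Index> terms;

public:
  // Record that `term` starts at `idx`. Any skipped terms are filled with
  // the same index, mirroring the push_back loop shown in the hunk.
  void update(Index idx, Term term)
  {
    assert(term >= 1);
    for (int64_t i = static_cast<int64_t>(terms.size()); i < term; ++i)
    {
      terms.push_back(idx);
    }
  }

  // Return the term that applies at `idx`, or the sentinel when `idx`
  // precedes the first known term (assumed upper_bound lookup).
  Term term_at(Index idx) const
  {
    auto it = std::upper_bound(terms.begin(), terms.end(), idx);
    if (it == terms.begin())
    {
      return kInvalidTerm;
    }
    return static_cast<Term>(it - terms.begin());
  }
};
```

Starting from an empty history, `update(50, 2)` (as `init_as_follower(50, 2)` would trigger) leaves `terms = [50, 50]`, so `term_at(50)` is 2 while `term_at(49)` returns the sentinel. This is consistent with the comment that indices before the index of the first term are unknown.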
5 changes: 5 additions & 0 deletions src/consensus/raft/raft_consensus.h
@@ -54,6 +54,11 @@ namespace raft
raft->force_become_leader(seqno, view, terms, commit_seqno);
}

void init_as_backup(SeqNo seqno, View view) override
{
raft->init_as_follower(seqno, view);
}

bool replicate(const kv::BatchVector& entries, View view) override
{
return raft->replicate(entries, view);
12 changes: 6 additions & 6 deletions src/consensus/raft/test/main.cpp
@@ -160,7 +160,7 @@ DOCTEST_TEST_CASE(
auto rvc = get<1>(rv);
DOCTEST_REQUIRE(rvc.term == 1);
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == raft::TermHistory::InvalidTerm);

r1.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));

@@ -172,7 +172,7 @@ DOCTEST_TEST_CASE(
rvc = get<1>(rv);
DOCTEST_REQUIRE(rvc.term == 1);
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == raft::TermHistory::InvalidTerm);

r2.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));

@@ -216,7 +216,7 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(aec.idx == 0);
DOCTEST_REQUIRE(aec.term == 1);
DOCTEST_REQUIRE(aec.prev_idx == 0);
DOCTEST_REQUIRE(aec.prev_term == 0);
DOCTEST_REQUIRE(aec.prev_term == raft::TermHistory::InvalidTerm);
DOCTEST_REQUIRE(aec.leader_commit_idx == 0);

ae = r0.channels->sent_append_entries.front();
@@ -226,7 +226,7 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(aec.idx == 0);
DOCTEST_REQUIRE(aec.term == 1);
DOCTEST_REQUIRE(aec.prev_idx == 0);
DOCTEST_REQUIRE(aec.prev_term == 0);
DOCTEST_REQUIRE(aec.prev_term == raft::TermHistory::InvalidTerm);
DOCTEST_REQUIRE(aec.leader_commit_idx == 0);
}

@@ -374,7 +374,7 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(msg.idx == 1);
DOCTEST_REQUIRE(msg.term == 1);
DOCTEST_REQUIRE(msg.prev_idx == 0);
DOCTEST_REQUIRE(msg.prev_term == 0);
DOCTEST_REQUIRE(msg.prev_term == raft::TermHistory::InvalidTerm);
DOCTEST_REQUIRE(msg.leader_commit_idx == 0);
}));

@@ -472,7 +472,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
DOCTEST_REQUIRE(msg.idx == 1);
DOCTEST_REQUIRE(msg.term == 1);
DOCTEST_REQUIRE(msg.prev_idx == 0);
DOCTEST_REQUIRE(msg.prev_term == 0);
DOCTEST_REQUIRE(msg.prev_term == raft::TermHistory::InvalidTerm);
DOCTEST_REQUIRE(msg.leader_commit_idx == 0);
}));

2 changes: 1 addition & 1 deletion src/enclave/enclave.h
@@ -319,7 +319,7 @@ namespace enclave

if (start_type == StartType::Join)
{
node->join({ccf_config});
node->join(ccf_config);
}
else if (start_type == StartType::Recover)
{
4 changes: 3 additions & 1 deletion src/enclave/interface.h
@@ -62,7 +62,9 @@ struct CCFConfig
std::string target_port;
std::vector<uint8_t> network_cert;
size_t join_timer;
MSGPACK_DEFINE(target_host, target_port, network_cert, join_timer);
std::vector<uint8_t> snapshot;
MSGPACK_DEFINE(
target_host, target_port, network_cert, join_timer, snapshot);
};
Joining joining = {};

11 changes: 11 additions & 0 deletions src/host/ledger.h
@@ -647,6 +647,11 @@ namespace asynchost

Ledger(const Ledger& that) = delete;

void init_idx(size_t idx)
{
last_idx = idx;
}

std::optional<std::vector<uint8_t>> read_entry(size_t idx)
{
auto f = get_file_from_idx(idx);
@@ -796,6 +801,12 @@ namespace asynchost
void register_message_handlers(
messaging::Dispatcher<ringbuffer::Message>& disp)
{
DISPATCHER_SET_MESSAGE_HANDLER(
disp, consensus::ledger_init, [this](const uint8_t* data, size_t size) {
auto idx = serialized::read<consensus::Index>(data, size);
init_idx(idx);
});

DISPATCHER_SET_MESSAGE_HANDLER(
disp,
consensus::ledger_append,
30 changes: 25 additions & 5 deletions src/host/main.cpp
@@ -134,7 +134,11 @@ int main(int argc, char** argv)
"--rpc-address)");

std::string ledger_dir("ledger");
app.add_option("--ledger-dir", ledger_dir, "Ledger and snapshots directory")
app.add_option("--ledger-dir", ledger_dir, "Ledger directory")
->capture_default_str();

std::string snapshot_dir("snapshots");
app.add_option("--snapshot-dir", snapshot_dir, "Snapshots directory")
->capture_default_str();

size_t ledger_chunk_bytes = 5'000'000;
@@ -354,7 +358,7 @@ int main(int argc, char** argv)
"key)")
->required();

std::optional<size_t> recovery_threshold;
std::optional<size_t> recovery_threshold = std::nullopt;
start
->add_option(
"--recovery-threshold",
@@ -438,7 +442,8 @@ int main(int argc, char** argv)
if ((*start || *join) && files::exists(ledger_dir))
{
throw std::logic_error(fmt::format(
"On start/join, ledger directory should not exist ({})", ledger_dir));
"On start and join, ledger directory should not exist ({})",
ledger_dir));
}
else if (*recover && !files::exists(ledger_dir))
{
@@ -544,8 +549,8 @@ int main(int argc, char** argv)
asynchost::Ledger ledger(ledger_dir, writer_factory, ledger_chunk_bytes);
ledger.register_message_handlers(bp.get_dispatcher());

asynchost::SnapshotManager snapshot(ledger_dir);
snapshot.register_message_handlers(bp.get_dispatcher());
asynchost::SnapshotManager snapshots(snapshot_dir);
snapshots.register_message_handlers(bp.get_dispatcher());

// Begin listening for node-to-node and RPC messages.
// This includes DNS resolution and potentially dynamic port assignment (if
@@ -634,6 +639,21 @@ int main(int argc, char** argv)
ccf_config.joining.target_port = target_rpc_address.port;
ccf_config.joining.network_cert = files::slurp(network_cert_file);
ccf_config.joining.join_timer = join_timer;

auto snapshot_file = snapshots.find_latest_snapshot();
if (snapshot_file.has_value())
{
ccf_config.joining.snapshot = files::slurp(snapshot_file.value());
LOG_INFO_FMT(
"Found latest snapshot file: {} (size: {})",
snapshot_file.value(),
ccf_config.joining.snapshot.size());
}
else
{
LOG_INFO_FMT(
"No snapshot found, node will request transactions from the beginning");
}
}
else if (*recover)
{
35 changes: 33 additions & 2 deletions src/host/snapshot.h
@@ -7,6 +7,7 @@
#include <filesystem>
#include <fstream>
#include <iostream>
#include <optional>

namespace fs = std::filesystem;

@@ -49,7 +50,7 @@ namespace asynchost
if (fs::is_directory(snapshot_dir))
{
LOG_INFO_FMT(
"Snapshots will be stored in existing directory {}", snapshot_dir);
"Snapshots will be stored in existing directory: {}", snapshot_dir);
}
else if (!fs::create_directory(snapshot_dir))
{
@@ -58,6 +59,36 @@ namespace asynchost
}
}

std::optional<std::string> find_latest_snapshot()
{
std::optional<std::string> snapshot_file = std::nullopt;
size_t latest_idx = 0;

for (auto& f : fs::directory_iterator(snapshot_dir))
{
auto file_name = f.path().filename().string();
auto pos = file_name.find(fmt::format("{}.", snapshot_file_prefix));
if (pos == std::string::npos)
{
LOG_FAIL_FMT(
"Ignoring \"{}\" because it does not start with {}",
file_name,
snapshot_file_prefix);
continue;
}

pos = file_name.find(".");
size_t snapshot_idx = std::stol(file_name.substr(pos + 1));
if (snapshot_idx > latest_idx)
{
snapshot_file = f.path().string();
latest_idx = snapshot_idx;
}
}

return snapshot_file;
}

void register_message_handlers(
messaging::Dispatcher<ringbuffer::Message>& disp)
{
@@ -71,4 +102,4 @@ namespace asynchost
});
}
};
}
}
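
Note on `find_latest_snapshot`: a minimal sketch of the file-name handling, separated from the directory walk so it can be exercised on plain strings. It assumes snapshot files are named `<prefix>.<idx>`, which is what the parsing in the hunk above implies; the value of `snapshot_file_prefix` is not shown in this diff, so the prefix is a parameter here. `parse_snapshot_idx` and `latest_snapshot_name` are hypothetical helpers, not part of the PR. The sketch is also a stricter variation: it requires the name to begin with the prefix (the hunk uses `find`, which matches the prefix anywhere in the name) and rejects non-numeric suffixes instead of letting the conversion throw.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: extract the index from "<prefix>.<idx>", or return
// std::nullopt if the name does not have that exact shape.
std::optional<size_t> parse_snapshot_idx(
  const std::string& file_name, const std::string& prefix)
{
  assert(!prefix.empty());
  const std::string head = prefix + ".";

  // Require the name to start with "<prefix>." rather than merely contain it.
  if (file_name.compare(0, head.size(), head) != 0)
  {
    return std::nullopt;
  }

  // Everything after the prefix must be decimal digits.
  const std::string digits = file_name.substr(head.size());
  if (
    digits.empty() ||
    digits.find_first_not_of("0123456789") != std::string::npos)
  {
    return std::nullopt;
  }

  try
  {
    return static_cast<size_t>(std::stoull(digits));
  }
  catch (const std::out_of_range&)
  {
    return std::nullopt;
  }
}

// Hypothetical helper: pick the name with the highest index, skipping
// names that do not parse. Unlike the hunk, an index of 0 can be selected.
std::optional<std::string> latest_snapshot_name(
  const std::vector<std::string>& names, const std::string& prefix)
{
  std::optional<std::string> latest = std::nullopt;
  size_t latest_idx = 0;

  for (const auto& name : names)
  {
    const auto idx = parse_snapshot_idx(name, prefix);
    if (idx.has_value() && (!latest.has_value() || *idx > latest_idx))
    {
      latest = name;
      latest_idx = *idx;
    }
  }

  return latest;
}
```

In the host, the names would come from `fs::directory_iterator(snapshot_dir)` as in the hunk; keeping the parsing free of filesystem access is a design choice for this sketch only.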
5 changes: 5 additions & 0 deletions src/kv/kv_types.h
@@ -266,6 +266,11 @@ namespace kv
state = Primary;
}

virtual void init_as_backup(SeqNo, View)
{
state = Backup;
}

virtual bool replicate(const BatchVector& entries, View view) = 0;
virtual std::pair<View, SeqNo> get_committed_txid() = 0;

8 changes: 0 additions & 8 deletions src/node/call_types.h
@@ -19,12 +19,4 @@ namespace ccf
tls::Pem network_enc_pubk;
};
};

struct Join
{
struct In
{
CCFConfig config;
};
};
}