From 5040aa8a217ba7ea5f75248f3d83cc346d7f51af Mon Sep 17 00:00:00 2001 From: Andrei Drozdov Date: Mon, 22 Apr 2024 02:45:15 +0200 Subject: [PATCH] Templated replicated container --- examples/async_replication/async_replicated_container.h | 7 ++++--- examples/async_replication/node.cc | 3 ++- examples/async_replication/vector_clock.h | 9 ++++++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/examples/async_replication/async_replicated_container.h b/examples/async_replication/async_replicated_container.h index 580f2d82..03ff65e0 100644 --- a/examples/async_replication/async_replicated_container.h +++ b/examples/async_replication/async_replicated_container.h @@ -31,12 +31,13 @@ struct ReplicationConfig final { uint32_t max_waits; }; +template class AsyncReplicatedContainer { private: struct SharedState final { bool die = false; std::map data; - std::map clock; + std::map clock; std::map > > replication_out; }; @@ -73,7 +74,7 @@ class AsyncReplicatedContainer { // Set clock and send update to replication thread if (replicate) { if (state.clock.find(tuple.first) == state.clock.end()) { - state.clock[tuple.first] = VectorClock(nodes.size(), clock_id); + state.clock[tuple.first] = CLOCK_T(nodes.size(), clock_id); } state.clock[tuple.first].step(); @@ -155,7 +156,7 @@ class AsyncReplicatedContainer { state.MutableUse([&buffer, this](SharedState& state) { bool is_insert = state.data.find(buffer.key) == state.data.end(); if (is_insert) { - state.clock[buffer.key] = VectorClock(nodes.size(), clock_id); + state.clock[buffer.key] = CLOCK_T(nodes.size(), clock_id); } bool is_valid_update = state.clock[buffer.key].merge(buffer.clock, is_insert); if (is_valid_update) { diff --git a/examples/async_replication/node.cc b/examples/async_replication/node.cc index ff5e5588..b996d86e 100644 --- a/examples/async_replication/node.cc +++ b/examples/async_replication/node.cc @@ -1,5 +1,6 @@ #include "../../bricks/dflags/dflags.h" #include "async_replicated_container.h" +#include "vector_clock.h" #include DEFINE_string(host, "127.0.0.1", "Hostname for the server"); @@ -29,7 +30,7 @@ int main(int argc, char** argv) { false, 10}; - AsyncReplicatedContainer storage(conf); + AsyncReplicatedContainer storage(conf); storage.start(); storage.start_monitor(keys, FLAGS_monitor_delay); while (true) { diff --git a/examples/async_replication/vector_clock.h b/examples/async_replication/vector_clock.h index 4baf778e..29ca0284 100644 --- a/examples/async_replication/vector_clock.h +++ b/examples/async_replication/vector_clock.h @@ -10,12 +10,12 @@ class VectorClock { uint32_t local_id; public: - VectorClock(uint32_t size, uint32_t node_id) { + explicit VectorClock(uint32_t size, uint32_t node_id) { // Set local process id and cluster size local_id = node_id; clock.resize(size, current::time::Now()); } - VectorClock() { + explicit VectorClock() { // Lamport clocks for size=1 VectorClock(1, 0); } @@ -86,7 +86,10 @@ class VectorClock { } }; -class StrictVectorClock : VectorClock { +class StrictVectorClock : public VectorClock { + using VectorClock::VectorClock; + + public: static bool is_conflicting(Clocks &v1, Clocks &v2) { // Check if v1 is in sync with v2 and v1 is strictly early then v2 return !is_parallel(v1, v2) && is_early(v1, v2);