Skip to content

Commit

Permalink
Templated replicated container
Browse files Browse the repository at this point in the history
  • Loading branch information
amdrozdov committed Apr 22, 2024
1 parent 9f67e8b commit 5040aa8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
7 changes: 4 additions & 3 deletions examples/async_replication/async_replicated_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ struct ReplicationConfig final {
uint32_t max_waits;
};

template <class CLOCK_T>
class AsyncReplicatedContainer {
private:
struct SharedState final {
bool die = false;
std::map<std::string, uint32_t> data;
std::map<std::string, VectorClock> clock;
std::map<std::string, CLOCK_T> clock;
std::map<std::string, std::queue<std::pair<std::string, uint32_t> > > replication_out;
};

Expand Down Expand Up @@ -73,7 +74,7 @@ class AsyncReplicatedContainer {
// Set clock and send update to replication thread
if (replicate) {
if (state.clock.find(tuple.first) == state.clock.end()) {
state.clock[tuple.first] = VectorClock(nodes.size(), clock_id);
state.clock[tuple.first] = CLOCK_T(nodes.size(), clock_id);
}
state.clock[tuple.first].step();

Expand Down Expand Up @@ -155,7 +156,7 @@ class AsyncReplicatedContainer {
state.MutableUse([&buffer, this](SharedState& state) {
bool is_insert = state.data.find(buffer.key) == state.data.end();
if (is_insert) {
state.clock[buffer.key] = VectorClock(nodes.size(), clock_id);
state.clock[buffer.key] = CLOCK_T(nodes.size(), clock_id);
}
bool is_valid_update = state.clock[buffer.key].merge(buffer.clock, is_insert);
if (is_valid_update) {
Expand Down
3 changes: 2 additions & 1 deletion examples/async_replication/node.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "../../bricks/dflags/dflags.h"
#include "async_replicated_container.h"
#include "vector_clock.h"
#include <random>

DEFINE_string(host, "127.0.0.1", "Hostname for the server");
Expand Down Expand Up @@ -29,7 +30,7 @@ int main(int argc, char** argv) {
false,
10};

AsyncReplicatedContainer storage(conf);
AsyncReplicatedContainer<StrictVectorClock> storage(conf);
storage.start();
storage.start_monitor(keys, FLAGS_monitor_delay);
while (true) {
Expand Down
9 changes: 6 additions & 3 deletions examples/async_replication/vector_clock.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ class VectorClock {
uint32_t local_id;

public:
VectorClock(uint32_t size, uint32_t node_id) {
explicit VectorClock(uint32_t size, uint32_t node_id) {
// Set local process id and cluster size
local_id = node_id;
clock.resize(size, current::time::Now());
}
VectorClock() {
explicit VectorClock() {
// Lamport clocks for size=1
VectorClock(1, 0);
}
Expand Down Expand Up @@ -86,7 +86,10 @@ class VectorClock {
}
};

class StrictVectorClock : VectorClock {
class StrictVectorClock : public VectorClock {
using VectorClock::VectorClock;

public:
static bool is_conflicting(Clocks &v1, Clocks &v2) {
// Check if v1 is in sync with v2 and v1 is strictly early then v2
return !is_parallel(v1, v2) && is_early(v1, v2);
Expand Down

0 comments on commit 5040aa8

Please sign in to comment.