Skip to content

Commit

Permalink
core: initial faulty handling
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Sep 22, 2024
1 parent 32f2f38 commit 98398ea
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 50 deletions.
10 changes: 10 additions & 0 deletions src/scratch.zig
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,13 @@ test "view" {
// const parsed_body = try response_body.toOwnedSlice();
// dbg("RESPONSE: {s}\n", .{parsed_body});
// }

// Scratch test: shows what happens to a `defer` registered inside a nested
// block when the function returns from within that block.
test "returnblock" {
{
dbg("block entry\n", .{});
// Deferred call: runs when this inner block's scope is exited, which
// includes leaving it through the `return` below.
defer dbg("block exit\n", .{});
// Returns from the whole test, not just from the inner block.
// NOTE(review): presumably wrapped in `if (true)` so the trailing statement
// is not rejected as unreachable code — confirm.
if (true) return;
}

// Not executed: the `return` above always fires first.
dbg("should not be here\n", .{});
}
125 changes: 75 additions & 50 deletions src/zgroup.zig
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ pub fn Fleet(UserData: type) type {
.confirm_alive,
=> try self.handleIsd(arena, msg, false),
.suspect => try self.handleSuspicion(arena, msg),
.confirm_faulty => {
log.debug(">>>>> todo: confirm faulty", .{});
},
.confirm_faulty => try self.handleConfirmFaulty(arena, msg),
else => {},
}

Expand Down Expand Up @@ -678,17 +676,17 @@ pub fn Fleet(UserData: type) type {
log.debug("[{d}] ack from {s}", .{ i, ping_key });

// TEST: start
if (i > 0 and i <= 100 and @mod(i, 20) == 0) {
log.debug("[{d}] --- trigger suspect for {s}", .{ i, ping_key });
self.isd_mtx.lock();
defer self.isd_mtx.unlock();
try self.isd_queue.append(.{
.key = ping_key,
.state = .suspected,
.incarnation = 0,
.isd_cmd = .suspect,
});
}
// if (i > 0 and i <= 100 and @mod(i, 20) == 0) {
// log.debug("[{d}] --- trigger suspect for {s}", .{ i, ping_key });
// self.isd_mtx.lock();
// defer self.isd_mtx.unlock();
// try self.isd_queue.append(.{
// .key = ping_key,
// .state = .suspected,
// .incarnation = 0,
// .isd_cmd = .suspect,
// });
// }
// TEST: end
},
}
Expand Down Expand Up @@ -934,7 +932,7 @@ pub fn Fleet(UserData: type) type {
.confirm_alive,
=> try self.handleIsd(arena, msg, false),
.suspect => try self.handleSuspicion(arena, msg),
.confirm_faulty => {},
.confirm_faulty => try self.handleConfirmFaulty(arena, msg),
else => {},
}

Expand Down Expand Up @@ -1006,7 +1004,7 @@ pub fn Fleet(UserData: type) type {
.confirm_alive,
=> try args.self.handleIsd(arena, msg, false),
.suspect => try args.self.handleSuspicion(arena, msg),
.confirm_faulty => {},
.confirm_faulty => try args.self.handleConfirmFaulty(arena, msg),
else => {},
}

Expand Down Expand Up @@ -1063,41 +1061,68 @@ pub fn Fleet(UserData: type) type {
.key = pkey.?,
.state = .alive,
.isd_cmd = .confirm_alive,
.incarnation = try self.getIncarnation(),
.incarnation = try self.getIncarnation(), // ok since atomic
});
} else b: {
var tmp = std.ArrayList(KeyInfo).init(allocator);

{
self.members_mtx.lock();
defer self.members_mtx.unlock();
const ptr = self.members.getPtr(key);
if (ptr) |_| {} else break :b;
return;
}

try tmp.append(.{
.key = key,
.state = .suspected,
.isd_cmd = .confirm_alive,
.incarnation = ptr.?.incarnation,
});
}
var suspected = std.ArrayList(KeyInfo).init(allocator);

if (tmp.items.len == 0) break :b;
{
self.members_mtx.lock();
defer self.members_mtx.unlock();
const ptr = self.members.getPtr(key);
if (ptr) |_| {} else return;

const pkey = self.getPersistentKeyFromKey(key);
if (pkey) |_| {} else break :b;
try suspected.append(.{
.key = key,
.state = .suspected,
.isd_cmd = .confirm_alive,
.incarnation = ptr.?.incarnation,
});
}

if (suspected.items.len == 0) return;

const pkey = self.getPersistentKeyFromKey(key);
if (pkey) |_| {} else return;

{
self.isd_mtx.lock();
defer self.isd_mtx.unlock();
try self.isd_queue.append(.{
.key = pkey.?,
.state = tmp.items[0].state,
.isd_cmd = tmp.items[0].isd_cmd,
.incarnation = tmp.items[0].incarnation,
.state = suspected.items[0].state,
.isd_cmd = suspected.items[0].isd_cmd,
.incarnation = suspected.items[0].incarnation,
});
}
}

// Process the isd_* "confirm faulty" part of an incoming message payload.
// The caller passes an arena allocator; the key built here is not freed
// explicitly by this function.
//
// - If the faulty report is about another member, mark that member as
//   faulty through setMemberInfo.
// - If the report is about ourselves, queue a confirm_alive entry (with our
//   current incarnation) on the isd queue. Nothing is queued when our
//   persistent key cannot be resolved.
fn handleConfirmFaulty(self: *Self, allocator: std.mem.Allocator, msg: *Message) !void {
    log.debug(">>>>> listen: confirm faulty", .{});

    const subject = try keyFromIpPort(allocator, msg.isd_ip, msg.isd_port);

    if (self.keyIsMe(subject)) {
        // Someone claims we are faulty: answer with an "alive" entry.
        const own_key = self.getPersistentKeyFromKey(subject) orelse return;

        self.isd_mtx.lock();
        defer self.isd_mtx.unlock();

        // Read while holding isd_mtx, as before; ok since atomic.
        const current_incarnation = try self.getIncarnation();
        try self.isd_queue.append(.{
            .key = own_key,
            .state = .alive,
            .isd_cmd = .confirm_alive,
            .incarnation = current_incarnation,
        });
        return;
    }

    // The report concerns another member: record it as faulty.
    try self.setMemberInfo(subject, .faulty, null, true);
}

// NOTE: Not using locks; only atomic.
fn getIncarnation(self: *Self) !u64 {
const me = try self.getOwnKey();
Expand Down Expand Up @@ -1300,7 +1325,7 @@ pub fn Fleet(UserData: type) type {

var apply = false;
var in_state: MemberState = .alive;
var in_inc: u64 = 0;
var in_inc: u64 = p.?.incarnation;
if (state) |s| in_state = s else return;
if (incarnation) |inc| in_inc = inc;

Expand Down Expand Up @@ -1336,17 +1361,17 @@ pub fn Fleet(UserData: type) type {
fn suspectToFaulty(args: *SuspectToFaulty) !void {
// Pause for a bit before we set to faulty.
std.time.sleep(args.self.suspected_time);

b: {
args.self.members_mtx.lock();
defer args.self.members_mtx.unlock();
const ptr = args.self.members.getPtr(args.key);
if (ptr) |_| {} else break :b;
if (ptr.?.state == .suspected) {
ptr.?.state = .faulty;
ptr.?.age_faulty.reset();
}
}
try args.self.setMemberInfo(args.key, .faulty, null, false);

// Broadcast confirm_faulty to the group.
args.self.isd_mtx.lock();
defer args.self.isd_mtx.unlock();
try args.self.isd_queue.append(.{
.key = args.key,
.state = .faulty,
.isd_cmd = .confirm_faulty,
.incarnation = try args.self.getIncarnation(), // ok since atomic
});
}

// Attempt removing faulty members after some time.
Expand All @@ -1358,7 +1383,7 @@ pub fn Fleet(UserData: type) type {
self.members_mtx.lock();
defer self.members_mtx.unlock();
var it = self.members.iterator();
const limit = std.time.ns_per_min * 10; // TODO: expose
const limit = self.protocol_time * 10; // TODO: expose
while (it.next()) |v| {
if (v.value_ptr.state != .faulty) continue;
if (v.value_ptr.age_faulty.read() > limit) {
Expand Down

0 comments on commit 98398ea

Please sign in to comment.