diff --git a/src/main.zig b/src/main.zig index 7515a2e..26f417a 100644 --- a/src/main.zig +++ b/src/main.zig @@ -53,8 +53,8 @@ pub fn main() !void { } var iter = hm.iterator(); - while (iter.next()) |entry| { - log.info("{any}, {s}", .{ entry.key_ptr.*, entry.value_ptr.* }); + while (iter.next()) |v| { + log.info("args[{d}]: {s}", .{ v.key_ptr.*, v.value_ptr.* }); } const name = hm.getEntry(1).?.value_ptr.*; diff --git a/src/zgroup.zig b/src/zgroup.zig index 2831069..2ee7b7a 100644 --- a/src/zgroup.zig +++ b/src/zgroup.zig @@ -44,11 +44,11 @@ pub fn Fleet(UserData: type) type { ping_req_1: std.Thread.ResetEvent = .{}, // response // Internal queue for suspicion subprotocol. - isd_queue: std.ArrayList(KeyInfo), - isd_mtx: std.Thread.Mutex = .{}, + // isd_queue: std.ArrayList(KeyInfo), + // isd_mtx: std.Thread.Mutex = .{}, - // Leader's heartbeat timeout. - leader_hb: std.time.Timer, + // Join address heartbeat timeout. + join_addr_tm: std.time.Timer, callbacks: Callbacks, @@ -118,6 +118,7 @@ pub fn Fleet(UserData: type) type { src_port: u16 = 0, src_state: Liveness = .alive, src_incarnation: u64 = 0, + dst_cmd: IsdCommand = .noop, dst_ip: u32 = 0, dst_port: u16 = 0, @@ -125,20 +126,24 @@ pub fn Fleet(UserData: type) type { dst_incarnation: u64 = 0, // Infection-style dissemination section. - isd_cmd: IsdCommand = .noop, - isd_ip: u32 = 0, - isd_port: u16 = 0, - isd_state: Liveness = .alive, - isd_incarnation: u64 = 0, + // isd_cmd: IsdCommand = .noop, + // isd_ip: u32 = 0, + // isd_port: u16 = 0, + // isd_state: Liveness = .alive, + // isd_incarnation: u64 = 0, // Used for multiple subprotocols explained below: // // 1) For determining the highest node (for join). - // // Format: - // |----- cmd -----|-- port (u16) --|----------- IP address (u32) ----------| - // 0000000000000011.1111111111111111.0000000011111111111111111111111111111111 - extra: u64 = 0, + // |----- cmd -----|-- port (u16) --|------- IP address (u32) ------| + // 0000000000000011.1111111111111111.11111111111111111111111111111111 + proto1: u64 = 0, + + // Used for multiple subprotocols explained below: + // + // 1) For informing the sender's member count during SWIM pings. + proto2: u64 = 0, }; // Per-member context data. @@ -150,8 +155,7 @@ pub fn Fleet(UserData: type) type { targets: std.ArrayList([]const u8), }; - // Commands used for leader election protocol. - const LeaderCmd = enum(u8) { + const JoinCmd = enum(u8) { noop, heartbeat, invalidate, @@ -218,8 +222,8 @@ pub fn Fleet(UserData: type) type { .members = std.StringHashMap(MemberData).init(allocator), .refkeys = std.StringHashMap(void).init(allocator), .ping_queue = std.ArrayList([]const u8).init(allocator), - .isd_queue = std.ArrayList(KeyInfo).init(allocator), - .leader_hb = try std.time.Timer.start(), + // .isd_queue = std.ArrayList(KeyInfo).init(allocator), + .join_addr_tm = try std.time.Timer.start(), .callbacks = config.callbacks, .leader = try std.fmt.allocPrint(allocator, "", .{}), .voted_for = try std.fmt.allocPrint(allocator, "", .{}), @@ -239,10 +243,10 @@ pub fn Fleet(UserData: type) type { self.members.deinit(); var it = self.refkeys.iterator(); while (it.next()) |v| self.allocator.free(v.key_ptr.*); - self.allocator.destroy(self.ping_req_data); + // self.allocator.destroy(self.ping_req_data); self.refkeys.deinit(); self.ping_queue.deinit(); - self.isd_queue.deinit(); + // self.isd_queue.deinit(); } /// Start group membership tracking. @@ -266,10 +270,10 @@ pub fn Fleet(UserData: type) type { const ldr = try std.Thread.spawn(.{}, Self.leaderTick, .{self}); ldr.detach(); - self.ping_req_data = try self.allocator.create(RequestPing); - self.ping_req_data.self = self; - const rp = try std.Thread.spawn(.{}, Self.requestPing, .{self.ping_req_data}); - rp.detach(); + // self.ping_req_data = try self.allocator.create(RequestPing); + // self.ping_req_data.self = self; + // const rp = try std.Thread.spawn(.{}, Self.requestPing, .{self.ping_req_data}); + // rp.detach(); } /// Ask an instance to join an existing group. `joined` will be @@ -391,14 +395,14 @@ pub fn Fleet(UserData: type) type { defer aa.deinit(); // destroy arena in one go const arena = aa.allocator(); - switch (msg.isd_cmd) { - .infect, - .confirm_alive, - => try self.handleIsd(arena, msg, false), - .suspect => try self.handleSuspicion(arena, msg), - .confirm_faulty => try self.handleConfirmFaulty(arena, msg), - else => {}, - } + // switch (msg.isd_cmd) { + // .infect, + // .confirm_alive, + // => try self.handleIsd(arena, msg, false), + // .suspect => try self.handleSuspicion(arena, msg), + // .confirm_faulty => try self.handleConfirmFaulty(arena, msg), + // else => {}, + // } // Main protocol message handler. switch (msg.cmd) { @@ -466,23 +470,21 @@ pub fn Fleet(UserData: type) type { ); } - if (msg.isd_cmd == .noop) { - const n = self.getCounts(); - if ((n[0] + n[1]) < msg.isd_incarnation) { - self.elex_tm.reset(); - @atomicStore( - bool, - &self.elex_join, - false, - std.builtin.AtomicOrder.seq_cst, - ); - } else @atomicStore( + const n = self.getCounts(); + if ((n[0] + n[1]) < msg.proto2) { + self.elex_tm.reset(); + @atomicStore( bool, &self.elex_join, - true, + false, std.builtin.AtomicOrder.seq_cst, ); - } + } else @atomicStore( + bool, + &self.elex_join, + true, + std.builtin.AtomicOrder.seq_cst, + ); // Always set src_* to own info. try self.setMsgSrcToOwn(msg); @@ -491,23 +493,23 @@ pub fn Fleet(UserData: type) type { var excludes: [1][]const u8 = .{src}; try self.setMsgDstAndIsd(arena, msg, &excludes); - // Handle leader protocol. - var ipm = msg.extra & 0x00000000FFFFFFFF; - var portm = (msg.extra & 0x0000FFFF00000000) >> 32; - const cmdm: LeaderCmd = @enumFromInt((msg.extra & - 0xF000000000000000) >> 60); + // Handle join address protocol. + var ipm = msg.proto1 & 0x00000000FFFFFFFF; + var portm = (msg.proto1 & 0x0000FFFF00000000) >> 32; + const cmdm: JoinCmd = @enumFromInt((msg.proto1 & + 0xFFFF000000000000) >> 48); if (cmdm == .heartbeat) b: { const al = try self.getHighestNode(); if ((al[0] + al[1]) <= (ipm + portm)) { - _ = self.leader_hb.lap(); + _ = self.join_addr_tm.lap(); break :b; } - const hb: u64 = @intFromEnum(LeaderCmd.invalidate); + const hb: u64 = @intFromEnum(JoinCmd.invalidate); ipm = al[0] & 0x00000000FFFFFFFF; portm = (al[1] << 32) & 0x0000FFFF00000000; - msg.extra = (hb << 60) | ipm | portm; + msg.proto1 = (hb << 48) | ipm | portm; } } @@ -543,7 +545,7 @@ pub fn Fleet(UserData: type) type { try self.setMsgDstAndIsd(arena, msg, &excludes); // Handle leader protocol (egress). - try self.setLeaderProtoSend(msg); + try self.setJoinProtoSend(msg); const ack = self.ping(dst) catch false; @@ -560,21 +562,21 @@ pub fn Fleet(UserData: type) type { try self.upsertMember(dst, .alive, msg.src_incarnation, true); - // Handle leader protocol (ingress). - self.setLeaderProtoRecv(msg); + // Handle join address protocol (ingress). + self.setJoinProtoRecv(msg); } // Always set src_* to own info. try self.setMsgSrcToOwn(msg); - const isd = try self.getIsdInfo(arena, 1); - if (isd.items.len > 0) { - msg.isd_cmd = .infect; - try setMsgSection(msg, .isd, isd.items[0]); - } + // const isd = try self.getIsdInfo(arena, 1); + // if (isd.items.len > 0) { + // msg.isd_cmd = .infect; + // try setMsgSection(msg, .isd, isd.items[0]); + // } - // Handle leader protocol (egress). - try self.setLeaderProtoSend(msg); + // Handle join address protocol (egress). + try self.setJoinProtoSend(msg); _ = std.posix.sendto( sock, @@ -602,7 +604,7 @@ pub fn Fleet(UserData: type) type { .heartbeat => { msg.cmd = .nack; const term = self.getTerm(); - if (msg.extra >= term) { + if (msg.proto1 >= term) { msg.cmd = .ack; const tc = self.getTermAndN(msg); @@ -649,9 +651,9 @@ pub fn Fleet(UserData: type) type { // log.debug("req4votes: my_term={d}, in_term={d}", .{ term, msg.leader_proto }); // log.debug("req4votes: voted_for={s}, voted={any}", .{ self.voted_for, voted }); - if (msg.extra >= term and !voted and self.getState() != .leader) { + if (msg.proto1 >= term and !voted and self.getState() != .leader) { msg.cmd = .ack; - self.setTerm(msg.extra); + self.setTerm(msg.proto1); const src = try keyFromIpPort(arena, msg.src_ip, msg.src_port); const vkey = try self.ensureKeyRef(src); @@ -700,33 +702,33 @@ pub fn Fleet(UserData: type) type { // log.debug("[{d}]", .{i}); // log separator var tm = try std.time.Timer.start(); - var me_key: ?[]const u8 = null; - var me_inc: u64 = 0; - - { - self.members_mtx.lock(); - defer self.members_mtx.unlock(); - var it = self.members.iterator(); - while (it.next()) |v| { - if (!self.keyIsMe(v.key_ptr.*)) continue; - if (v.value_ptr.liveness == .alive) break; - v.value_ptr.liveness = .alive; - me_key = v.key_ptr.*; - me_inc = v.value_ptr.incarnation + 1; - break; - } - } - - if (me_key) |mk| { - self.isd_mtx.lock(); - defer self.isd_mtx.unlock(); - try self.isd_queue.append(.{ - .key = mk, - .liveness = .alive, - .isd_cmd = .confirm_alive, - .incarnation = me_inc, - }); - } + // var me_key: ?[]const u8 = null; + // var me_inc: u64 = 0; + + // { + // self.members_mtx.lock(); + // defer self.members_mtx.unlock(); + // var it = self.members.iterator(); + // while (it.next()) |v| { + // if (!self.keyIsMe(v.key_ptr.*)) continue; + // if (v.value_ptr.liveness == .alive) break; + // v.value_ptr.liveness = .alive; + // me_key = v.key_ptr.*; + // me_inc = v.value_ptr.incarnation + 1; + // break; + // } + // } + + // if (me_key) |mk| { + // self.isd_mtx.lock(); + // defer self.isd_mtx.unlock(); + // try self.isd_queue.append(.{ + // .key = mk, + // .liveness = .alive, + // .isd_cmd = .confirm_alive, + // .incarnation = me_inc, + // }); + // } // const counts = self.getCounts(); // log.debug("[{d}] members: alive={d}, suspected={d}, faulty={d}, total={d}", .{ @@ -975,7 +977,7 @@ pub fn Fleet(UserData: type) type { const port = std.fmt.parseUnsigned(u16, k[sep + 1 ..], 10) catch continue; - msg.extra = self.getTerm(); + msg.proto1 = self.getTerm(); self.send(ip, port, buf, null) catch continue; if (msg.cmd != .ack) continue; @@ -989,7 +991,7 @@ pub fn Fleet(UserData: type) type { const majority = ((n[0] + n[1]) / 2) + 1; const votes = self.incVotesAndGet(); if (votes >= majority) { - log.debug("[{d}:{d}] ********** attempt leader! got {d} votes, majority={d}, n={d}", .{ + log.debug("[{d}:{d}] got {d} votes, majority={d}, n={d}", .{ i, self.getTerm(), votes, @@ -1032,11 +1034,13 @@ pub fn Fleet(UserData: type) type { var deferlog = false; defer { if (deferlog) { - log.debug("[{d}:{d}] leader here, hb took {any}", .{ - i, - self.getTerm(), - std.fmt.fmtDuration(tm.read()), - }); + if (@mod(i, 40) == 0) { + log.debug("[{d}:{d}] leader here, hb took {any}", .{ + i, + self.getTerm(), + std.fmt.fmtDuration(tm.read()), + }); + } } } @@ -1064,11 +1068,13 @@ pub fn Fleet(UserData: type) type { continue; } - log.debug("[{d}:{d}] leader here, hb to {d} nodes", .{ - i, - self.getTerm(), - bl.items.len, - }); + if (@mod(i, 40) == 0) { + log.debug("[{d}:{d}] leader here, hb to {d} nodes", .{ + i, + self.getTerm(), + bl.items.len, + }); + } for (bl.items) |k| { deferlog = true; @@ -1079,7 +1085,7 @@ pub fn Fleet(UserData: type) type { const port = std.fmt.parseUnsigned(u16, k[sep + 1 ..], 10) catch continue; - msg.extra = self.getTerm(); + msg.proto1 = self.getTerm(); self.setTermAndN(msg); self.send(ip, port, buf, 50_000) catch |err| { log.debug("[{d}:{d}] send (heartbeat) failed: {any}", .{ @@ -1093,7 +1099,7 @@ pub fn Fleet(UserData: type) type { } // TODO: This needs to be very short. - std.time.sleep(std.time.ns_per_ms * 500); + std.time.sleep(std.time.ns_per_ms * 50); }, } } @@ -1244,11 +1250,11 @@ pub fn Fleet(UserData: type) type { } // Setup main ISD info. - const isd = try self.getIsdInfo(allocator, 1); - if (isd.items.len > 0) { - msg.isd_cmd = isd.items[0].isd_cmd; - try setMsgSection(msg, .isd, isd.items[0]); - } + // const isd = try self.getIsdInfo(allocator, 1); + // if (isd.items.len > 0) { + // msg.isd_cmd = isd.items[0].isd_cmd; + // try setMsgSection(msg, .isd, isd.items[0]); + // } } // Setup both dst_* and isd_* sections of the payload. @@ -1343,21 +1349,21 @@ pub fn Fleet(UserData: type) type { try self.setMsgDstAndIsd(arena, msg, &excludes); // Handle leader protocol (egress). - try self.setLeaderProtoSend(msg); + try self.setJoinProtoSend(msg); // Propagate number of members. - if (msg.isd_cmd == .noop) { - const n = self.getCounts(); - msg.isd_incarnation = n[0] + n[1]; - } + // if (msg.isd_cmd == .noop) { + const n = self.getCounts(); + msg.proto2 = n[0] + n[1]; + // } try self.send(ip, port, buf, null); - // Handle leader protocol (ingress). - const cmdm: LeaderCmd = @enumFromInt((msg.extra & - 0xF000000000000000) >> 60); + // Handle join address protocol (ingress). + const cmdm: JoinCmd = @enumFromInt((msg.proto1 & + 0xF000000000000000) >> 48); - if (cmdm != .invalidate) _ = self.leader_hb.lap(); + if (cmdm != .invalidate) _ = self.join_addr_tm.lap(); return switch (msg.cmd) { .ack => b: { @@ -1370,14 +1376,14 @@ pub fn Fleet(UserData: type) type { } // Consume isd_* as the main ISD info. - switch (msg.isd_cmd) { - .infect, - .confirm_alive, - => try self.handleIsd(arena, msg, false), - .suspect => try self.handleSuspicion(arena, msg), - .confirm_faulty => try self.handleConfirmFaulty(arena, msg), - else => {}, - } + // switch (msg.isd_cmd) { + // .infect, + // .confirm_alive, + // => try self.handleIsd(arena, msg, false), + // .suspect => try self.handleSuspicion(arena, msg), + // .confirm_faulty => try self.handleConfirmFaulty(arena, msg), + // else => {}, + // } break :b true; }, @@ -1392,6 +1398,7 @@ pub fn Fleet(UserData: type) type { ack: bool = false, }; + // NOTE: Not used at the moment. // Our only agent for doing indirect pings for suspicious nodes. Long-running. fn requestPing(args: *RequestPing) !void { while (true) { @@ -1433,13 +1440,13 @@ pub fn Fleet(UserData: type) type { try setMsgSection(msg, .isd, isd.items[0]); } - // Handle leader protocol (egress). - try args.self.setLeaderProtoSend(msg); + // Handle join address protocol (egress). + try args.self.setJoinProtoSend(msg); args.self.send(ip, port, buf, null) catch continue; - // Handle leader protocol (ingress). - args.self.setLeaderProtoRecv(msg); + // Handle join address protocol (ingress). + args.self.setJoinProtoRecv(msg); switch (msg.cmd) { .ack => { @@ -1701,21 +1708,21 @@ pub fn Fleet(UserData: type) type { return .{ ipl, portl, me }; } - fn setLeaderProtoSend(self: *Self, msg: *Message) !void { + fn setJoinProtoSend(self: *Self, msg: *Message) !void { const n = self.getCounts(); const lim = n[0] + n[1]; if (lim < 2) return; const al = try self.getHighestNode(); - const hb: u64 = @intFromEnum(LeaderCmd.heartbeat); + const hb: u64 = @intFromEnum(JoinCmd.heartbeat); const ipl: u32 = al[0] & 0x00000000FFFFFFFF; const portl: u64 = (al[1] << 32) & 0x0000FFFF00000000; - msg.extra = (hb << 60) | ipl | portl; + msg.proto1 = (hb << 48) | ipl | portl; } - fn setLeaderProtoRecv(self: *Self, msg: *Message) void { - const cmdm: LeaderCmd = @enumFromInt((msg.extra & - 0xF000000000000000) >> 60); - if (cmdm != .invalidate) _ = self.leader_hb.lap(); + fn setJoinProtoRecv(self: *Self, msg: *Message) void { + const cmdm: JoinCmd = @enumFromInt((msg.proto1 & + 0xFFFF000000000000) >> 48); + if (cmdm != .invalidate) _ = self.join_addr_tm.lap(); } fn setTermAndN(self: *Self, msg: *Message) void { @@ -1724,14 +1731,14 @@ pub fn Fleet(UserData: type) type { const term = @atomicLoad(u64, &self.elex_term, std.builtin.AtomicOrder.seq_cst); const mterm: u64 = term & 0x0000FFFFFFFFFFFF; const mcount: u64 = (total << 48) & 0xFFFF000000000000; - msg.extra = mcount | mterm; + msg.proto1 = mcount | mterm; } // [0] - term // [1] - count fn getTermAndN(_: *Self, msg: *Message) std.meta.Tuple(&.{ u64, u64 }) { - const term = msg.extra & 0x0000FFFFFFFFFFFF; - const count = (msg.extra & 0xFFFF000000000000) >> 48; + const term = msg.proto1 & 0x0000FFFFFFFFFFFF; + const count = (msg.proto1 & 0xFFFF000000000000) >> 48; return .{ term, count }; } @@ -1742,9 +1749,10 @@ pub fn Fleet(UserData: type) type { msg.src_state = .alive; msg.dst_cmd = .noop; msg.dst_state = .alive; - msg.isd_cmd = .noop; - msg.isd_state = .alive; - msg.extra = 0; + // msg.isd_cmd = .noop; + // msg.isd_state = .alive; + msg.proto1 = 0; + msg.proto2 = 0; } fn setMsgSrcToOwn(self: *Self, msg: *Message) !void { @@ -1919,7 +1927,7 @@ pub fn Fleet(UserData: type) type { const MsgSection = enum { src, dst, - isd, + // isd, }; // Set a section of the message payload with ip, port, and state info. @@ -1942,12 +1950,13 @@ pub fn Fleet(UserData: type) type { msg.dst_state = info.liveness; msg.dst_incarnation = info.incarnation; }, - .isd => { - msg.isd_ip = addr.in.sa.addr; - msg.isd_port = port; - msg.isd_state = info.liveness; - msg.isd_incarnation = info.incarnation; - }, + // .isd => { + // msg.isd_ip = addr.in.sa.addr; + // msg.isd_port = port; + // msg.isd_state = info.liveness; + // msg.isd_incarnation = info.incarnation; + // }, + } }