Skip to content

Commit

Permalink
core: ci skip: rename fn
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Sep 27, 2024
1 parent e72274c commit e271a8c
Showing 1 changed file with 34 additions and 18 deletions.
52 changes: 34 additions & 18 deletions src/zgroup.zig
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn Fleet(UserData: type) type {
const me = try self.getOwnKey();
defer self.allocator.free(me);
_ = try self.ensureKeyRef(me);
try self.addOrSet(me, .alive, 0, true);
try self.upsertMember(me, .alive, 0, true);
self.election_tm.reset();
_ = try self.ensureKeyRef("0"); // dummy

Expand Down Expand Up @@ -303,7 +303,7 @@ pub fn Fleet(UserData: type) type {
dst_port,
});

try self.addOrSet(key, .alive, 0, true);
try self.upsertMember(key, .alive, 0, true);
self.allowed_tm.reset();
joined.* = true;

Expand Down Expand Up @@ -429,7 +429,7 @@ pub fn Fleet(UserData: type) type {
.join => b: {
if (msg.name == name) {
const key = try keyFromIpPort(arena, msg.src_ip, msg.src_port);
try self.addOrSet(key, .alive, msg.src_incarnation, true);
try self.upsertMember(key, .alive, msg.src_incarnation, true);

// Inform current leader (if any) of this new join.
msg.dst_ip = msg.src_ip;
Expand Down Expand Up @@ -478,11 +478,16 @@ pub fn Fleet(UserData: type) type {
if (msg.name == name) {
msg.cmd = .ack;
const src = try keyFromIpPort(arena, msg.src_ip, msg.src_port);
try self.addOrSet(src, .alive, msg.src_incarnation, true);
try self.upsertMember(src, .alive, msg.src_incarnation, true);

if (msg.dst_cmd == .infect) {
const dst = try keyFromIpPort(arena, msg.dst_ip, msg.dst_port);
try self.addOrSet(dst, msg.dst_state, msg.dst_incarnation, false);
try self.upsertMember(
dst,
msg.dst_state,
msg.dst_incarnation,
false,
);
}

if (msg.isd_cmd == .noop) {
Expand Down Expand Up @@ -548,7 +553,7 @@ pub fn Fleet(UserData: type) type {
//
if (msg.name == name) {
const src = try keyFromIpPort(arena, msg.src_ip, msg.src_port);
try self.addOrSet(src, msg.src_state, msg.src_incarnation, true);
try self.upsertMember(src, msg.src_state, msg.src_incarnation, true);

const dst = try keyFromIpPort(arena, msg.dst_ip, msg.dst_port);

Expand Down Expand Up @@ -577,7 +582,7 @@ pub fn Fleet(UserData: type) type {
msg.dst_state = msg.src_state;
msg.dst_incarnation = msg.src_incarnation;

try self.addOrSet(dst, .alive, msg.src_incarnation, true);
try self.upsertMember(dst, .alive, msg.src_incarnation, true);

// Handle leader protocol (ingress).
self.setLeaderProtoRecv(msg);
Expand Down Expand Up @@ -699,7 +704,7 @@ pub fn Fleet(UserData: type) type {
const dst = try keyFromIpPort(arena, msg.dst_ip, msg.dst_port);
const pdst = try self.ensureKeyRef(dst);
log.debug("[{d}] received join2leader, add {s}", .{ i, pdst });
try self.addOrSet(pdst, .alive, 0, false);
try self.upsertMember(pdst, .alive, 0, false);
},
else => {},
}
Expand Down Expand Up @@ -876,7 +881,7 @@ pub fn Fleet(UserData: type) type {
const elapsed = tm.read();
if (elapsed < self.protocol_time) {
const left = self.protocol_time - elapsed;
log.debug("[{d}] sleep for {any}", .{ i, std.fmt.fmtDuration(left) });
// log.debug("[{d}] sleep for {any}", .{ i, std.fmt.fmtDuration(left) });
std.time.sleep(left);
}
}
Expand Down Expand Up @@ -1383,12 +1388,12 @@ pub fn Fleet(UserData: type) type {

return switch (msg.cmd) {
.ack => b: {
try self.addOrSet(key, .alive, msg.src_incarnation, true);
try self.upsertMember(key, .alive, msg.src_incarnation, true);

// Consume dst_* as piggybacked ISD info.
if (msg.dst_cmd == .infect) {
const k = try keyFromIpPort(arena, msg.dst_ip, msg.dst_port);
try self.addOrSet(k, msg.dst_state, msg.dst_incarnation, false);
try self.upsertMember(k, msg.dst_state, msg.dst_incarnation, false);
}

// Consume isd_* as the main ISD info.
Expand Down Expand Up @@ -1465,8 +1470,19 @@ pub fn Fleet(UserData: type) type {

switch (msg.cmd) {
.ack => {
try args.self.addOrSet(args.src, msg.src_state, msg.src_incarnation, true);
try args.self.addOrSet(args.dst, msg.dst_state, msg.dst_incarnation, true);
try args.self.upsertMember(
args.src,
msg.src_state,
msg.src_incarnation,
true,
);

try args.self.upsertMember(
args.dst,
msg.dst_state,
msg.dst_incarnation,
true,
);

// Consume isd_* as the main ISD info.
switch (msg.isd_cmd) {
Expand All @@ -1481,7 +1497,7 @@ pub fn Fleet(UserData: type) type {
const ptr = &args.ack;
ptr.* = true;
},
.nack => try args.self.addOrSet(
.nack => try args.self.upsertMember(
args.src,
msg.src_state,
msg.src_incarnation,
Expand Down Expand Up @@ -1768,10 +1784,10 @@ pub fn Fleet(UserData: type) type {
});
}

// Add a new member or update an existing member's info. This function duplicates
// the key using self.allocator when adding a new member, not when updating an
// existing one.
fn addOrSet(
// Add a new member or update an existing member's info. This function
// duplicates the key using self.allocator when adding a new member,
// not when updating an existing one.
fn upsertMember(
self: *Self,
key: []const u8,
state: ?MemberState,
Expand Down

0 comments on commit e271a8c

Please sign in to comment.