Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VSR/Client: Add vsr.Client.register() #1946

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/aof.zig
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ pub const AOFReplayClient = struct {
);
errdefer client.deinit(allocator);

client.register(register_callback, undefined);
while (client.request_inflight != null) {
client.tick();
try io.run_for_ns(constants.tick_ms * std.time.ns_per_ms);
}

return Self{
.io = io,
.message_pool = message_pool,
Expand Down Expand Up @@ -380,6 +386,14 @@ pub const AOFReplayClient = struct {
}
}

fn register_callback(
user_data: u128,
result: *const vsr.RegisterResult,
) void {
_ = user_data;
_ = result;
}

fn replay_callback(
user_data: u128,
operation: StateMachine.Operation,
Expand Down
22 changes: 22 additions & 0 deletions src/clients/c/tb_client/context.zig
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn ContextType(
io: IO,
message_pool: MessagePool,
client: Client,
registered: bool,

completion_fn: tb_completion_t,
implementation: ContextImplementation,
Expand Down Expand Up @@ -218,6 +219,9 @@ pub fn ContextType(
};
};

context.registered = false;
context.client.register(client_register_callback, @intFromPtr(context));

return context;
}

Expand All @@ -237,6 +241,14 @@ pub fn ContextType(
}
}

fn client_register_callback(user_data: u128, result: *const vsr.RegisterResult) void {
const self: *Context = @ptrFromInt(@as(usize, @intCast(user_data)));
_ = result;
self.registered = true;
// Some requests may have queued up while the client was registering.
self.signal.notify();
}

pub fn tick(self: *Context) void {
self.client.tick();
}
Expand Down Expand Up @@ -269,12 +281,22 @@ pub fn ContextType(

fn on_signal(signal: *Signal) void {
const self = @fieldParentPtr(Context, "signal", signal);

// Don't send any requests until registration completes.
if (!self.registered) {
assert(self.client.request_inflight != null);
assert(self.client.request_inflight.?.message.header.operation == .register);
return;
}

while (self.submitted.pop()) |packet| {
self.request(packet);
}
}

fn request(self: *Context, packet: *Packet) void {
assert(self.registered);

// Get the size of each request structure in the packet.data:
const event_size: usize = operation_event_size(packet.operation) orelse {
return self.on_complete(packet, error.InvalidOperation);
Expand Down
52 changes: 44 additions & 8 deletions src/clients/c/tb_client/echo_client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn EchoClient(comptime StateMachine_: type, comptime MessageBus: type) type
id: u128,
cluster: u128,
release: vsr.Release = vsr.Release.minimum,
request_number: u32 = 1,
request_number: u32 = 0,
request_inflight: ?Request = null,
message_pool: *MessagePool,

Expand Down Expand Up @@ -90,7 +90,7 @@ pub fn EchoClient(comptime StateMachine_: type, comptime MessageBus: type) type
defer self.release_message(reply.base());

// Copy the request message's entire content including header into the reply.
const operation = inflight.message.header.operation.cast(Self.StateMachine);
const operation = inflight.message.header.operation;
stdx.copy_disjoint(
.exact,
u8,
Expand All @@ -101,11 +101,47 @@ pub fn EchoClient(comptime StateMachine_: type, comptime MessageBus: type) type
// Similarly to the real client, release the request message before invoking the
// callback. This necessitates a `copy_disjoint` above.
self.release_message(inflight.message.base());
inflight.callback(
inflight.user_data,
operation,
reply.body(),
);

switch (inflight.callback) {
.request => |callback| {
callback(inflight.user_data, operation.cast(Self.StateMachine), reply.body());
},
.register => |callback| {
const result = vsr.RegisterResult{};
callback(inflight.user_data, &result);
},
}
}

pub fn register(
self: *Self,
callback: Request.RegisterCallback,
user_data: u128,
) void {
assert(self.request_inflight == null);
assert(self.request_number == 0);

const message = self.get_message().build(.request);
errdefer self.release_message(message.base());

// We will set parent, session, view and checksums only when sending for the first time:
message.header.* = .{
.client = self.id,
.request = self.request_number,
.cluster = self.cluster,
.command = .request,
.operation = .register,
.release = vsr.Release.minimum,
};

assert(self.request_number == 0);
self.request_number += 1;

self.request_inflight = .{
.message = message,
.user_data = user_data,
.callback = .{ .register = callback },
};
}

pub fn request(
Expand Down Expand Up @@ -158,7 +194,7 @@ pub fn EchoClient(comptime StateMachine_: type, comptime MessageBus: type) type
self.request_inflight = .{
.message = message,
.user_data = user_data,
.callback = callback,
.callback = .{ .request = callback },
};
}

Expand Down
1 change: 0 additions & 1 deletion src/message_pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub const Options = union(vsr.ProcessType) {
sum += constants.replicas_max; // Connection.recv_message
// Connection.send_queue:
sum += constants.replicas_max * constants.connection_send_queue_max_client;
sum += 1; // Client.register_inflight
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

sum += 1; // Client.request_inflight
// Handle bursts.
// (e.g. Connection.parse_message(), or sending a ping when the send queue is full).
Expand Down
19 changes: 19 additions & 0 deletions src/repl.zig
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,13 @@ pub fn ReplType(comptime MessageBus: type) type {
);
repl.client = &client;

client.register(register_callback, @intCast(@intFromPtr(&repl)));
while (!repl.event_loop_done) {
repl.client.tick();
try io.run_for_ns(constants.tick_ms * std.time.ns_per_ms);
}
repl.event_loop_done = false;

if (statements.len > 0) {
var statements_iterator = std.mem.split(u8, statements, ";");
while (statements_iterator.next()) |statement_string| {
Expand Down Expand Up @@ -688,6 +695,18 @@ pub fn ReplType(comptime MessageBus: type) type {
}
}

fn register_callback(
user_data: u128,
result: *const vsr.RegisterResult,
) void {
_ = result;

const repl: *Repl = @ptrFromInt(@as(usize, @intCast(user_data)));
assert(!repl.event_loop_done);

repl.event_loop_done = true;
}

fn send(
repl: *Repl,
operation: StateMachine.Operation,
Expand Down
9 changes: 7 additions & 2 deletions src/simulator.zig
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,13 @@ pub const Simulator = struct {
assert(simulator.requests_sent == simulator.options.requests_max);
assert(simulator.reply_sequence.empty());
for (simulator.cluster.clients) |*client| {
assert(client.register_inflight == null);
assert(client.request_inflight == null);
if (client.request_inflight) |request| {
// Registration isn't counted by requests_sent, so an operation=register may still
// be in-flight. Any other requests should already be complete before done() is
// called.
assert(request.message.header.operation == .register);
return false;
}
}

// Even though there are no client requests in progress, the cluster may be upgrading.
Expand Down
13 changes: 12 additions & 1 deletion src/testing/cluster.zig
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
client.on_reply_context = cluster;
client.on_reply_callback = client_on_reply;
network.link(client.message_bus.process, &client.message_bus);
// TODO Move this up a level -- I think we could simplify/improve replica_test if
// this wasn't automatic.
client.register(register_callback, undefined);
}

return cluster;
Expand Down Expand Up @@ -634,7 +637,6 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
/// instead because:
/// - Cluster needs access to the request
/// - Cluster needs access to the reply message (not just the body)
/// - Cluster needs to know about command=register messages
///
/// See `on_reply`.
fn request_callback(
Expand All @@ -647,6 +649,15 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
_ = result;
}

/// See request_callback().
fn register_callback(
user_data: u128,
result: *const vsr.RegisterResult,
) void {
_ = user_data;
_ = result;
}

fn client_on_reply(
client: *Client,
request_message: *Message.Request,
Expand Down
6 changes: 2 additions & 4 deletions src/testing/cluster/state_checker.zig
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,11 @@ pub fn StateCheckerType(comptime Client: type, comptime Replica: type) type {
if (client.id == header_b.?.client) break client;
} else unreachable;

if (client.register_inflight == null and
client.request_inflight == null)
{
if (client.request_inflight == null) {
return error.ReplicaTransitionedToInvalidState;
}

const request = client.register_inflight orelse client.request_inflight.?.message;
const request = client.request_inflight.?.message;
assert(request.header.client == header_b.?.client);
assert(request.header.checksum == header_b.?.request_checksum);
assert(request.header.request == header_b.?.request);
Expand Down
18 changes: 18 additions & 0 deletions src/tigerbeetle/benchmark_load.zig
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ pub fn main(
.id_order = cli_args.id_order,
};

benchmark.client.register(Benchmark.register_callback, @intCast(@intFromPtr(&benchmark)));
while (!benchmark.done) {
benchmark.client.tick();
try benchmark.io.run_for_ns(constants.tick_ms * std.time.ns_per_ms);
}
benchmark.done = false;

benchmark.create_accounts();

while (!benchmark.done) {
Expand Down Expand Up @@ -498,6 +505,17 @@ const Benchmark = struct {

callback(b, operation, result);
}

fn register_callback(
user_data: u128,
result: *const vsr.RegisterResult,
) void {
_ = result;

const b: *Benchmark = @ptrFromInt(@as(usize, @intCast(user_data)));
assert(!b.done);
b.done = true;
}
};

fn print_deciles(
Expand Down
Loading
Loading