From 222adba56a09c0f8fb3961b6b480d14d2795e081 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 2 Sep 2024 12:09:59 -0400 Subject: [PATCH 01/12] Rewrite leader__barrier and leader__exec Signed-off-by: Cole Miller --- src/gateway.c | 99 ++++--- src/leader.c | 508 ++++++++++++++++++++++------------- src/leader.h | 56 ++-- test/unit/test_gateway.c | 9 +- test/unit/test_replication.c | 47 ++-- 5 files changed, 461 insertions(+), 258 deletions(-) diff --git a/src/gateway.c b/src/gateway.c index 8cfe4fac1..19ae9e709 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -309,10 +309,8 @@ static int handle_open(struct gateway *g, struct handle *req) return 0; } -static void prepareBarrierCb(struct barrier *barrier, int status) +static void prepare_bottom_half(struct gateway *g, int status) { - tracef("prepare barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; struct response_stmt response_v0 = { 0 }; struct response_stmt_with_offset response_v1 = { 0 }; @@ -381,6 +379,13 @@ static void prepareBarrierCb(struct barrier *barrier, int status) } } +static void prepare_barrier_cb(struct barrier *barrier, int status) +{ + tracef("prepare barrier cb status:%d", status); + struct gateway *g = barrier->data; + prepare_bottom_half(g, status); +} + static int handle_prepare(struct gateway *g, struct handle *req) { tracef("handle prepare"); @@ -413,8 +418,10 @@ static int handle_prepare(struct gateway *g, struct handle *req) req->stmt_id = stmt->id; req->sql = request.sql; g->req = req; - rc = leader__barrier(g->leader, &g->barrier, prepareBarrierCb); - if (rc != 0) { + rc = leader_barrier_v2(g->leader, &g->barrier, prepare_barrier_cb); + if (rc == LEADER_NOT_ASYNC) { + prepare_bottom_half(g, 0); + } else if (rc != 0) { tracef("handle prepare barrier failed %d", rc); stmt__registry_del(&g->stmts, stmt); g->req = NULL; @@ -478,7 +485,6 @@ static int handle_exec(struct gateway *g, struct handle *req) struct stmt *stmt; struct request_exec 
request = { 0 }; int tuple_format; - uint64_t req_id; int rv; switch (req->schema) { @@ -513,10 +519,11 @@ static int handle_exec(struct gateway *g, struct handle *req) } req->stmt_id = stmt->id; g->req = req; - req_id = idNext(&g->random_state); - rv = leader__exec(g->leader, &g->exec, stmt->stmt, req_id, - leader_exec_cb); - if (rv != 0) { + rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, leader_exec_cb); + if (rv == LEADER_NOT_ASYNC) { + /* XXX */ + leader_exec_cb(&g->exec, g->exec.status); + } else if (rv != 0) { tracef("handle exec leader exec failed %d", rv); g->req = NULL; return rv; @@ -600,10 +607,8 @@ static void query_batch(struct gateway *g) #endif } -static void query_barrier_cb(struct barrier *barrier, int status) +static void query_bottom_half(struct gateway *g, int status) { - tracef("query barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -620,6 +625,13 @@ static void query_barrier_cb(struct barrier *barrier, int status) query_batch(g); } +static void query_barrier_cb(struct barrier *barrier, int status) +{ + tracef("query barrier cb status:%d", status); + struct gateway *g = barrier->data; + query_bottom_half(g, status); +} + static void leaderModifyingQueryCb(struct exec *exec, int status) { struct gateway *g = exec->data; @@ -646,7 +658,6 @@ static int handle_query(struct gateway *g, struct handle *req) struct request_query request = { 0 }; int tuple_format; bool is_readonly; - uint64_t req_id; int rv; switch (req->schema) { @@ -684,11 +695,18 @@ static int handle_query(struct gateway *g, struct handle *req) is_readonly = (bool)sqlite3_stmt_readonly(stmt->stmt); if (is_readonly) { - rv = leader__barrier(g->leader, &g->barrier, query_barrier_cb); + rv = leader_barrier_v2(g->leader, &g->barrier, query_barrier_cb); + if (rv == LEADER_NOT_ASYNC) { + query_bottom_half(g, 0); + rv = 0; + } } else { - req_id = idNext(&g->random_state); - rv = 
leader__exec(g->leader, &g->exec, stmt->stmt, req_id, - leaderModifyingQueryCb); + rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, + leaderModifyingQueryCb); + if (rv == LEADER_NOT_ASYNC) { + /* XXX */ + leaderModifyingQueryCb(&g->exec, g->exec.status); + } } if (rv != 0) { g->req = NULL; @@ -813,10 +831,8 @@ static void handle_exec_sql_next(struct gateway *g, g->req = NULL; } -static void execSqlBarrierCb(struct barrier *barrier, int status) +static void exec_sql_bottom_half(struct gateway *g, int status) { - tracef("exec sql barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -829,6 +845,13 @@ static void execSqlBarrierCb(struct barrier *barrier, int status) handle_exec_sql_next(g, req, false); } +static void exec_sql_barrier_cb(struct barrier *barrier, int status) +{ + tracef("exec sql barrier cb status:%d", status); + struct gateway *g = barrier->data; + exec_sql_bottom_half(g, status); +} + static int handle_exec_sql(struct gateway *g, struct handle *req) { tracef("handle exec sql schema:%" PRIu8, req->schema); @@ -856,8 +879,10 @@ static int handle_exec_sql(struct gateway *g, struct handle *req) req->sql = request.sql; req->exec_count = 0; g->req = req; - rc = leader__barrier(g->leader, &g->barrier, execSqlBarrierCb); - if (rc != 0) { + rc = leader_barrier_v2(g->leader, &g->barrier, exec_sql_barrier_cb); + if (rc == LEADER_NOT_ASYNC) { + exec_sql_bottom_half(g, 0); + } else if (rc != 0) { tracef("handle exec sql barrier failed %d", rc); g->req = NULL; return rc; @@ -884,10 +909,8 @@ static void leaderModifyingQuerySqlCb(struct exec *exec, int status) } } -static void querySqlBarrierCb(struct barrier *barrier, int status) +static void query_sql_bottom_half(struct gateway *g, int status) { - tracef("query sql barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -898,7 +921,6 @@ static void 
querySqlBarrierCb(struct barrier *barrier, int status) sqlite3_stmt *tail_stmt; int tuple_format; bool is_readonly; - uint64_t req_id; int rv; if (status != 0) { @@ -953,10 +975,12 @@ static void querySqlBarrierCb(struct barrier *barrier, int status) if (is_readonly) { query_batch(g); } else { - req_id = idNext(&g->random_state); - rv = leader__exec(g->leader, &g->exec, stmt, req_id, - leaderModifyingQuerySqlCb); - if (rv != 0) { + rv = leader_exec_v2(g->leader, &g->exec, stmt, + leaderModifyingQuerySqlCb); + if (rv == LEADER_NOT_ASYNC) { + /* XXX */ + leaderModifyingQuerySqlCb(&g->exec, g->exec.status); + } else if (rv != 0) { sqlite3_finalize(stmt); g->req = NULL; failure(req, rv, "leader exec"); @@ -964,6 +988,13 @@ static void querySqlBarrierCb(struct barrier *barrier, int status) } } +static void query_sql_barrier_cb(struct barrier *barrier, int status) +{ + tracef("query sql barrier cb status:%d", status); + struct gateway *g = barrier->data; + query_sql_bottom_half(g, status); +} + static int handle_query_sql(struct gateway *g, struct handle *req) { tracef("handle query sql schema:%" PRIu8, req->schema); @@ -988,8 +1019,10 @@ static int handle_query_sql(struct gateway *g, struct handle *req) FAIL_IF_CHECKPOINTING; req->sql = request.sql; g->req = req; - rv = leader__barrier(g->leader, &g->barrier, querySqlBarrierCb); - if (rv != 0) { + rv = leader_barrier_v2(g->leader, &g->barrier, query_sql_barrier_cb); + if (rv == LEADER_NOT_ASYNC) { + query_sql_bottom_half(g, 0); + } else if (rv != 0) { tracef("handle query sql barrier failed %d", rv); g->req = NULL; return rv; diff --git a/src/leader.c b/src/leader.c index 61ef91639..6e1d5ded6 100644 --- a/src/leader.c +++ b/src/leader.c @@ -16,17 +16,6 @@ #include "utils.h" #include "vfs.h" -/* Called when a leader exec request terminates and the associated callback can - * be invoked. 
*/ -static void leaderExecDone(struct exec *req) -{ - tracef("leader exec done id:%" PRIu64, req->id); - req->leader->exec = NULL; - if (req->cb != NULL) { - req->cb(req, req->status); - } -} - /* Open a SQLite connection and set it to leader replication mode. */ static int openConnection(const char *filename, const char *vfs, @@ -146,6 +135,8 @@ int leader__init(struct leader *l, struct db *db, struct raft *raft) return 0; } +static void exec_done(struct exec *, int); + void leader__close(struct leader *l) { tracef("leader close"); @@ -154,7 +145,7 @@ void leader__close(struct leader *l) if (l->exec != NULL) { assert(l->inflight == NULL); l->exec->status = SQLITE_ERROR; - leaderExecDone(l->exec); + exec_done(l->exec, 0); } rc = sqlite3_close(l->conn); assert(rc == 0); @@ -241,66 +232,10 @@ static void leaderMaybeCheckpointLegacy(struct leader *l) raft_free(buf.base); } -static void leaderApplyFramesCb(struct raft_apply *req, - int status, - void *result) -{ - tracef("apply frames cb id:%" PRIu64, idExtract(req->req_id)); - struct apply *apply = req->data; - struct leader *l = apply->leader; - if (l == NULL) { - raft_free(apply); - return; - } - - (void)result; - - if (status != 0) { - tracef("apply frames cb failed status %d", status); - sqlite3_vfs *vfs = sqlite3_vfs_find(l->db->config->name); - switch (status) { - case RAFT_LEADERSHIPLOST: - l->exec->status = SQLITE_IOERR_LEADERSHIP_LOST; - break; - case RAFT_NOSPACE: - l->exec->status = SQLITE_IOERR_WRITE; - break; - case RAFT_SHUTDOWN: - /* If we got here it means we have manually - * fired the apply callback from - * gateway__close(). In this case we don't - * free() the apply object, since it will be - * freed when the callback is fired again by - * raft. - * - * TODO: we should instead make gatewa__close() - * itself asynchronous. 
*/ - apply->leader = NULL; - l->exec->status = SQLITE_ABORT; - goto finish; - break; - default: - l->exec->status = SQLITE_IOERR; - break; - } - VfsAbort(vfs, l->db->path); - } - - raft_free(apply); - - if (status == 0) { - leaderMaybeCheckpointLegacy(l); - } - -finish: - l->inflight = NULL; - l->db->tx_id = 0; - leaderExecDone(l->exec); -} - static int leaderApplyFrames(struct exec *req, dqlite_vfs_frame *frames, - unsigned n) + unsigned n, + raft_apply_cb cb) { tracef("leader apply frames id:%" PRIu64, req->id); struct leader *l = req->leader; @@ -337,11 +272,11 @@ static int leaderApplyFrames(struct exec *req, idSet(apply->req.req_id, req->id); #ifdef USE_SYSTEM_RAFT - rv = raft_apply(l->raft, &apply->req, &buf, 1, leaderApplyFramesCb); + rv = raft_apply(l->raft, &apply->req, &buf, 1, cb); #else /* TODO actual WAL slice goes here */ struct raft_entry_local_data local_data = {}; - rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, leaderApplyFramesCb); + rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, cb); #endif if (rv != 0) { tracef("raft apply failed %d", rv); @@ -362,9 +297,204 @@ static int leaderApplyFrames(struct exec *req, return rv; } -static void leaderExecV2(struct exec *req, enum pool_half half) +enum { + BARRIER_START, + BARRIER_PASSED, + BARRIER_DONE, + BARRIER_FAIL, + BARRIER_NR, +}; + +static const struct sm_conf barrier_states[BARRIER_NR] = { + [BARRIER_START] = { + .name = "start", + .allowed = BITS(BARRIER_PASSED) + |BITS(BARRIER_DONE) + |BITS(BARRIER_FAIL), + .flags = SM_INITIAL, + }, + [BARRIER_PASSED] = { + .name = "passed", + .allowed = BITS(BARRIER_DONE) + |BITS(BARRIER_FAIL), + }, + [BARRIER_DONE] = { + .name = "done", + .flags = SM_FINAL, + }, + [BARRIER_FAIL] = { + .name = "fail", + .flags = SM_FINAL|SM_FAILURE, + }, +}; + +static bool barrier_invariant(const struct sm *sm, int prev) +{ + (void)sm; + (void)prev; + return true; +} + +static void barrier_done(struct barrier *barrier, int status) +{ + PRE(barrier != 
NULL); + int state = sm_state(&barrier->sm); + PRE(state == BARRIER_START || state == BARRIER_PASSED); + void (*cb)(struct barrier *, int) = barrier->cb; + PRE(cb != NULL); + + if (status != 0) { + sm_fail(&barrier->sm, BARRIER_FAIL, status); + } else { + sm_move(&barrier->sm, BARRIER_DONE); + } + sm_fini(&barrier->sm); + /* TODO(cole) uncommment this once the barrier-callback-runs-twice + * issue is fixed. */ + /* barrier->req.data = NULL; */ + barrier->leader = NULL; + barrier->cb = NULL; + + if (state == BARRIER_PASSED) { + cb(barrier, status); + } +} + +static void barrier_raft_cb(struct raft_barrier *, int); + +static int barrier_async(struct barrier *barrier, int status) +{ + int rv; + + if (sm_state(&barrier->sm) == BARRIER_START) { + PRE(status == 0); + rv = raft_barrier(barrier->leader->raft, &barrier->req, barrier_raft_cb); + if (rv != 0) { + barrier_done(barrier, rv); + } + return rv; + } + + PRE(sm_state(&barrier->sm) == BARRIER_PASSED); + status = status == 0 ? 0 : + status == RAFT_LEADERSHIPLOST ? SQLITE_IOERR_LEADERSHIP_LOST : + SQLITE_ERROR; + barrier_done(barrier, status); + return 0; +} + +static void barrier_raft_cb(struct raft_barrier *rb, int status) +{ + struct barrier *barrier = rb->data; + PRE(barrier != NULL); + /* TODO(cole) it seems that raft can invoke this callback more than + * once, investigate and fix that and then remove this workaround. 
*/ + if (sm_state(&barrier->sm) > BARRIER_START) { + return; + } + sm_move(&barrier->sm, BARRIER_PASSED); + (void)barrier_async(rb->data, status); +} + +int leader_barrier_v2(struct leader *l, + struct barrier *barrier, + barrier_cb cb) +{ + int rv; + + if (!needsBarrier(l)) { + return LEADER_NOT_ASYNC; + } + + sm_init(&barrier->sm, barrier_invariant, NULL, barrier_states, "barrier", + BARRIER_START); + barrier->cb = cb; + barrier->leader = l; + barrier->req.data = barrier; + rv = barrier_async(barrier, 0); + POST(rv != LEADER_NOT_ASYNC); + return rv; +} + +enum { + EXEC_START, + EXEC_BARRIER, + EXEC_STEPPED, + EXEC_POLLED, + EXEC_APPLIED, + EXEC_DONE, + EXEC_FAILED, + EXEC_NR, +}; + +static const struct sm_conf exec_states[EXEC_NR] = { + [EXEC_START] = { + .name = "start", + .allowed = BITS(EXEC_BARRIER) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + .flags = SM_INITIAL, + }, + [EXEC_BARRIER] = { + .name = "barrier", + .allowed = BITS(EXEC_STEPPED) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_STEPPED] = { + .name = "stepped", + .allowed = BITS(EXEC_POLLED) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_POLLED] = { + .name = "polled", + .allowed = BITS(EXEC_APPLIED) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_APPLIED] = { + .name = "applied", + .allowed = BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_DONE] = { + .name = "done", + .flags = SM_FINAL, + }, + [EXEC_FAILED] = { + .name = "failed", + .flags = SM_FAILURE|SM_FINAL, + }, +}; + +static bool exec_invariant(const struct sm *sm, int prev) +{ + (void)sm; + (void)prev; + return true; +} + +static void exec_done(struct exec *req, int asyncness) +{ + int status = req->status; + status = status ? 
status : SQLITE_ERROR; + if (status == SQLITE_DONE) { + sm_move(&req->sm, EXEC_DONE); + } else { + sm_fail(&req->sm, EXEC_FAILED, status); + } + sm_fini(&req->sm); + req->leader->exec = NULL; + if (req->cb != NULL && asyncness == 0) { + req->cb(req, status); + } +} + +static void exec_apply_cb(struct raft_apply *, int, void *); + +static int exec_apply(struct exec *req) { - tracef("leader exec v2 id:%" PRIu64, req->id); struct leader *l = req->leader; struct db *db = l->db; sqlite3_vfs *vfs = sqlite3_vfs_find(db->config->name); @@ -374,159 +504,175 @@ static void leaderExecV2(struct exec *req, enum pool_half half) unsigned i; int rv; - if (half == POOL_TOP_HALF) { - req->status = sqlite3_step(req->stmt); - return; - } /* else POOL_BOTTOM_HALF => */ + req->status = sqlite3_step(req->stmt); + sm_move(&req->sm, EXEC_STEPPED); rv = VfsPoll(vfs, db->path, &frames, &n); - if (rv != 0 || n == 0) { - tracef("vfs poll"); - goto finish; + if (rv != 0) { + return rv; + } + sm_move(&req->sm, EXEC_POLLED); + if (n == 0) { + return LEADER_NOT_ASYNC; } /* Check if the new frames would create an overfull database */ size = VfsDatabaseSize(vfs, db->path, n, db->config->page_size); if (size > VfsDatabaseSizeLimit(vfs)) { rv = SQLITE_FULL; - goto abort; + goto err; } - rv = leaderApplyFrames(req, frames, n); + rv = leaderApplyFrames(req, frames, n, exec_apply_cb); if (rv != 0) { - goto abort; - } - - for (i = 0; i < n; i++) { - sqlite3_free(frames[i].data); + goto err; } - sqlite3_free(frames); - return; -abort: +err: for (i = 0; i < n; i++) { sqlite3_free(frames[i].data); } sqlite3_free(frames); - VfsAbort(vfs, l->db->path); -finish: if (rv != 0) { - tracef("exec v2 failed %d", rv); - l->exec->status = rv; + VfsAbort(vfs, l->db->path); } - leaderExecDone(l->exec); + return rv; } -#ifdef DQLITE_NEXT +static int exec_async(struct exec *, int); -static void exec_top(pool_work_t *w) +static void exec_apply_cb(struct raft_apply *req, + int status, + void *result) { - struct exec *req = 
CONTAINER_OF(w, struct exec, work); - leaderExecV2(req, POOL_TOP_HALF); + (void)result; + struct apply *apply = req->data; + struct leader *l; + struct exec *exec; + + l = apply->leader; + if (l == NULL) { + raft_free(apply); + return; + } + + exec = l->exec; + PRE(exec != NULL); + sm_move(&exec->sm, EXEC_APPLIED); + if (status == RAFT_SHUTDOWN) { + apply->leader = NULL; + } else { + raft_free(apply); + } + exec_async(exec, status); } -static void exec_bottom(pool_work_t *w) +static int exec_status(int r) { - struct exec *req = CONTAINER_OF(w, struct exec, work); - leaderExecV2(req, POOL_BOTTOM_HALF); + PRE(r != 0); + return r == RAFT_LEADERSHIPLOST ? SQLITE_IOERR_LEADERSHIP_LOST : + r == RAFT_NOSPACE ? SQLITE_IOERR_WRITE : + r == RAFT_SHUTDOWN ? SQLITE_ABORT : + SQLITE_IOERR; } -#endif - -static void execBarrierCb(struct barrier *barrier, int status) +static void exec_barrier_cb(struct barrier *barrier, int status) { - tracef("exec barrier cb status %d", status); struct exec *req = barrier->data; - struct leader *l = req->leader; + PRE(req != NULL); + sm_move(&req->sm, EXEC_BARRIER); + exec_async(req, status); +} - if (status != 0) { - l->exec->status = status; - leaderExecDone(l->exec); - return; +/** + * Exec request pseudo-coroutine, encapsulating the whole lifecycle. 
+ */ +int exec_async(struct exec *req, int status) +{ + struct leader *l; + sqlite3_vfs *vfs; + int barrier_rv = 0; + int apply_rv = 0; + int ret = 0; + + switch (sm_state(&req->sm)) { + case EXEC_START: + PRE(status == 0); + l = req->leader; + PRE(l != NULL); + barrier_rv = leader_barrier_v2(l, &req->barrier, exec_barrier_cb); + ret = barrier_rv; + if (barrier_rv == 0) { + break; + } else if (barrier_rv != LEADER_NOT_ASYNC) { + l->exec = NULL; + break; + } /* else barrier_rv == LEADER_NOT_ASYNC => */ + sm_move(&req->sm, EXEC_BARRIER); + POST(status == 0); + /* fallthrough */ + case EXEC_BARRIER: + if (status != 0) { + req->status = status; + exec_done(req, ret); + break; + } + apply_rv = exec_apply(req); + if (apply_rv == 0) { + ret = 0; + break; + } else if (apply_rv != LEADER_NOT_ASYNC) { + req->status = apply_rv; + exec_done(req, ret); + ret = 0; + break; + } /* else apply_rv == LEADER_NOT_ASYNC => */ + ret &= LEADER_NOT_ASYNC; + sm_move(&req->sm, EXEC_APPLIED); + POST(status == 0); + /* fallthrough */ + case EXEC_APPLIED: + l = req->leader; + PRE(l != NULL); + vfs = sqlite3_vfs_find(l->db->config->name); + PRE(vfs != NULL); + if (apply_rv == 0) { + if (status == 0) { + leaderMaybeCheckpointLegacy(l); + } else { + req->status = exec_status(status); + VfsAbort(vfs, l->db->path); + } + l->inflight = NULL; + l->db->tx_id = 0; + } + exec_done(req, ret); + break; + default: + POST(false && "impossible!"); } -#ifdef DQLITE_NEXT - struct dqlite_node *node = l->raft->data; - pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT) - ? 
pool_ut_fallback() : &node->pool; - pool_queue_work(pool, &req->work, l->db->cookie, - WT_UNORD, exec_top, exec_bottom); -#else - leaderExecV2(req, POOL_TOP_HALF); - leaderExecV2(req, POOL_BOTTOM_HALF); -#endif + return ret; } -int leader__exec(struct leader *l, - struct exec *req, - sqlite3_stmt *stmt, - uint64_t id, - exec_cb cb) +int leader_exec_v2(struct leader *l, + struct exec *req, + sqlite3_stmt *stmt, + exec_cb cb) { - tracef("leader exec id:%" PRIu64, id); - int rv; if (l->exec != NULL) { - tracef("busy"); return SQLITE_BUSY; } l->exec = req; + sm_init(&req->sm, exec_invariant, NULL, exec_states, "exec", + EXEC_START); req->leader = l; req->stmt = stmt; - req->id = id; req->cb = cb; req->barrier.data = req; req->barrier.cb = NULL; req->work = (pool_work_t){}; - rv = leader__barrier(l, &req->barrier, execBarrierCb); - if (rv != 0) { - l->exec = NULL; - return rv; - } - return 0; -} - -static void raftBarrierCb(struct raft_barrier *req, int status) -{ - tracef("raft barrier cb status %d", status); - struct barrier *barrier = req->data; - int rv = 0; - if (status != 0) { - if (status == RAFT_LEADERSHIPLOST) { - rv = SQLITE_IOERR_LEADERSHIP_LOST; - } else { - rv = SQLITE_ERROR; - } - } - barrier_cb cb = barrier->cb; - if (cb == NULL) { - tracef("barrier cb already fired"); - return; - } - barrier->cb = NULL; - cb(barrier, rv); -} - -int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb) -{ - tracef("leader barrier"); - int rv; - if (!needsBarrier(l)) { - tracef("not needed"); - cb(barrier, 0); - return 0; - } - barrier->cb = cb; - barrier->leader = l; - barrier->req.data = barrier; - rv = raft_barrier(l->raft, &barrier->req, raftBarrierCb); - if (rv != 0) { - tracef("raft barrier failed %d", rv); - barrier->req.data = NULL; - barrier->leader = NULL; - barrier->cb = NULL; - return rv; - } - return 0; + return exec_async(req, 0); } diff --git a/src/leader.h b/src/leader.h index 9d022d3e9..2a078755d 100644 --- a/src/leader.h +++ 
b/src/leader.h @@ -8,14 +8,17 @@ #include #include -#include "./lib/queue.h" #include "db.h" +#include "lib/queue.h" +#include "lib/sm.h" #include "lib/threadpool.h" #include "raft.h" #define SQLITE_IOERR_NOT_LEADER (SQLITE_IOERR | (40 << 8)) #define SQLITE_IOERR_LEADERSHIP_LOST (SQLITE_IOERR | (41 << 8)) +#define LEADER_NOT_ASYNC INT_MAX + struct exec; struct barrier; struct leader; @@ -47,6 +50,7 @@ struct leader { struct barrier { void *data; + struct sm sm; struct leader *leader; struct raft_barrier req; barrier_cb cb; @@ -57,6 +61,7 @@ struct barrier { */ struct exec { void *data; + struct sm sm; struct leader *leader; struct barrier barrier; sqlite3_stmt *stmt; @@ -79,32 +84,39 @@ int leader__init(struct leader *l, struct db *db, struct raft *raft); void leader__close(struct leader *l); /** - * Submit a request to step a SQLite statement. + * Submit a raft barrier request if there is no transaction in progress on the + * underlying database and the FSM is behind the last log index. + * + * The callback will only be invoked asynchronously: if no barrier is needed, + * this function will return without invoking the callback. * - * The request will be dispatched to the leader loop coroutine, which will be - * resumed and will invoke sqlite_step(). If the statement triggers the - * replication hooks and one or more new Raft log entries need to be appended, - * then the loop coroutine will be paused and control will be transferred back - * to the main coroutine. In this state the leader loop coroutine call stack - * will be "blocked" on the xFrames() replication hook call triggered by the top - * sqlite_step() call. The leader loop coroutine will be resumed once the Raft - * append request completes (either successfully or not) and at that point the - * stack will rewind back to the @sqlite_step() call, returning to the leader - * loop which will then have completed the request and transfer control back to - * the main coroutine, pausing until the next request. 
+ * Returns 0 if the callback was scheduled successfully or LEADER_NOT_ASYNC + * if no barrier is needed. Any other value indicates an error. */ -int leader__exec(struct leader *l, - struct exec *req, - sqlite3_stmt *stmt, - uint64_t id, - exec_cb cb); +int leader_barrier_v2(struct leader *l, + struct barrier *barrier, + barrier_cb cb); /** - * Submit a raft barrier request if there is no transaction in progress in the - * underlying database and the FSM is behind the last log index. + * Submit a request to step a SQLite statement. + * + * This is an asynchronous operation in general. It can yield to the event + * loop at two points: + * + * - When running the preliminary barrier (see leader_barrier_v2). This + * is skipped if no barrier is necessary. + * - When replicating the transaction in raft. This is skipped if the + * statement doesn't generate any changed pages. * - * Otherwise, just invoke the given @cb immediately. + * If both of these yields are skipped, this function returns LEADER_NOT_ASYNC + * and does not invoke the callback. In this case the caller must examine + * `req->status` to determine whether the exec was successful. Otherwise, + * this function returns 0 if it successfully scheduled the callback and + * yielded, or any other value to indicate an error. */ -int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb); +int leader_exec_v2(struct leader *l, + struct exec *req, + sqlite3_stmt *stmt, + exec_cb cb); #endif /* LEADER_H_*/ diff --git a/test/unit/test_gateway.c b/test/unit/test_gateway.c index d0e1fb7c6..8539c3ee2 100644 --- a/test/unit/test_gateway.c +++ b/test/unit/test_gateway.c @@ -567,7 +567,7 @@ TEST_CASE(prepare, barrier_error, NULL) f->request.db_id = 0; f->request.sql = "SELECT n FROM test"; ENCODE(&f->request, prepare); - /* We rely on leader__barrier (called by handle_prepare) attempting + /* We rely on leader_barrier_v2 (called by handle_prepare) attempting * an allocation using raft_malloc. 
*/ test_raft_heap_fault_config(0, 1); test_raft_heap_fault_enable(); @@ -918,8 +918,11 @@ TEST_CASE(exec, close_while_in_flight, NULL) EXEC("INSERT INTO test(n) VALUES(1)"); } + /* Trigger a second page cache flush to the WAL, and abort before it's * done. */ + /* FIXME(cole) it seems that this may no longer be successfully triggering + * the page cache flush */ EXEC_SQL_SUBMIT("INSERT INTO test(n) VALUES(1)"); return MUNIT_OK; } @@ -1869,7 +1872,7 @@ TEST_CASE(exec_sql, barrier_error, NULL) f->request.db_id = 0; f->request.sql = "INSERT INTO test VALUES(123)"; ENCODE(&f->request, exec_sql); - /* We rely on leader__barrier (called by handle_exec_sql) attempting + /* We rely on leader_barrier_v2 (called by handle_exec_sql) attempting * an allocation using raft_malloc. */ test_raft_heap_fault_config(0, 1); test_raft_heap_fault_enable(); @@ -2243,7 +2246,7 @@ TEST_CASE(query_sql, barrier_error, NULL) f->request.db_id = 0; f->request.sql = "SELECT n FROM test"; ENCODE(&f->request, query_sql); - /* We rely on leader__barrier (called by handle_query_sql) attempting + /* We rely on leader_barrier_v2 (called by handle_query_sql) attempting * an allocation using raft_malloc. */ test_raft_heap_fault_config(0, 1); test_raft_heap_fault_enable(); diff --git a/test/unit/test_replication.c b/test/unit/test_replication.c index 00fdd9ba7..dfd0fce68 100644 --- a/test/unit/test_replication.c +++ b/test/unit/test_replication.c @@ -88,12 +88,16 @@ TEST_MODULE(replication_v1); } /* Submit an exec request using the I'th leader. */ -#define EXEC(I) \ - { \ - int rc2; \ - rc2 = leader__exec(LEADER(I), &f->req, f->stmt, 0, \ - fixture_exec_cb); \ - munit_assert_int(rc2, ==, 0); \ +#define EXEC(I) \ + { \ + int rc2; \ + rc2 = leader_exec_v2(LEADER(I), &f->req, f->stmt, \ + fixture_exec_cb); \ + if (rc2 == LEADER_NOT_ASYNC) { \ + fixture_exec_cb(&f->req, f->req.status); \ + } else { \ + munit_assert_int(rc2, ==, 0); \ + } \ } /* Convenience to prepare, execute and finalize a statement. 
*/ @@ -167,7 +171,7 @@ TEST_CASE(init, conn, NULL) /****************************************************************************** * - * leader__exec + * leader_exec_v2 * ******************************************************************************/ @@ -349,33 +353,41 @@ static void execCb(struct exec *req, int status) f->status = status; } +static void fixture_exec(struct fixture *f, unsigned i) +{ + int rv; + + rv = leader_exec_v2(LEADER(i), &f->req, f->stmt, execCb); + if (rv == LEADER_NOT_ASYNC) { + execCb(&f->req, f->req.status); + return; + } + munit_assert_int(rv, ==, 0); +} + TEST(replication, exec, setUp, tearDown, 0, NULL) { struct fixture *f = data; - int rv; CLUSTER_ELECT(0); PREPARE(0, "BEGIN"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); + fixture_exec(f, 0); CLUSTER_APPLIED(3); - munit_assert_int(rv, ==, 0); munit_assert_true(f->invoked); munit_assert_int(f->status, ==, SQLITE_DONE); f->invoked = false; FINALIZE; PREPARE(0, "CREATE TABLE test (a INT)"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); munit_assert_true(f->invoked); munit_assert_int(f->status, ==, SQLITE_DONE); f->invoked = false; FINALIZE; PREPARE(0, "COMMIT"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); munit_assert_false(f->invoked); FINALIZE; @@ -400,21 +412,18 @@ TEST(replication, checkpoint, setUp, tearDown, 0, NULL) { struct fixture *f = data; struct config *config = CLUSTER_CONFIG(0); - int rv; config->checkpoint_threshold = 3; CLUSTER_ELECT(0); PREPARE(0, "CREATE TABLE test (n INT)"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); CLUSTER_APPLIED(4); FINALIZE; PREPARE(0, "INSERT INTO test(n) VALUES(1)"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); CLUSTER_APPLIED(6); FINALIZE; From 
f8a5aee97dc9480e8b5a321677efafd14a59f8a5 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 2 Sep 2024 12:09:41 -0400 Subject: [PATCH 02/12] Rewrite handle_exec_sql_next Signed-off-by: Cole Miller --- src/gateway.c | 96 +++++++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/src/gateway.c b/src/gateway.c index 19ae9e709..2eb9b7948 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -761,66 +761,66 @@ static void handle_exec_sql_next(struct gateway *g, bool done) { tracef("handle exec sql next"); + struct leader *l = g->leader; + PRE(l != NULL); struct cursor *cursor = &req->cursor; - struct response_result response = { 0 }; - sqlite3_stmt *stmt = NULL; + int schema = req->schema; + struct response_result response = {}; + sqlite3_stmt *stmt; + const char *sql; const char *tail; int tuple_format; - uint64_t req_id; int rv; - if (req->sql == NULL || strcmp(req->sql, "") == 0) { - goto success; - } - - /* stmt will be set to NULL by sqlite when an error occurs. */ - assert(g->leader != NULL); - rv = sqlite3_prepare_v2(g->leader->conn, req->sql, -1, &stmt, &tail); - if (rv != SQLITE_OK) { - tracef("exec sql prepare failed %d", rv); - failure(req, rv, sqlite3_errmsg(g->leader->conn)); - goto done; - } - - if (stmt == NULL) { - goto success; - } - - if (!done) { - switch (req->schema) { - case DQLITE_REQUEST_PARAMS_SCHEMA_V0: - tuple_format = TUPLE__PARAMS; - break; - case DQLITE_REQUEST_PARAMS_SCHEMA_V1: - tuple_format = TUPLE__PARAMS32; - break; - default: - /* Should have been caught by handle_exec_sql */ - assert(0); + tuple_format = schema == DQLITE_REQUEST_PARAMS_SCHEMA_V0 ? + TUPLE__PARAMS : + schema == DQLITE_REQUEST_PARAMS_SCHEMA_V1 ? 
+ TUPLE__PARAMS32 : + (POST(false && "impossible"), 0); + + for (;;) { + stmt = NULL; + sql = req->sql; + if (sql == NULL || strcmp(sql, "") == 0) { + goto success; } - rv = bind__params(stmt, cursor, tuple_format); + /* stmt will be set to NULL by sqlite when an error occurs. */ + rv = sqlite3_prepare_v2(l->conn, sql, -1, &stmt, &tail); if (rv != SQLITE_OK) { - failure(req, rv, "bind parameters"); - goto done_after_prepare; + tracef("exec sql prepare failed %d", rv); + failure(req, rv, sqlite3_errmsg(l->conn)); + goto done; + } + if (stmt == NULL) { + goto success; + } + if (!done) { + rv = bind__params(stmt, cursor, tuple_format); + if (rv != SQLITE_OK) { + failure(req, rv, "bind parameters"); + goto done_after_prepare; + } } - } - - req->sql = tail; - g->req = req; - req_id = idNext(&g->random_state); - /* At this point, leader__exec takes ownership of stmt */ - rv = - leader__exec(g->leader, &g->exec, stmt, req_id, handle_exec_sql_cb); - if (rv != SQLITE_OK) { - failure(req, rv, sqlite3_errmsg(g->leader->conn)); - goto done_after_prepare; + req->sql = tail; + g->req = req; + rv = leader_exec_v2(g->leader, &g->exec, stmt, handle_exec_sql_cb); + if (rv == 0) { + return; + } else if (rv != LEADER_NOT_ASYNC) { + failure(req, rv, sqlite3_errmsg(l->conn)); + goto done_after_prepare; + } else if (g->exec.status != SQLITE_DONE) { + /* XXX */ + failure(req, g->exec.status, sqlite3_errmsg(l->conn)); + goto done_after_prepare; + } + done = true; + sqlite3_finalize(stmt); + req->exec_count++; } - return; - success: - tracef("handle exec sql next success"); if (req->exec_count > 0) { fill_result(g, &response); } From 5f139a222313e1c9247c1b06859b82fc010b5d61 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 2 Sep 2024 12:13:01 -0400 Subject: [PATCH 03/12] Add a regression test Signed-off-by: Cole Miller --- test/integration/test_client.c | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/integration/test_client.c 
b/test/integration/test_client.c index 98fccd428..c2a1cfec2 100644 --- a/test/integration/test_client.c +++ b/test/integration/test_client.c @@ -145,3 +145,29 @@ TEST(client, querySql, setUp, tearDown, 0, client_params) return MUNIT_OK; } + +/* Stress test of an EXEC_SQL with many ';'-separated statements. */ +TEST(client, semicolons, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + (void)params; + + static const char trivial_stmt[] = "CREATE TABLE IF NOT EXISTS foo (n INT);"; + + size_t n = 1000; + size_t unit = sizeof(trivial_stmt) - 1; + char *sql = munit_malloc(n * unit); + char *p = sql; + for (size_t i = 0; i < n; i++) { + memcpy(p, trivial_stmt, unit); + p += unit; + } + sql[n * unit - 1] = '\0'; + + uint64_t last_insert_id; + uint64_t rows_affected; + EXEC_SQL(sql, &last_insert_id, &rows_affected); + + free(sql); + return MUNIT_OK; +} From fc98dcc7a68605b6741d4f6665839b9055e74570 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 8 Oct 2024 10:57:07 -0400 Subject: [PATCH 04/12] Fix after merge Signed-off-by: Cole Miller --- src/leader.c | 164 +++++++++++++++------------------------------------ src/leader.h | 1 - 2 files changed, 49 insertions(+), 116 deletions(-) diff --git a/src/leader.c b/src/leader.c index fc4fb41dc..e23c38c65 100644 --- a/src/leader.c +++ b/src/leader.c @@ -17,34 +17,7 @@ #include "utils.h" #include "vfs.h" -/** - * State machine for exec requests. 
- */ -enum { - EXEC_START, - EXEC_BARRIER, - EXEC_STEPPED, - EXEC_POLLED, - EXEC_DONE, - EXEC_FAILED, - EXEC_NR, -}; - -#define A(ident) BITS(EXEC_##ident) -#define S(ident, allowed_, flags_) \ - [EXEC_##ident] = { .name = #ident, .allowed = (allowed_), .flags = (flags_) } - -static const struct sm_conf exec_states[EXEC_NR] = { - S(START, A(BARRIER)|A(FAILED)|A(DONE), SM_INITIAL), - S(BARRIER, A(STEPPED)|A(FAILED)|A(DONE), 0), - S(STEPPED, A(POLLED)|A(FAILED)|A(DONE), 0), - S(POLLED, A(APPLIED)|A(FAILED)|A(DONE), 0), - S(APPLIED, A(FAILED)|A(DONE), 0, - S(DONE, 0, SM_FINAL), - S(FAILED, 0, SM_FAILURE|SM_FINAL), -}; - -static bool exec_invariant(const struct sm *sm, int prev) +static bool barrier_invariant(const struct sm *sm, int prev) { (void)sm; (void)prev; @@ -169,6 +142,44 @@ int leader__init(struct leader *l, struct db *db, struct raft *raft) return 0; } +/** + * State machine for exec requests. + */ +enum { + EXEC_START, + EXEC_BARRIER, + EXEC_STEPPED, + EXEC_POLLED, + EXEC_APPLIED, + EXEC_DONE, + EXEC_FAILED, + EXEC_NR, +}; + +#define A(ident) BITS(EXEC_##ident) +#define S(ident, allowed_, flags_) \ + [EXEC_##ident] = { .name = #ident, .allowed = (allowed_), .flags = (flags_) } + +static const struct sm_conf exec_states[EXEC_NR] = { + S(START, A(BARRIER)|A(FAILED)|A(DONE), SM_INITIAL), + S(BARRIER, A(STEPPED)|A(FAILED)|A(DONE), 0), + S(STEPPED, A(POLLED)|A(FAILED)|A(DONE), 0), + S(POLLED, A(APPLIED)|A(FAILED)|A(DONE), 0), + S(APPLIED, A(FAILED)|A(DONE), 0), + S(DONE, 0, SM_FINAL), + S(FAILED, 0, SM_FAILURE|SM_FINAL), +}; + +#undef S +#undef A + +static bool exec_invariant(const struct sm *sm, int prev) +{ + (void)sm; + (void)prev; + return true; +} + static void exec_done(struct exec *, int); void leader__close(struct leader *l) @@ -329,35 +340,19 @@ enum { BARRIER_NR, }; +#define A(ident) BITS(BARRIER_##ident) +#define S(ident, allowed_, flags_) \ + [BARRIER_##ident] = { .name = #ident, .allowed = (allowed_), .flags = (flags_) } + static const struct 
sm_conf barrier_states[BARRIER_NR] = { - [BARRIER_START] = { - .name = "start", - .allowed = BITS(BARRIER_PASSED) - |BITS(BARRIER_DONE) - |BITS(BARRIER_FAIL), - .flags = SM_INITIAL, - }, - [BARRIER_PASSED] = { - .name = "passed", - .allowed = BITS(BARRIER_DONE) - |BITS(BARRIER_FAIL), - }, - [BARRIER_DONE] = { - .name = "done", - .flags = SM_FINAL, - }, - [BARRIER_FAIL] = { - .name = "fail", - .flags = SM_FINAL|SM_FAILURE, - }, + S(START, A(PASSED)|A(DONE)|A(FAIL), SM_INITIAL), + S(PASSED, A(DONE)|A(FAIL), 0), + S(DONE, 0, SM_FINAL), + S(FAIL, 0, SM_FINAL|SM_FAILURE), }; -static bool barrier_invariant(const struct sm *sm, int prev) -{ - (void)sm; - (void)prev; - return true; -} +#undef S +#undef A static void barrier_done(struct barrier *barrier, int status) { @@ -440,65 +435,6 @@ int leader_barrier_v2(struct leader *l, return rv; } -enum { - EXEC_START, - EXEC_BARRIER, - EXEC_STEPPED, - EXEC_POLLED, - EXEC_APPLIED, - EXEC_DONE, - EXEC_FAILED, - EXEC_NR, -}; - -static const struct sm_conf exec_states[EXEC_NR] = { - [EXEC_START] = { - .name = "start", - .allowed = BITS(EXEC_BARRIER) - |BITS(EXEC_FAILED) - |BITS(EXEC_DONE), - .flags = SM_INITIAL, - }, - [EXEC_BARRIER] = { - .name = "barrier", - .allowed = BITS(EXEC_STEPPED) - |BITS(EXEC_FAILED) - |BITS(EXEC_DONE), - }, - [EXEC_STEPPED] = { - .name = "stepped", - .allowed = BITS(EXEC_POLLED) - |BITS(EXEC_FAILED) - |BITS(EXEC_DONE), - }, - [EXEC_POLLED] = { - .name = "polled", - .allowed = BITS(EXEC_APPLIED) - |BITS(EXEC_FAILED) - |BITS(EXEC_DONE), - }, - [EXEC_APPLIED] = { - .name = "applied", - .allowed = BITS(EXEC_FAILED) - |BITS(EXEC_DONE), - }, - [EXEC_DONE] = { - .name = "done", - .flags = SM_FINAL, - }, - [EXEC_FAILED] = { - .name = "failed", - .flags = SM_FAILURE|SM_FINAL, - }, -}; - -static bool exec_invariant(const struct sm *sm, int prev) -{ - (void)sm; - (void)prev; - return true; -} - static void exec_done(struct exec *req, int asyncness) { int status = req->status; @@ -697,8 +633,6 @@ int 
leader_exec_v2(struct leader *l, req->barrier.data = req; req->barrier.cb = NULL; req->work = (pool_work_t){}; - sm_init(&req->sm, exec_invariant, NULL, exec_states, "exec", - EXEC_START); return exec_async(req, 0); } diff --git a/src/leader.h b/src/leader.h index 939717020..f8ce05790 100644 --- a/src/leader.h +++ b/src/leader.h @@ -70,7 +70,6 @@ struct exec { queue queue; exec_cb cb; pool_work_t work; - struct sm sm; }; /** From 63e0edee277ff934aef342580ca4ab9ad2316a8b Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 13:37:22 -0400 Subject: [PATCH 05/12] Fix handling of error from exec_apply Signed-off-by: Cole Miller --- src/leader.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/leader.c b/src/leader.c index e23c38c65..816dd148f 100644 --- a/src/leader.c +++ b/src/leader.c @@ -584,7 +584,7 @@ int exec_async(struct exec *req, int status) } else if (apply_rv != LEADER_NOT_ASYNC) { req->status = apply_rv; exec_done(req, ret); - ret = 0; + ret = LEADER_NOT_ASYNC; break; } /* else apply_rv == LEADER_NOT_ASYNC => */ ret &= LEADER_NOT_ASYNC; From c7d70ad91efa6c1d02dd5a8d8b5a4651323d99b8 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 10 Oct 2024 15:43:36 -0400 Subject: [PATCH 06/12] Docs Signed-off-by: Cole Miller --- src/leader.c | 48 ++++++++++++++++++++++++++++++++++++++++++++---- src/leader.h | 9 ++++----- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/src/leader.c b/src/leader.c index 816dd148f..a0d12062c 100644 --- a/src/leader.c +++ b/src/leader.c @@ -546,6 +546,24 @@ static void exec_barrier_cb(struct barrier *barrier, int status) /** * Exec request pseudo-coroutine, encapsulating the whole lifecycle. + * + * Exec processing is a sequence of steps, tracked by the embedded SM, in + * between which we possibly suspend execution. After every suspend, control + * returns to this function, and we jump to the appropriate arm of the switch + * statement based on the SM state. 
If we never suspend, control remains in + * this function, passing through each state from top to bottom. + * + * When invoked by leader_exec_v2, the return value indicates + * whether we suspended (0), finished without suspending (LEADER_NOT_ASYNC), + * or encountered an error (any other value). When invoked by a callback, + * the `status` argument indicates whether the async operation succeeded + * or failed, and the return value is ignored. + * + * There are some backward-compatibility warts here. In particular, when + * an error occurs, we sometimes signal it by returning an error code + * and sometimes by just setting `req->status` (and returning one of the two + * "success" codes). This is done to preserve exactly how each error was handled in + * the previous exec code. */ int exec_async(struct exec *req, int status) { @@ -553,6 +571,9 @@ int exec_async(struct exec *req, int status) sqlite3_vfs *vfs; int barrier_rv = 0; int apply_rv = 0; + /* Eventual return value of this function. Also tracks whether we + * previously suspended while processing this request (0) or not + * (LEADER_NOT_ASYNC). 
*/ int ret = 0; switch (sm_state(&req->sm)) { @@ -561,30 +582,44 @@ int exec_async(struct exec *req, int status) l = req->leader; PRE(l != NULL); barrier_rv = leader_barrier_v2(l, &req->barrier, exec_barrier_cb); - ret = barrier_rv; if (barrier_rv == 0) { + /* suspended */ + ret = 0; break; } else if (barrier_rv != LEADER_NOT_ASYNC) { - l->exec = NULL; + /* return error to caller, don't invoke callback, + * but set req->status so that the SM will + * record the failure */ + req->status = barrier_rv; + exec_done(req, LEADER_NOT_ASYNC); + ret = barrier_rv; break; } /* else barrier_rv == LEADER_NOT_ASYNC => */ + ret = LEADER_NOT_ASYNC; sm_move(&req->sm, EXEC_BARRIER); POST(status == 0); /* fallthrough */ case EXEC_BARRIER: if (status != 0) { + /* error, we must have suspended, so invoke the callback */ + PRE(ret == 0); req->status = status; - exec_done(req, ret); + exec_done(req, 0); break; } apply_rv = exec_apply(req); if (apply_rv == 0) { + /* suspended */ ret = 0; break; } else if (apply_rv != LEADER_NOT_ASYNC) { + /* error, record it in `req->status` and either invoke + * the callback (if we suspended) or tell the caller to + * invoke it (otherwise---it would be more consistent + * to return the error code in this case, but for the + * sake of compatibility we do it this way instead) */ req->status = apply_rv; exec_done(req, ret); - ret = LEADER_NOT_ASYNC; break; } /* else apply_rv == LEADER_NOT_ASYNC => */ ret &= LEADER_NOT_ASYNC; @@ -596,6 +631,10 @@ int exec_async(struct exec *req, int status) PRE(l != NULL); vfs = sqlite3_vfs_find(l->db->config->name); PRE(vfs != NULL); + /* apply_rv == 0 if and only if we suspended at the previous step, + * if and only if the transaction generated frames---this logic is + * copied carefully from the previous version of the code */ + PRE(apply_rv == 0 || apply_rv == LEADER_NOT_ASYNC); if (apply_rv == 0) { if (status == 0) { leaderMaybeCheckpointLegacy(l); @@ -606,6 +645,7 @@ int exec_async(struct exec *req, int status) 
l->inflight = NULL; l->db->tx_id = 0; } + /* finished successfully */ exec_done(req, ret); break; default: diff --git a/src/leader.h b/src/leader.h index f8ce05790..723c7e1f3 100644 --- a/src/leader.h +++ b/src/leader.h @@ -108,11 +108,10 @@ int leader_barrier_v2(struct leader *l, * - When replicating the transaction in raft. This is skipped if the * statement doesn't generate any changed pages. * - * If both of these yields are skipped, this function returns LEADER_NOT_ASYNC - * and does not invoke the callback. In this case the caller must examine - * `req->status` to determine whether the exec was successful. Otherwise, - * this function returns 0 if it successfully scheduled the callback and - * yielded, or any other value to indicate an error. + * This function returns 0 if it successfully suspended for one of these + * async operations. It returns LEADER_NOT_ASYNC to indicate that it + * did not suspend, and in this case `req->status` shows whether an error + * occurred. Any other return value indicates an error. 
*/ int leader_exec_v2(struct leader *l, struct exec *req, From 8ae4d3fee61d84731bb0ee677997c07d2a4913a4 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 11 Oct 2024 13:53:24 -0400 Subject: [PATCH 07/12] Remove remaining traces of old request IDs Signed-off-by: Cole Miller --- src/conn.c | 3 +-- src/conn.h | 2 -- src/gateway.c | 17 ++----------- src/gateway.h | 5 +--- src/id.c | 70 --------------------------------------------------- src/id.h | 46 --------------------------------- src/leader.c | 3 +-- src/leader.h | 1 - src/server.c | 4 --- src/server.h | 2 -- 10 files changed, 5 insertions(+), 148 deletions(-) delete mode 100644 src/id.c delete mode 100644 src/id.h diff --git a/src/conn.c b/src/conn.c index f6adfa55d..905523c26 100644 --- a/src/conn.c +++ b/src/conn.c @@ -296,7 +296,6 @@ int conn__start(struct conn *c, struct raft *raft, struct uv_stream_s *stream, struct raft_uv_transport *uv_transport, - struct id_state seed, conn_close_cb close_cb) { int rv; @@ -311,7 +310,7 @@ int conn__start(struct conn *c, c->transport.data = c; c->uv_transport = uv_transport; c->close_cb = close_cb; - gateway__init(&c->gateway, config, registry, raft, seed); + gateway__init(&c->gateway, config, registry, raft); rv = buffer__init(&c->read); if (rv != 0) { goto err_after_transport_init; diff --git a/src/conn.h b/src/conn.h index 0ae2b1299..e48ef3462 100644 --- a/src/conn.h +++ b/src/conn.h @@ -10,7 +10,6 @@ #include "lib/transport.h" #include "gateway.h" -#include "id.h" #include "message.h" #include "raft.h" @@ -50,7 +49,6 @@ int conn__start(struct conn *c, struct raft *raft, struct uv_stream_s *stream, struct raft_uv_transport *uv_transport, - struct id_state seed, conn_close_cb close_cb); /** diff --git a/src/gateway.c b/src/gateway.c index 3b673b0d1..a20fc5500 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -2,7 +2,6 @@ #include "bind.h" #include "conn.h" -#include "id.h" #include "lib/threadpool.h" #include "protocol.h" #include "query.h" @@ -17,8 +16,7 @@ void 
gateway__init(struct gateway *g, struct config *config, struct registry *registry, - struct raft *raft, - struct id_state seed) + struct raft *raft) { tracef("gateway init"); g->config = config; @@ -33,7 +31,6 @@ void gateway__init(struct gateway *g, g->barrier.leader = NULL; g->protocol = DQLITE_PROTOCOL_VERSION; g->client_id = 0; - g->random_state = seed; } /* FIXME: This function becomes unsound when using the new thread pool, since @@ -1028,8 +1025,7 @@ struct change { static void raftChangeCb(struct raft_change *change, int status) { - tracef("raft change cb id:%" PRIu64 " status:%d", - idExtract(change->req_id), status); + tracef("raft change cb status:%d", status); struct change *r = change->data; struct gateway *g = r->gateway; struct handle *req = g->req; @@ -1049,7 +1045,6 @@ static int handle_add(struct gateway *g, struct handle *req) tracef("handle add"); struct cursor *cursor = &req->cursor; struct change *r; - uint64_t req_id; int rv; START_V0(add, empty); (void)response; @@ -1062,8 +1057,6 @@ static int handle_add(struct gateway *g, struct handle *req) } r->gateway = g; r->req.data = r; - req_id = idNext(&g->random_state); - idSet(r->req.req_id, req_id); g->req = req; rv = raft_add(g->raft, &r->req, request.id, request.address, @@ -1085,7 +1078,6 @@ static int handle_promote_or_assign(struct gateway *g, struct handle *req) struct cursor *cursor = &req->cursor; struct change *r; uint64_t role = DQLITE_VOTER; - uint64_t req_id; int rv; START_V0(promote_or_assign, empty); (void)response; @@ -1109,8 +1101,6 @@ static int handle_promote_or_assign(struct gateway *g, struct handle *req) } r->gateway = g; r->req.data = r; - req_id = idNext(&g->random_state); - idSet(r->req.req_id, req_id); g->req = req; rv = raft_assign(g->raft, &r->req, request.id, @@ -1131,7 +1121,6 @@ static int handle_remove(struct gateway *g, struct handle *req) tracef("handle remove"); struct cursor *cursor = &req->cursor; struct change *r; - uint64_t req_id; int rv; START_V0(remove, 
empty); (void)response; @@ -1145,8 +1134,6 @@ static int handle_remove(struct gateway *g, struct handle *req) } r->gateway = g; r->req.data = r; - req_id = idNext(&g->random_state); - idSet(r->req.req_id, req_id); g->req = req; rv = raft_remove(g->raft, &r->req, request.id, raftChangeCb); diff --git a/src/gateway.h b/src/gateway.h index 2a5805084..9a86a9a9f 100644 --- a/src/gateway.h +++ b/src/gateway.h @@ -11,7 +11,6 @@ #include "lib/serialize.h" #include "config.h" -#include "id.h" #include "leader.h" #include "raft.h" #include "registry.h" @@ -34,14 +33,12 @@ struct gateway { struct barrier barrier; /* Barrier for query requests */ uint64_t protocol; /* Protocol format version */ uint64_t client_id; - struct id_state random_state; /* For generating IDs */ }; void gateway__init(struct gateway *g, struct config *config, struct registry *registry, - struct raft *raft, - struct id_state seed); + struct raft *raft); void gateway__close(struct gateway *g); diff --git a/src/id.c b/src/id.c deleted file mode 100644 index 14ed29f4a..000000000 --- a/src/id.c +++ /dev/null @@ -1,70 +0,0 @@ -#include "id.h" - -#include - -/* The PRNG used for generating request IDs is xoshiro256**, developed by - * David Blackman and Sebastiano Vigna and released into the public domain. - * See . 
*/ - -static uint64_t rotl(uint64_t x, int k) -{ - return (x << k) | (x >> (64 - k)); -} - -uint64_t idNext(struct id_state *state) -{ - uint64_t result = rotl(state->data[1] * 5, 7) * 9; - uint64_t t = state->data[1] << 17; - - state->data[2] ^= state->data[0]; - state->data[3] ^= state->data[1]; - state->data[1] ^= state->data[2]; - state->data[0] ^= state->data[3]; - - state->data[2] ^= t; - - state->data[3] = rotl(state->data[3], 45); - - return result; -} - -void idJump(struct id_state *state) -{ - static const uint64_t JUMP[] = {0x180ec6d33cfd0aba, 0xd5a61266f0c9392c, - 0xa9582618e03fc9aa, 0x39abdc4529b1661c}; - - uint64_t s0 = 0; - uint64_t s1 = 0; - uint64_t s2 = 0; - uint64_t s3 = 0; - for (size_t i = 0; i < sizeof(JUMP) / sizeof(*JUMP); i++) { - for (size_t b = 0; b < 64; b++) { - if (JUMP[i] & UINT64_C(1) << b) { - s0 ^= state->data[0]; - s1 ^= state->data[1]; - s2 ^= state->data[2]; - s3 ^= state->data[3]; - } - idNext(state); - } - } - - state->data[0] = s0; - state->data[1] = s1; - state->data[2] = s2; - state->data[3] = s3; -} - -uint64_t idExtract(const uint8_t buf[16]) -{ - uint64_t id; - memcpy(&id, buf, sizeof(id)); - return id; -} - -void idSet(uint8_t buf[16], uint64_t id) -{ - memset(buf, 0, 16); - memcpy(buf, &id, sizeof(id)); - buf[15] = (uint8_t)-1; -} diff --git a/src/id.h b/src/id.h deleted file mode 100644 index fd9c29693..000000000 --- a/src/id.h +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Generate, set, and extract dqlite-generated request IDs. - * - * A fresh ID is generated for each config or exec client request that - * arrives at a gateway. These IDs are passed down into raft via the - * req_id field of RAFT__REQUEST, and are suitable for diagnostic use - * only. - */ - -#ifndef DQLITE_ID_H_ -#define DQLITE_ID_H_ - -#include - -/** - * State used to generate a request ID. - */ -struct id_state -{ - uint64_t data[4]; -}; - -/** - * Generate a request ID, mutating the input state in the process. 
- */ -uint64_t idNext(struct id_state *state); - -/** - * Cause the given state to yield a different sequence of IDs. - * - * This is used to ensure that the sequences of IDs generated for - * distinct clients are (in practice) disjoint. - */ -void idJump(struct id_state *state); - -/** - * Read a request ID from the req_id field of RAFT__REQUEST. - */ -uint64_t idExtract(const uint8_t buf[16]); - -/** - * Write a request ID to the req_id field of RAFT__REQUEST. - */ -void idSet(uint8_t buf[16], uint64_t id); - -#endif /* DQLITE_ID_H_ */ diff --git a/src/leader.c b/src/leader.c index a0d12062c..a04c72bce 100644 --- a/src/leader.c +++ b/src/leader.c @@ -278,7 +278,7 @@ static int leaderApplyFrames(struct exec *req, unsigned n, raft_apply_cb cb) { - tracef("leader apply frames id:%" PRIu64, req->id); + tracef("leader apply frames"); struct leader *l = req->leader; struct db *db = l->db; struct command_frames c; @@ -310,7 +310,6 @@ static int leaderApplyFrames(struct exec *req, apply->leader = req->leader; apply->req.data = apply; apply->type = COMMAND_FRAMES; - idSet(apply->req.req_id, req->id); rv = raft_apply(l->raft, &apply->req, &buf, 1, cb); if (rv != 0) { diff --git a/src/leader.h b/src/leader.h index 723c7e1f3..5d5356768 100644 --- a/src/leader.h +++ b/src/leader.h @@ -65,7 +65,6 @@ struct exec { struct leader *leader; struct barrier barrier; sqlite3_stmt *stmt; - uint64_t id; int status; queue queue; exec_cb cb; diff --git a/src/server.c b/src/server.c index 26e5bf3a9..f7fad3e00 100644 --- a/src/server.c +++ b/src/server.c @@ -574,7 +574,6 @@ static void listenCb(uv_stream_t *listener, int status) struct dqlite_node *t = listener->data; struct uv_stream_s *stream; struct conn *conn; - struct id_state seed; int rv; if (!t->running) { @@ -644,9 +643,6 @@ static void listenCb(uv_stream_t *listener, int status) #endif } - seed = t->random_state; - idJump(&t->random_state); - conn = sqlite3_malloc(sizeof *conn); if (conn == NULL) { goto err; diff --git 
a/src/server.h b/src/server.h index 443da461b..85728aa67 100644 --- a/src/server.h +++ b/src/server.h @@ -7,7 +7,6 @@ #include "client/protocol.h" #include "config.h" -#include "id.h" #include "lib/assert.h" #include "lib/threadpool.h" #include "logger.h" @@ -56,7 +55,6 @@ struct dqlite_node { int *); /* Connection function for role management */ void *connect_func_arg; /* User data for connection function */ char errmsg[DQLITE_ERRMSG_BUF_SIZE]; /* Last error occurred */ - struct id_state random_state; /* For seeding ID generation */ }; /* Dynamic array of node info objects. This is the in-memory representation of From 9ff34d6bbeaa4783f279a270acfb4018cd49c13c Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 11 Oct 2024 14:24:53 -0400 Subject: [PATCH 08/12] Address review comments Signed-off-by: Cole Miller --- Makefile.am | 1 - src/gateway.c | 23 ++++++++++++----------- src/leader.c | 23 +++++++++++------------ src/server.c | 10 +--------- test/unit/test_concurrency.c | 3 +-- test/unit/test_conn.c | 3 +-- test/unit/test_gateway.c | 3 +-- 7 files changed, 27 insertions(+), 39 deletions(-) diff --git a/Makefile.am b/Makefile.am index 899eccacb..7802947c5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -41,7 +41,6 @@ basic_dqlite_sources = \ src/format.c \ src/fsm.c \ src/gateway.c \ - src/id.c \ src/leader.c \ src/lib/addr.c \ src/lib/buffer.c \ diff --git a/src/gateway.c b/src/gateway.c index a20fc5500..3a4dbe605 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -518,7 +518,6 @@ static int handle_exec(struct gateway *g, struct handle *req) g->req = req; rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, leader_exec_cb); if (rv == LEADER_NOT_ASYNC) { - /* XXX */ leader_exec_cb(&g->exec, g->exec.status); } else if (rv != 0) { tracef("handle exec leader exec failed %d", rv); @@ -677,8 +676,8 @@ static int handle_query(struct gateway *g, struct handle *req) rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, leaderModifyingQueryCb); if (rv == LEADER_NOT_ASYNC) { - 
/* XXX */ leaderModifyingQueryCb(&g->exec, g->exec.status); + rv = 0; } } if (rv != 0) { @@ -729,6 +728,13 @@ static void handle_exec_sql_cb(struct exec *exec, int status) } } +/** + * handle_exec_sql_next does the bulk of processing for an EXEC_SQL request. + * A single call to this function iterates over the input SQL text, preparing + * and executing statements until execution of some statement needs to yield + * to the event loop. When that happens, a callback is scheduled that will + * call this function again to process more input. + */ static void handle_exec_sql_next(struct gateway *g, struct handle *req, bool done) @@ -745,19 +751,15 @@ static void handle_exec_sql_next(struct gateway *g, int tuple_format; int rv; + PRE(schema == DQLITE_REQUEST_PARAMS_SCHEMA_V0 || + schema == DQLITE_REQUEST_PARAMS_SCHEMA_V1); tuple_format = schema == DQLITE_REQUEST_PARAMS_SCHEMA_V0 ? TUPLE__PARAMS : - schema == DQLITE_REQUEST_PARAMS_SCHEMA_V1 ? - TUPLE__PARAMS32 : - (POST(false && "impossible"), 0); + TUPLE__PARAMS32; for (;;) { stmt = NULL; sql = req->sql; - if (sql == NULL || strcmp(sql, "") == 0) { - goto success; - } - /* stmt will be set to NULL by sqlite when an error occurs. 
*/ rv = sqlite3_prepare_v2(l->conn, sql, -1, &stmt, &tail); if (rv != SQLITE_OK) { tracef("exec sql prepare failed %d", rv); @@ -765,6 +767,7 @@ static void handle_exec_sql_next(struct gateway *g, goto done; } if (stmt == NULL) { + /* nothing in the string to prepare */ goto success; } if (!done) { @@ -784,7 +787,6 @@ static void handle_exec_sql_next(struct gateway *g, failure(req, rv, sqlite3_errmsg(l->conn)); goto done_after_prepare; } else if (g->exec.status != SQLITE_DONE) { - /* XXX */ failure(req, g->exec.status, sqlite3_errmsg(l->conn)); goto done_after_prepare; } @@ -951,7 +953,6 @@ static void query_sql_bottom_half(struct gateway *g, int status) rv = leader_exec_v2(g->leader, &g->exec, stmt, leaderModifyingQuerySqlCb); if (rv == LEADER_NOT_ASYNC) { - /* XXX */ leaderModifyingQuerySqlCb(&g->exec, g->exec.status); } else if (rv != 0) { sqlite3_finalize(stmt); diff --git a/src/leader.c b/src/leader.c index a04c72bce..5fd5a85f9 100644 --- a/src/leader.c +++ b/src/leader.c @@ -8,7 +8,6 @@ #include "command.h" #include "conn.h" #include "gateway.h" -#include "id.h" #include "leader.h" #include "lib/sm.h" #include "lib/threadpool.h" @@ -380,7 +379,7 @@ static void barrier_done(struct barrier *barrier, int status) static void barrier_raft_cb(struct raft_barrier *, int); -static int barrier_async(struct barrier *barrier, int status) +static int barrier_tick(struct barrier *barrier, int status) { int rv; @@ -411,7 +410,7 @@ static void barrier_raft_cb(struct raft_barrier *rb, int status) return; } sm_move(&barrier->sm, BARRIER_PASSED); - (void)barrier_async(rb->data, status); + (void)barrier_tick(rb->data, status); } int leader_barrier_v2(struct leader *l, @@ -429,7 +428,7 @@ int leader_barrier_v2(struct leader *l, barrier->cb = cb; barrier->leader = l; barrier->req.data = barrier; - rv = barrier_async(barrier, 0); + rv = barrier_tick(barrier, 0); POST(rv != LEADER_NOT_ASYNC); return rv; } @@ -479,15 +478,15 @@ static int exec_apply(struct exec *req) size = 
VfsDatabaseSize(vfs, db->path, n, db->config->page_size); if (size > VfsDatabaseSizeLimit(vfs)) { rv = SQLITE_FULL; - goto err; + goto finish; } rv = leaderApplyFrames(req, frames, n, exec_apply_cb); if (rv != 0) { - goto err; + goto finish; } -err: +finish: for (i = 0; i < n; i++) { sqlite3_free(frames[i].data); } @@ -498,7 +497,7 @@ static int exec_apply(struct exec *req) return rv; } -static int exec_async(struct exec *, int); +static int exec_tick(struct exec *, int); static void exec_apply_cb(struct raft_apply *req, int status, @@ -523,7 +522,7 @@ static void exec_apply_cb(struct raft_apply *req, } else { raft_free(apply); } - exec_async(exec, status); + exec_tick(exec, status); } static int exec_status(int r) @@ -540,7 +539,7 @@ static void exec_barrier_cb(struct barrier *barrier, int status) struct exec *req = barrier->data; PRE(req != NULL); sm_move(&req->sm, EXEC_BARRIER); - exec_async(req, status); + exec_tick(req, status); } /** @@ -564,7 +563,7 @@ static void exec_barrier_cb(struct barrier *barrier, int status) * "success" codes). This is done to preserve exactly how each error was handled in * the previous exec code. 
*/ -int exec_async(struct exec *req, int status) +int exec_tick(struct exec *req, int status) { struct leader *l; sqlite3_vfs *vfs; @@ -673,5 +672,5 @@ int leader_exec_v2(struct leader *l, req->barrier.cb = NULL; req->work = (pool_work_t){}; - return exec_async(req, 0); + return exec_tick(req, 0); } diff --git a/src/server.c b/src/server.c index f7fad3e00..8b6ab3444 100644 --- a/src/server.c +++ b/src/server.c @@ -12,7 +12,6 @@ #include "client/protocol.h" #include "conn.h" #include "fsm.h" -#include "id.h" #include "lib/addr.h" #include "lib/assert.h" #include "lib/fs.h" @@ -60,8 +59,6 @@ int dqlite__init(struct dqlite_node *d, { int rv; char db_dir_path[1024]; - int urandom; - ssize_t count; d->initialized = false; d->lock_fd = -1; @@ -161,11 +158,6 @@ int dqlite__init(struct dqlite_node *d, d->connect_func = transportDefaultConnect; d->connect_func_arg = NULL; - urandom = open("/dev/urandom", O_RDONLY); - assert(urandom != -1); - count = read(urandom, d->random_state.data, sizeof(uint64_t[4])); - (void)count; - close(urandom); d->initialized = true; return 0; @@ -648,7 +640,7 @@ static void listenCb(uv_stream_t *listener, int status) goto err; } rv = conn__start(conn, &t->config, &t->loop, &t->registry, &t->raft, - stream, &t->raft_transport, seed, destroy_conn); + stream, &t->raft_transport, destroy_conn); if (rv != 0) { goto err_after_conn_alloc; } diff --git a/test/unit/test_concurrency.c b/test/unit/test_concurrency.c index 6008efacc..bf4d66375 100644 --- a/test/unit/test_concurrency.c +++ b/test/unit/test_concurrency.c @@ -48,9 +48,8 @@ struct connection { struct connection *c = &f->connections[i]; \ struct request_open open; \ struct response_db db; \ - struct id_state seed = { { 1 } }; \ gateway__init(&c->gateway, CLUSTER_CONFIG(0), \ - CLUSTER_REGISTRY(0), CLUSTER_RAFT(0), seed); \ + CLUSTER_REGISTRY(0), CLUSTER_RAFT(0)); \ c->handle.data = &c->context; \ rc = buffer__init(&c->request); \ munit_assert_int(rc, ==, 0); \ diff --git a/test/unit/test_conn.c 
b/test/unit/test_conn.c index ca336ac3a..25bc232f8 100644 --- a/test/unit/test_conn.c +++ b/test/unit/test_conn.c @@ -47,7 +47,6 @@ static void connCloseCb(struct conn *conn) #define SETUP \ struct uv_stream_s *stream; \ - struct id_state seed = { { 1 } }; \ int rv; \ SETUP_HEAP; \ SETUP_SQLITE; \ @@ -67,7 +66,7 @@ static void connCloseCb(struct conn *conn) f->conn_test.closed = false; \ rv = conn__start(&f->conn_test.conn, &f->config, &f->loop, \ &f->registry, &f->raft, stream, &f->raft_transport, \ - seed, connCloseCb); \ + connCloseCb); \ munit_assert_int(rv, ==, 0) #define TEAR_DOWN \ diff --git a/test/unit/test_gateway.c b/test/unit/test_gateway.c index 8539c3ee2..097e2b89a 100644 --- a/test/unit/test_gateway.c +++ b/test/unit/test_gateway.c @@ -52,11 +52,10 @@ struct connection { for (i = 0; i < N_SERVERS; i++) { \ struct connection *c = &f->connections[i]; \ struct config *config; \ - struct id_state seed = { { 1 } }; \ config = CLUSTER_CONFIG(i); \ config->page_size = 512; \ gateway__init(&c->gateway, config, CLUSTER_REGISTRY(i), \ - CLUSTER_RAFT(i), seed); \ + CLUSTER_RAFT(i)); \ c->handle.data = &c->context; \ rc = buffer__init(&c->buf1); \ munit_assert_int(rc, ==, 0); \ From f9c4ca1425a2e84bbde6293bec3133534acd202c Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 11 Oct 2024 14:45:06 -0400 Subject: [PATCH 09/12] No more bottom halves Signed-off-by: Cole Miller --- src/gateway.c | 64 ++++++++++++++++++--------------------------------- 1 file changed, 22 insertions(+), 42 deletions(-) diff --git a/src/gateway.c b/src/gateway.c index 3a4dbe605..af17e3246 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -306,8 +306,10 @@ static int handle_open(struct gateway *g, struct handle *req) return 0; } -static void prepare_bottom_half(struct gateway *g, int status) +static void prepare_barrier_cb(struct barrier *barrier, int status) { + tracef("prepare barrier cb status:%d", status); + struct gateway *g = barrier->data; struct handle *req = g->req; struct 
response_stmt response_v0 = { 0 }; struct response_stmt_with_offset response_v1 = { 0 }; @@ -376,13 +378,6 @@ static void prepare_bottom_half(struct gateway *g, int status) } } -static void prepare_barrier_cb(struct barrier *barrier, int status) -{ - tracef("prepare barrier cb status:%d", status); - struct gateway *g = barrier->data; - prepare_bottom_half(g, status); -} - static int handle_prepare(struct gateway *g, struct handle *req) { tracef("handle prepare"); @@ -417,7 +412,7 @@ static int handle_prepare(struct gateway *g, struct handle *req) g->req = req; rc = leader_barrier_v2(g->leader, &g->barrier, prepare_barrier_cb); if (rc == LEADER_NOT_ASYNC) { - prepare_bottom_half(g, 0); + prepare_barrier_cb(&g->barrier, 0); } else if (rc != 0) { tracef("handle prepare barrier failed %d", rc); stmt__registry_del(&g->stmts, stmt); @@ -579,8 +574,10 @@ static void query_batch(struct gateway *g) query_batch_async(req, POOL_BOTTOM_HALF); } -static void query_bottom_half(struct gateway *g, int status) +static void query_barrier_cb(struct barrier *barrier, int status) { + tracef("query barrier cb status:%d", status); + struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -597,14 +594,7 @@ static void query_bottom_half(struct gateway *g, int status) query_batch(g); } -static void query_barrier_cb(struct barrier *barrier, int status) -{ - tracef("query barrier cb status:%d", status); - struct gateway *g = barrier->data; - query_bottom_half(g, status); -} - -static void leaderModifyingQueryCb(struct exec *exec, int status) +static void modifying_query_exec_cb(struct exec *exec, int status) { struct gateway *g = exec->data; struct handle *req = g->req; @@ -669,14 +659,14 @@ static int handle_query(struct gateway *g, struct handle *req) if (is_readonly) { rv = leader_barrier_v2(g->leader, &g->barrier, query_barrier_cb); if (rv == LEADER_NOT_ASYNC) { - query_bottom_half(g, 0); + query_barrier_cb(&g->barrier, 0); rv = 0; } } else { 
rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, - leaderModifyingQueryCb); + modifying_query_exec_cb); if (rv == LEADER_NOT_ASYNC) { - leaderModifyingQueryCb(&g->exec, g->exec.status); + modifying_query_exec_cb(&g->exec, g->exec.status); rv = 0; } } @@ -806,8 +796,10 @@ static void handle_exec_sql_next(struct gateway *g, g->req = NULL; } -static void exec_sql_bottom_half(struct gateway *g, int status) +static void exec_sql_barrier_cb(struct barrier *barrier, int status) { + tracef("exec sql barrier cb status:%d", status); + struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -820,13 +812,6 @@ static void exec_sql_bottom_half(struct gateway *g, int status) handle_exec_sql_next(g, req, false); } -static void exec_sql_barrier_cb(struct barrier *barrier, int status) -{ - tracef("exec sql barrier cb status:%d", status); - struct gateway *g = barrier->data; - exec_sql_bottom_half(g, status); -} - static int handle_exec_sql(struct gateway *g, struct handle *req) { tracef("handle exec sql schema:%" PRIu8, req->schema); @@ -856,7 +841,7 @@ static int handle_exec_sql(struct gateway *g, struct handle *req) g->req = req; rc = leader_barrier_v2(g->leader, &g->barrier, exec_sql_barrier_cb); if (rc == LEADER_NOT_ASYNC) { - exec_sql_bottom_half(g, 0); + exec_sql_barrier_cb(&g->barrier, 0); } else if (rc != 0) { tracef("handle exec sql barrier failed %d", rc); g->req = NULL; @@ -865,7 +850,7 @@ static int handle_exec_sql(struct gateway *g, struct handle *req) return 0; } -static void leaderModifyingQuerySqlCb(struct exec *exec, int status) +static void modifying_query_sql_exec_cb(struct exec *exec, int status) { struct gateway *g = exec->data; struct handle *req = g->req; @@ -884,8 +869,10 @@ static void leaderModifyingQuerySqlCb(struct exec *exec, int status) } } -static void query_sql_bottom_half(struct gateway *g, int status) +static void query_sql_barrier_cb(struct barrier *barrier, int status) { + tracef("query sql 
barrier cb status:%d", status); + struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -951,9 +938,9 @@ static void query_sql_bottom_half(struct gateway *g, int status) query_batch(g); } else { rv = leader_exec_v2(g->leader, &g->exec, stmt, - leaderModifyingQuerySqlCb); + modifying_query_sql_exec_cb); if (rv == LEADER_NOT_ASYNC) { - leaderModifyingQuerySqlCb(&g->exec, g->exec.status); + modifying_query_sql_exec_cb(&g->exec, g->exec.status); } else if (rv != 0) { sqlite3_finalize(stmt); g->req = NULL; @@ -962,13 +949,6 @@ static void query_sql_bottom_half(struct gateway *g, int status) } } -static void query_sql_barrier_cb(struct barrier *barrier, int status) -{ - tracef("query sql barrier cb status:%d", status); - struct gateway *g = barrier->data; - query_sql_bottom_half(g, status); -} - static int handle_query_sql(struct gateway *g, struct handle *req) { tracef("handle query sql schema:%" PRIu8, req->schema); @@ -995,7 +975,7 @@ static int handle_query_sql(struct gateway *g, struct handle *req) g->req = req; rv = leader_barrier_v2(g->leader, &g->barrier, query_sql_barrier_cb); if (rv == LEADER_NOT_ASYNC) { - query_sql_bottom_half(g, 0); + query_sql_barrier_cb(&g->barrier, 0); } else if (rv != 0) { tracef("handle query sql barrier failed %d", rv); g->req = NULL; From 1f3ac633e18da2258ebb598187ff770837988af3 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 11 Oct 2024 15:02:10 -0400 Subject: [PATCH 10/12] Tighten up semicolons test Signed-off-by: Cole Miller --- test/integration/test_client.c | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/test/integration/test_client.c b/test/integration/test_client.c index c2a1cfec2..9537b6b69 100644 --- a/test/integration/test_client.c +++ b/test/integration/test_client.c @@ -152,22 +152,38 @@ TEST(client, semicolons, setUp, tearDown, 0, NULL) struct fixture *f = data; (void)params; - static const char trivial_stmt[] = 
"CREATE TABLE IF NOT EXISTS foo (n INT);"; + static const char create_sql[] = "CREATE TABLE IF NOT EXISTS test (n INT);"; + static const char insert_sql[] = "INSERT INTO test (n) VALUES (17);"; size_t n = 1000; - size_t unit = sizeof(trivial_stmt) - 1; - char *sql = munit_malloc(n * unit); + size_t create_len = sizeof(create_sql) - 1; + size_t insert_len = sizeof(insert_sql) - 1; + size_t len = n * create_len + insert_len + 1; + char *sql = munit_malloc(len); char *p = sql; for (size_t i = 0; i < n; i++) { - memcpy(p, trivial_stmt, unit); - p += unit; + memcpy(p, create_sql, create_len); + p += create_len; } - sql[n * unit - 1] = '\0'; + memcpy(p, insert_sql, insert_len); + p += insert_len; + munit_assert_ptr(p, ==, sql + len - 1); + *p = '\0'; uint64_t last_insert_id; uint64_t rows_affected; EXEC_SQL(sql, &last_insert_id, &rows_affected); - free(sql); + + /* Check that all the statements were executed. */ + struct row *row; + QUERY_SQL("SELECT n FROM test", &f->rows); + munit_assert_uint(f->rows.column_count, ==, 1); + munit_assert_string_equal(f->rows.column_names[0], "n"); + row = f->rows.next; + munit_assert_int(row->values[0].type, ==, SQLITE_INTEGER); + munit_assert_int64(row->values[0].integer, ==, 17); + munit_assert_ptr_null(row->next); + return MUNIT_OK; } From de2758d98d8305cdbbe9db5147599575226ed110 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 11 Oct 2024 15:04:57 -0400 Subject: [PATCH 11/12] Bump up the number of statements Testing again locally revealed that (contrary to what I remembered) 1000 isn't enough to cause a stack overflow with the old implementation on my machine. 
Signed-off-by: Cole Miller --- test/integration/test_client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/test_client.c b/test/integration/test_client.c index 9537b6b69..86bb5e712 100644 --- a/test/integration/test_client.c +++ b/test/integration/test_client.c @@ -155,7 +155,7 @@ TEST(client, semicolons, setUp, tearDown, 0, NULL) static const char create_sql[] = "CREATE TABLE IF NOT EXISTS test (n INT);"; static const char insert_sql[] = "INSERT INTO test (n) VALUES (17);"; - size_t n = 1000; + size_t n = 10000; size_t create_len = sizeof(create_sql) - 1; size_t insert_len = sizeof(insert_sql) - 1; size_t len = n * create_len + insert_len + 1; From 06992a6aabdf0ec80291b5dc84d26c28701a85d3 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 11 Oct 2024 15:17:02 -0400 Subject: [PATCH 12/12] Remove confused FIXME I don't remember why I thought there was a problem here originally, but investigation with printf shows that the second flush is happening as intended. Signed-off-by: Cole Miller --- test/unit/test_gateway.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/unit/test_gateway.c b/test/unit/test_gateway.c index 097e2b89a..c5d0d0c02 100644 --- a/test/unit/test_gateway.c +++ b/test/unit/test_gateway.c @@ -920,8 +920,6 @@ TEST_CASE(exec, close_while_in_flight, NULL) /* Trigger a second page cache flush to the WAL, and abort before it's * done. */ - /* FIXME(cole) it seems that this may no longer be successfully triggering - * the page cache flush */ EXEC_SQL_SUBMIT("INSERT INTO test(n) VALUES(1)"); return MUNIT_OK; }