Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt-in querying of non-leaders #709

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 111 additions & 58 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@
g->client_id = 0;
}

/* FIXME: This function becomes unsound when using the new thread pool, since
* the request callbacks will race with operations running in the pool. */
void gateway__leader_close(struct gateway *g, int reason)
{
if (g == NULL || g->leader == NULL) {
tracef("gateway:%p or gateway->leader are NULL", g);
return;
}

/* If the client has opted into reading potentially stale data, don't
* shut down the connection unnecessarily when we lost leadership. */
if (g->leader->readonly && reason == RAFT_LEADERSHIPLOST) {
return;
}

if (g->req != NULL) {
if (g->leader->inflight != NULL) {
tracef("finish inflight apply request");
Expand Down Expand Up @@ -123,12 +127,6 @@
} \
}

#define CHECK_LEADER(REQ) \
if (raft_state(g->raft) != RAFT_LEADER) { \
failure(REQ, SQLITE_IOERR_NOT_LEADER, "not leader"); \
return 0; \
}

#define SUCCESS(LOWER, UPPER, RESP, SCHEMA) \
{ \
size_t _n = response_##LOWER##__sizeof(&RESP); \
Expand All @@ -146,15 +144,6 @@
* using schema version 0. */
#define SUCCESS_V0(LOWER, UPPER) SUCCESS(LOWER, UPPER, response, 0)

/* Lookup the database with the given ID.
*
* TODO: support more than one database per connection? */
#define LOOKUP_DB(ID) \
if (ID != 0 || g->leader == NULL) { \
failure(req, SQLITE_NOTFOUND, "no database opened"); \
return 0; \
}

/* Lookup the statement with the given ID. */
#define LOOKUP_STMT(ID) \
stmt = stmt__registry_get(&g->stmts, ID); \
Expand Down Expand Up @@ -188,6 +177,7 @@
/* Encode fa failure response and invoke the request callback */
static void failure(struct handle *req, int code, const char *message)
{
tracef("failure %s", message);
struct response_failure failure;
size_t n;
char *cursor;
Expand All @@ -203,6 +193,22 @@
req->cb(req, 0, DQLITE_RESPONSE_FAILURE, 0);
}

static struct leader *gw_get_leader(const struct gateway *g,
uint64_t db_id,
struct handle *req)
{
struct leader *l = g->leader;

if (db_id != 0 || l == NULL) {
failure(req, SQLITE_NOTFOUND, "no database opened");
return NULL;

Check warning on line 204 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L203-L204

Added lines #L203 - L204 were not covered by tests
} else if (!l->readonly && raft_state(g->raft) != RAFT_LEADER) {
failure(req, SQLITE_IOERR_NOT_LEADER, "not leader");
return NULL;
}
return l;
}

static void emptyRows(struct handle *req)
{
char *cursor = buffer__advance(req->buffer, 8 + 8);
Expand Down Expand Up @@ -294,7 +300,7 @@
tracef("malloc failed");
return DQLITE_NOMEM;
}
rc = leader__init(g->leader, db, g->raft);
rc = leader_init(g->leader, db, request.flags, g->raft);
if (rc != 0) {
tracef("leader init failed %d", rc);
sqlite3_free(g->leader);
Expand Down Expand Up @@ -384,6 +390,7 @@
struct cursor *cursor = &req->cursor;
struct stmt *stmt;
struct request_prepare request = { 0 };
struct leader *l;
int rc;

if (req->schema != DQLITE_PREPARE_STMT_SCHEMA_V0 &&
Expand All @@ -396,8 +403,11 @@
return rc;
}

CHECK_LEADER(req);
LOOKUP_DB(request.db_id);
l = gw_get_leader(g, request.db_id, req);
if (l == NULL) {
return 0;
}

rc = stmt__registry_add(&g->stmts, &stmt);
if (rc != 0) {
tracef("handle prepare registry add failed %d", rc);
Expand All @@ -410,7 +420,12 @@
req->stmt_id = stmt->id;
req->sql = request.sql;
g->req = req;
rc = leader_barrier_v2(g->leader, &g->barrier, prepare_barrier_cb);
if (l->readonly) {
g->barrier.data = g;
rc = LEADER_NOT_ASYNC;
} else {
rc = leader_barrier_v2(l, &g->barrier, prepare_barrier_cb);
}
if (rc == LEADER_NOT_ASYNC) {
prepare_barrier_cb(&g->barrier, 0);
} else if (rc != 0) {
Expand Down Expand Up @@ -477,6 +492,7 @@
struct stmt *stmt;
struct request_exec request = { 0 };
int tuple_format;
struct leader *l;
int rv;

switch (req->schema) {
Expand All @@ -499,8 +515,14 @@
return rv;
}

CHECK_LEADER(req);
LOOKUP_DB(request.db_id);
l = gw_get_leader(g, request.db_id, req);
if (l == NULL) {
return 0;
} else if (l->readonly) {
failure(req, SQLITE_READONLY, "dqlite connection is readonly");
return 0;
}

LOOKUP_STMT(request.stmt_id);
FAIL_IF_CHECKPOINTING;
rv = bind__params(stmt->stmt, cursor, tuple_format);
Expand Down Expand Up @@ -619,7 +641,7 @@
struct stmt *stmt;
struct request_query request = { 0 };
int tuple_format;
bool is_readonly;
struct leader *l;
int rv;

switch (req->schema) {
Expand All @@ -642,49 +664,64 @@
return rv;
}

CHECK_LEADER(req);
LOOKUP_DB(request.db_id);
l = gw_get_leader(g, request.db_id, req);
if (l == NULL) {
return 0;
}
LOOKUP_STMT(request.stmt_id);
FAIL_IF_CHECKPOINTING;

rv = bind__params(stmt->stmt, cursor, tuple_format);
if (rv != 0) {
tracef("handle query bind failed %d", rv);
failure(req, rv, "bind parameters");
return 0;
}

req->stmt_id = stmt->id;
g->req = req;

is_readonly = (bool)sqlite3_stmt_readonly(stmt->stmt);
if (is_readonly) {
rv = leader_barrier_v2(g->leader, &g->barrier, query_barrier_cb);
if (rv == LEADER_NOT_ASYNC) {
query_barrier_cb(&g->barrier, 0);
rv = 0;
if (!sqlite3_stmt_readonly(stmt->stmt)) {
if (l->readonly) {
failure(req, SQLITE_READONLY, "dqlite connection is readonly");
return 0;
}
} else {
rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt,
modifying_query_exec_cb);
rv = leader_exec_v2(l, &g->exec, stmt->stmt, modifying_query_exec_cb);
if (rv == LEADER_NOT_ASYNC) {
modifying_query_exec_cb(&g->exec, g->exec.status);
rv = 0;
}
return 0;
}
if (rv != 0) {

if (l->readonly) {
g->barrier.data = g;
rv = LEADER_NOT_ASYNC;
} else {
rv = leader_barrier_v2(l, &g->barrier, query_barrier_cb);
}
if (rv == LEADER_NOT_ASYNC) {
query_barrier_cb(&g->barrier, 0);
} else if (rv != 0) {
g->req = NULL;
return rv;
}

return 0;
}

static int handle_finalize(struct gateway *g, struct handle *req)
{
tracef("handle finalize");
struct cursor *cursor = &req->cursor;
struct leader *l;
struct stmt *stmt;
int rv;

START_V0(finalize, empty);
LOOKUP_DB(request.db_id);
l = gw_get_leader(g, request.db_id, req);
if (l == NULL) {
return 0;
}
LOOKUP_STMT(request.stmt_id);
rv = stmt__registry_del(&g->stmts, stmt);
if (rv != 0) {
Expand Down Expand Up @@ -817,6 +854,7 @@
tracef("handle exec sql schema:%" PRIu8, req->schema);
struct cursor *cursor = &req->cursor;
struct request_exec_sql request = { 0 };
struct leader *l;
int rc;

/* Fail early if the schema version isn't recognized, even though we
Expand All @@ -833,13 +871,19 @@
return rc;
}

CHECK_LEADER(req);
LOOKUP_DB(request.db_id);
l = gw_get_leader(g, request.db_id, req);
if (l == NULL) {
return 0;
} else if (l->readonly) {
failure(req, SQLITE_READONLY, "dqlite connection is readonly");
return 0;
}

FAIL_IF_CHECKPOINTING;
req->sql = request.sql;
req->exec_count = 0;
g->req = req;
rc = leader_barrier_v2(g->leader, &g->barrier, exec_sql_barrier_cb);
rc = leader_barrier_v2(l, &g->barrier, exec_sql_barrier_cb);
if (rc == LEADER_NOT_ASYNC) {
exec_sql_barrier_cb(&g->barrier, 0);
} else if (rc != 0) {
Expand Down Expand Up @@ -882,7 +926,6 @@
const char *tail;
sqlite3_stmt *tail_stmt;
int tuple_format;
bool is_readonly;
int rv;

if (status != 0) {
Expand Down Expand Up @@ -910,7 +953,6 @@
failure(req, SQLITE_ERROR, "nonempty statement tail");
return;
}

switch (req->schema) {
case DQLITE_REQUEST_PARAMS_SCHEMA_V0:
tuple_format = TUPLE__PARAMS;
Expand All @@ -933,10 +975,11 @@
req->stmt = stmt;
g->req = req;

is_readonly = (bool)sqlite3_stmt_readonly(stmt);
if (is_readonly) {
query_batch(g);
} else {
if (!sqlite3_stmt_readonly(stmt)) {
if (g->leader->readonly) {
failure(req, SQLITE_READONLY, "dqlite connection is readonly");
return;
}
rv = leader_exec_v2(g->leader, &g->exec, stmt,
modifying_query_sql_exec_cb);
if (rv == LEADER_NOT_ASYNC) {
Expand All @@ -946,14 +989,18 @@
g->req = NULL;
failure(req, rv, "leader exec");
}
return;
}

query_batch(g);
}

static int handle_query_sql(struct gateway *g, struct handle *req)
{
tracef("handle query sql schema:%" PRIu8, req->schema);
struct cursor *cursor = &req->cursor;
struct request_query_sql request = { 0 };
struct leader *l;
int rv;

/* Fail early if the schema version isn't recognized. */
Expand All @@ -968,12 +1015,22 @@
return rv;
}

CHECK_LEADER(req);
LOOKUP_DB(request.db_id);
l = gw_get_leader(g, request.db_id, req);
if (l == NULL) {
return 0;
}

FAIL_IF_CHECKPOINTING;

req->sql = request.sql;
g->req = req;
rv = leader_barrier_v2(g->leader, &g->barrier, query_sql_barrier_cb);

if (l->readonly) {
g->barrier.data = g;
rv = LEADER_NOT_ASYNC;
} else {
rv = leader_barrier_v2(g->leader, &g->barrier, query_sql_barrier_cb);
}
if (rv == LEADER_NOT_ASYNC) {
query_sql_barrier_cb(&g->barrier, 0);
} else if (rv != 0) {
Expand Down Expand Up @@ -1027,11 +1084,10 @@
struct cursor *cursor = &req->cursor;
struct change *r;
int rv;

START_V0(add, empty);
(void)response;

CHECK_LEADER(req);

r = sqlite3_malloc(sizeof *r);
if (r == NULL) {
return DQLITE_NOMEM;
Expand Down Expand Up @@ -1060,11 +1116,10 @@
struct change *r;
uint64_t role = DQLITE_VOTER;
int rv;

START_V0(promote_or_assign, empty);
(void)response;

CHECK_LEADER(req);

/* Detect if this is an assign role request, instead of the former
* promote request. */
if (cursor->cap > 0) {
Expand Down Expand Up @@ -1103,11 +1158,10 @@
struct cursor *cursor = &req->cursor;
struct change *r;
int rv;

START_V0(remove, empty);
(void)response;

CHECK_LEADER(req);

r = sqlite3_malloc(sizeof *r);
if (r == NULL) {
tracef("malloc failed");
Expand Down Expand Up @@ -1357,11 +1411,10 @@
struct cursor *cursor = &req->cursor;
struct raft_transfer *r;
int rv;

START_V0(transfer, empty);
(void)response;

CHECK_LEADER(req);

r = sqlite3_malloc(sizeof *r);
if (r == NULL) {
tracef("malloc failed");
Expand Down
Loading
Loading