Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor leader.c to fix stack growth in handle_exec_sql #697

Merged
merged 13 commits into from
Oct 15, 2024
195 changes: 114 additions & 81 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,8 @@
return 0;
}

static void prepareBarrierCb(struct barrier *barrier, int status)
static void prepare_bottom_half(struct gateway *g, int status)
{
tracef("prepare barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
struct response_stmt response_v0 = { 0 };
struct response_stmt_with_offset response_v1 = { 0 };
Expand Down Expand Up @@ -381,6 +379,13 @@
}
}

static void prepare_barrier_cb(struct barrier *barrier, int status)
{
tracef("prepare barrier cb status:%d", status);
struct gateway *g = barrier->data;
prepare_bottom_half(g, status);
}

static int handle_prepare(struct gateway *g, struct handle *req)
{
tracef("handle prepare");
Expand Down Expand Up @@ -413,8 +418,10 @@
req->stmt_id = stmt->id;
req->sql = request.sql;
g->req = req;
rc = leader__barrier(g->leader, &g->barrier, prepareBarrierCb);
if (rc != 0) {
rc = leader_barrier_v2(g->leader, &g->barrier, prepare_barrier_cb);
if (rc == LEADER_NOT_ASYNC) {
prepare_bottom_half(g, 0);
} else if (rc != 0) {
tracef("handle prepare barrier failed %d", rc);
stmt__registry_del(&g->stmts, stmt);
g->req = NULL;
Expand Down Expand Up @@ -478,7 +485,6 @@
struct stmt *stmt;
struct request_exec request = { 0 };
int tuple_format;
uint64_t req_id;
int rv;

switch (req->schema) {
Expand Down Expand Up @@ -513,10 +519,11 @@
}
req->stmt_id = stmt->id;
g->req = req;
req_id = idNext(&g->random_state);
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
rv = leader__exec(g->leader, &g->exec, stmt->stmt, req_id,
leader_exec_cb);
if (rv != 0) {
rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, leader_exec_cb);
if (rv == LEADER_NOT_ASYNC) {
/* XXX */
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
leader_exec_cb(&g->exec, g->exec.status);
} else if (rv != 0) {
tracef("handle exec leader exec failed %d", rv);
g->req = NULL;
return rv;
Expand Down Expand Up @@ -576,10 +583,8 @@
query_batch_async(req, POOL_BOTTOM_HALF);
}

static void query_barrier_cb(struct barrier *barrier, int status)
static void query_bottom_half(struct gateway *g, int status)
{
tracef("query barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
Expand All @@ -596,6 +601,13 @@
query_batch(g);
}

static void query_barrier_cb(struct barrier *barrier, int status)

Check warning on line 604 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L604

Added line #L604 was not covered by tests
{
tracef("query barrier cb status:%d", status);
struct gateway *g = barrier->data;
query_bottom_half(g, status);

Check warning on line 608 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L607-L608

Added lines #L607 - L608 were not covered by tests
}

static void leaderModifyingQueryCb(struct exec *exec, int status)
{
struct gateway *g = exec->data;
Expand All @@ -622,7 +634,6 @@
struct request_query request = { 0 };
int tuple_format;
bool is_readonly;
uint64_t req_id;
int rv;

switch (req->schema) {
Expand Down Expand Up @@ -660,11 +671,18 @@

is_readonly = (bool)sqlite3_stmt_readonly(stmt->stmt);
if (is_readonly) {
rv = leader__barrier(g->leader, &g->barrier, query_barrier_cb);
rv = leader_barrier_v2(g->leader, &g->barrier, query_barrier_cb);
if (rv == LEADER_NOT_ASYNC) {
query_bottom_half(g, 0);
rv = 0;
}
} else {
req_id = idNext(&g->random_state);
rv = leader__exec(g->leader, &g->exec, stmt->stmt, req_id,
leaderModifyingQueryCb);
rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt,
leaderModifyingQueryCb);
if (rv == LEADER_NOT_ASYNC) {
/* XXX */
leaderModifyingQueryCb(&g->exec, g->exec.status);

Check warning on line 684 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L684

Added line #L684 was not covered by tests
}
}
if (rv != 0) {
g->req = NULL;
Expand Down Expand Up @@ -719,66 +737,66 @@
bool done)
{
tracef("handle exec sql next");
struct leader *l = g->leader;
PRE(l != NULL);
struct cursor *cursor = &req->cursor;
struct response_result response = { 0 };
sqlite3_stmt *stmt = NULL;
int schema = req->schema;
struct response_result response = {};
sqlite3_stmt *stmt;
const char *sql;
const char *tail;
int tuple_format;
uint64_t req_id;
int rv;

if (req->sql == NULL || strcmp(req->sql, "") == 0) {
goto success;
}

/* stmt will be set to NULL by sqlite when an error occurs. */
assert(g->leader != NULL);
rv = sqlite3_prepare_v2(g->leader->conn, req->sql, -1, &stmt, &tail);
if (rv != SQLITE_OK) {
tracef("exec sql prepare failed %d", rv);
failure(req, rv, sqlite3_errmsg(g->leader->conn));
goto done;
}

if (stmt == NULL) {
goto success;
}

if (!done) {
switch (req->schema) {
case DQLITE_REQUEST_PARAMS_SCHEMA_V0:
tuple_format = TUPLE__PARAMS;
break;
case DQLITE_REQUEST_PARAMS_SCHEMA_V1:
tuple_format = TUPLE__PARAMS32;
break;
default:
/* Should have been caught by handle_exec_sql */
assert(0);
tuple_format = schema == DQLITE_REQUEST_PARAMS_SCHEMA_V0 ?
TUPLE__PARAMS :
schema == DQLITE_REQUEST_PARAMS_SCHEMA_V1 ?
TUPLE__PARAMS32 :
(POST(false && "impossible"), 0);

Check warning on line 755 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L755

Added line #L755 was not covered by tests

for (;;) {
stmt = NULL;
sql = req->sql;
if (sql == NULL || strcmp(sql, "") == 0) {
goto success;
}
rv = bind__params(stmt, cursor, tuple_format);
/* stmt will be set to NULL by sqlite when an error occurs. */
rv = sqlite3_prepare_v2(l->conn, sql, -1, &stmt, &tail);
if (rv != SQLITE_OK) {
failure(req, rv, "bind parameters");
goto done_after_prepare;
tracef("exec sql prepare failed %d", rv);
failure(req, rv, sqlite3_errmsg(l->conn));
goto done;
}
if (stmt == NULL) {
goto success;
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
}
if (!done) {
rv = bind__params(stmt, cursor, tuple_format);
if (rv != SQLITE_OK) {
failure(req, rv, "bind parameters");
goto done_after_prepare;

Check warning on line 777 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L776-L777

Added lines #L776 - L777 were not covered by tests
}
}
}

req->sql = tail;
g->req = req;

req_id = idNext(&g->random_state);
/* At this point, leader__exec takes ownership of stmt */
rv =
leader__exec(g->leader, &g->exec, stmt, req_id, handle_exec_sql_cb);
if (rv != SQLITE_OK) {
failure(req, rv, sqlite3_errmsg(g->leader->conn));
goto done_after_prepare;
req->sql = tail;
g->req = req;
rv = leader_exec_v2(g->leader, &g->exec, stmt, handle_exec_sql_cb);
if (rv == 0) {
return;
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
} else if (rv != LEADER_NOT_ASYNC) {
failure(req, rv, sqlite3_errmsg(l->conn));
goto done_after_prepare;

Check warning on line 788 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L787-L788

Added lines #L787 - L788 were not covered by tests
} else if (g->exec.status != SQLITE_DONE) {
/* XXX */
failure(req, g->exec.status, sqlite3_errmsg(l->conn));
goto done_after_prepare;
}
done = true;
sqlite3_finalize(stmt);
req->exec_count++;
}

return;

success:
tracef("handle exec sql next success");
if (req->exec_count > 0) {
fill_result(g, &response);
}
Expand All @@ -789,10 +807,8 @@
g->req = NULL;
}

static void execSqlBarrierCb(struct barrier *barrier, int status)
static void exec_sql_bottom_half(struct gateway *g, int status)
{
tracef("exec sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
Expand All @@ -805,6 +821,13 @@
handle_exec_sql_next(g, req, false);
}

static void exec_sql_barrier_cb(struct barrier *barrier, int status)
{
tracef("exec sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
exec_sql_bottom_half(g, status);
}

static int handle_exec_sql(struct gateway *g, struct handle *req)
{
tracef("handle exec sql schema:%" PRIu8, req->schema);
Expand Down Expand Up @@ -832,8 +855,10 @@
req->sql = request.sql;
req->exec_count = 0;
g->req = req;
rc = leader__barrier(g->leader, &g->barrier, execSqlBarrierCb);
if (rc != 0) {
rc = leader_barrier_v2(g->leader, &g->barrier, exec_sql_barrier_cb);
if (rc == LEADER_NOT_ASYNC) {
exec_sql_bottom_half(g, 0);
} else if (rc != 0) {
tracef("handle exec sql barrier failed %d", rc);
g->req = NULL;
return rc;
Expand All @@ -860,10 +885,8 @@
}
}

static void querySqlBarrierCb(struct barrier *barrier, int status)
static void query_sql_bottom_half(struct gateway *g, int status)
{
tracef("query sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
Expand All @@ -874,7 +897,6 @@
sqlite3_stmt *tail_stmt;
int tuple_format;
bool is_readonly;
uint64_t req_id;
int rv;

if (status != 0) {
Expand Down Expand Up @@ -929,17 +951,26 @@
if (is_readonly) {
query_batch(g);
} else {
req_id = idNext(&g->random_state);
rv = leader__exec(g->leader, &g->exec, stmt, req_id,
leaderModifyingQuerySqlCb);
if (rv != 0) {
rv = leader_exec_v2(g->leader, &g->exec, stmt,
leaderModifyingQuerySqlCb);
if (rv == LEADER_NOT_ASYNC) {
/* XXX */
leaderModifyingQuerySqlCb(&g->exec, g->exec.status);

Check warning on line 958 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L958

Added line #L958 was not covered by tests
} else if (rv != 0) {
sqlite3_finalize(stmt);
g->req = NULL;
failure(req, rv, "leader exec");
}
}
}

static void query_sql_barrier_cb(struct barrier *barrier, int status)
{
tracef("query sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
query_sql_bottom_half(g, status);
}

static int handle_query_sql(struct gateway *g, struct handle *req)
{
tracef("handle query sql schema:%" PRIu8, req->schema);
Expand All @@ -964,8 +995,10 @@
FAIL_IF_CHECKPOINTING;
req->sql = request.sql;
g->req = req;
rv = leader__barrier(g->leader, &g->barrier, querySqlBarrierCb);
if (rv != 0) {
rv = leader_barrier_v2(g->leader, &g->barrier, query_sql_barrier_cb);
if (rv == LEADER_NOT_ASYNC) {
query_sql_bottom_half(g, 0);
} else if (rv != 0) {
tracef("handle query sql barrier failed %d", rv);
g->req = NULL;
return rv;
Expand Down
Loading
Loading