Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Merge pull request #560 from MathieuBordere/interrupt" #570

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
#include "tracing.h"
#include "transport.h"

#include <stdlib.h>

/* Initialize the given buffer for reading, ensure it has the given size. */
static int init_read(struct conn *c, uv_buf_t *buf, size_t size)
{
Expand All @@ -20,46 +18,34 @@ static int init_read(struct conn *c, uv_buf_t *buf, size_t size)
}

static int read_message(struct conn *c);
static void conn_write_cb(uv_write_t *req, int status)
static void conn_write_cb(struct transport *transport, int status)
{
struct transport *t = req->data;
assert(t != NULL);
struct conn *c = t->data;
assert(c != NULL);
struct conn *c = transport->data;
bool finished;
int rv;
if (status != 0) {
tracef("write cb status %d", status);
goto abort;
}
if (c->closed) {
tracef("connection closing");
goto abort;
}

buffer__reset(&c->write);
buffer__advance(&c->write, message__sizeof(&c->response)); /* Header */

rv = gateway__resume(&c->gateway, &finished);
tracef("request finished: %d", finished);
if (rv != 0) {
goto abort;
}

/* Start reading the next message if we're not doing that already. */
if (c->reading_message) {
free(req);
if (!finished) {
return;
}

/* Start reading the next request */
rv = read_message(c);
if (rv != 0) {
goto abort;
}

free(req);
return;
abort:
free(req);
conn__stop(c);
}

Expand Down Expand Up @@ -205,7 +191,6 @@ static void read_message_cb(struct transport *transport, int status)
struct cursor cursor;
int rv;

c->reading_message = false;
if (status != 0) {
// errorf(c->logger, "read error");
tracef("read error %d", status);
Expand Down Expand Up @@ -237,7 +222,6 @@ static int read_message(struct conn *c)
tracef("init read failed %d", rv);
return rv;
}
c->reading_message = true;
rv = transport__read(&c->transport, &buf, read_message_cb);
if (rv != 0) {
tracef("transport read failed %d", rv);
Expand Down Expand Up @@ -336,7 +320,6 @@ int conn__start(struct conn *c,
}
c->handle.data = c;
c->closed = false;
c->reading_message = false;
/* First, we expect the client to send us the protocol version. */
rv = read_protocol(c);
if (rv != 0) {
Expand Down
1 change: 0 additions & 1 deletion src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ struct conn
struct message response; /* Response message meta data */
struct handle handle;
bool closed;
bool reading_message; /* Conn is waiting for a message */
queue queue;
};

Expand Down
14 changes: 10 additions & 4 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -1394,8 +1394,11 @@
goto handle;
}

/* Request in progress, the only time we allow interleaving requests is
* when the second request tries to interrupt a query yielding rows. */
/* Request in progress. TODO The current implementation doesn't allow
* reading a new request while a query is yielding rows, in that case
* gateway__resume in write_cb will indicate it has not finished
* returning results and a new request (in this case, the interrupt)
* will not be read. */
if (g->req->type == DQLITE_REQUEST_QUERY &&
type == DQLITE_REQUEST_INTERRUPT) {
goto handle;
Expand All @@ -1407,8 +1410,9 @@
}

/* Receiving a request when one is ongoing on the same connection
* is an error, unless it's an interrupt request. The connection will be
* stopped due to the non-0 return value. */
* is a hard error. The connection will be stopped due to the non-0
* return code in case asserts are off. */
assert(false);

Check warning on line 1415 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L1415

Added line #L1415 was not covered by tests
return SQLITE_BUSY;

handle:
Expand Down Expand Up @@ -1441,9 +1445,11 @@
{
if (g->req == NULL || (g->req->type != DQLITE_REQUEST_QUERY &&
g->req->type != DQLITE_REQUEST_QUERY_SQL)) {
tracef("gateway resume - finished");
*finished = true;
return 0;
}
tracef("gateway resume - not finished");
*finished = false;
query_batch(g);
return 0;
Expand Down
26 changes: 17 additions & 9 deletions src/lib/transport.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include <raft.h>

#include <stdlib.h>

#include "../../include/dqlite.h"

#include "assert.h"
Expand Down Expand Up @@ -124,7 +122,9 @@ int transport__init(struct transport *t, struct uv_stream_s *stream)
t->stream->data = t;
t->read.base = NULL;
t->read.len = 0;
t->write.data = t;
t->read_cb = NULL;
t->write_cb = NULL;
t->close_cb = NULL;

return 0;
Expand Down Expand Up @@ -161,15 +161,23 @@ int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb)
return 0;
}

int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb)
static void write_cb(uv_write_t *req, int status)
{
struct transport *t = req->data;
transport_write_cb cb = t->write_cb;

assert(cb != NULL);
t->write_cb = NULL;

cb(t, status);
}

int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb)
{
int rv;
uv_write_t *req = malloc(sizeof(*req));
if (req == NULL) {
return DQLITE_NOMEM;
}
req->data = t;
rv = uv_write(req, t->stream, buf, 1, cb);
assert(t->write_cb == NULL);
t->write_cb = cb;
rv = uv_write(&t->write, t->stream, buf, 1, write_cb);
if (rv != 0) {
return rv;
}
Expand Down
7 changes: 4 additions & 3 deletions src/lib/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
struct transport;
typedef void (*transport_read_cb)(struct transport *t, int status);
typedef void (*transport_write_cb)(struct transport *t, int status);
typedef void (*transport_close_cb)(struct transport *t);

/**
Expand All @@ -27,6 +28,7 @@ struct transport
uv_buf_t read; /* Read buffer */
uv_write_t write; /* Write request */
transport_read_cb read_cb; /* Read callback */
transport_write_cb write_cb; /* Write callback */
transport_close_cb close_cb; /* Close callback */
};

Expand All @@ -47,10 +49,9 @@ void transport__close(struct transport *t, transport_close_cb cb);
int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb);

/**
* Write the given buffer to the transport. The @cb gains ownership of
* uv_write_t and must `free` it at its own convenience.
* Write the given buffer to the transport.
*/
int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb);
int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb);

/* Create an UV stream object from the given fd. */
int transport__stream(struct uv_loop_s *loop,
Expand Down
72 changes: 0 additions & 72 deletions test/integration/test_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,46 +123,6 @@ TEST(client, query, setUp, tearDown, 0, client_params)
return MUNIT_OK;
}

TEST(client, queryReuseStmtIdAferInterrupt, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
uint32_t stmt_id;
uint64_t last_insert_id;
uint64_t rows_affected;
unsigned i;
struct rows rows;
(void)params;
PREPARE("CREATE TABLE test (n INT)", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("BEGIN", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id);
for (i = 0; i < 4098; i++) {
EXEC(stmt_id, &last_insert_id, &rows_affected);
}

PREPARE("COMMIT", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

/* More than 1 response buffer will be needed to return all the rows, so
* we are able to interrupt the query. */
PREPARE("SELECT * FROM test", &stmt_id);
bool done = true;
QUERY_DONE(stmt_id, &rows, &done);
munit_assert_false(done);

clientSendInterrupt(f->client, NULL);
clientCloseRows(&rows);

/* Ensure stmt_id is still valid after interrupt. */
QUERY(stmt_id, &rows);

clientCloseRows(&rows);
return MUNIT_OK;
}

TEST(client, querySql, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
Expand All @@ -189,35 +149,3 @@ TEST(client, querySql, setUp, tearDown, 0, client_params)

return MUNIT_OK;
}

TEST(client, querySqlInterrupt, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
uint32_t stmt_id;
uint64_t last_insert_id;
uint64_t rows_affected;
unsigned i;
struct rows rows;
bool done = true;
(void)params;
EXEC_SQL("CREATE TABLE test (n INT)", &last_insert_id, &rows_affected);

EXEC_SQL("BEGIN", &last_insert_id, &rows_affected);

PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id);
for (i = 0; i < 4098; i++) {
EXEC(stmt_id, &last_insert_id, &rows_affected);
}

EXEC_SQL("COMMIT", &last_insert_id, &rows_affected);

/* More than 1 response buffer will be needed to return all the rows, so
* we are able to interrupt the query. */
QUERY_SQL_DONE("SELECT * FROM test", &rows, &done);
munit_assert_false(done);

clientSendInterrupt(f->client, NULL);
clientCloseRows(&rows);

return MUNIT_OK;
}
16 changes: 5 additions & 11 deletions test/lib/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,23 @@
munit_assert_int(rv_, ==, 0); \
}

/* Perform a query, DONE is a pointer to a bool that will be true when the query
* is done. */
#define QUERY_DONE(STMT_ID, ROWS, DONE) \
/* Perform a query. */
#define QUERY(STMT_ID, ROWS) \
{ \
int rv_; \
rv_ = clientSendQuery(f->client, STMT_ID, NULL, 0, NULL); \
munit_assert_int(rv_, ==, 0); \
rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \
rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \
munit_assert_int(rv_, ==, 0); \
}

/* Perform a query. */
#define QUERY(STMT_ID, ROWS) QUERY_DONE(STMT_ID, ROWS, NULL)

#define QUERY_SQL_DONE(SQL, ROWS, DONE) \
#define QUERY_SQL(SQL, ROWS) \
{ \
int rv_; \
rv_ = clientSendQuerySQL(f->client, SQL, NULL, 0, NULL); \
munit_assert_int(rv_, ==, 0); \
rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \
rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \
munit_assert_int(rv_, ==, 0); \
}

#define QUERY_SQL(SQL, ROWS) QUERY_SQL_DONE(SQL, ROWS, NULL)

#endif /* TEST_CLIENT_H */
7 changes: 2 additions & 5 deletions test/unit/lib/test_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,11 @@ static void read_cb(struct transport *transport, int status)
f->read.status = status;
}

static void write_cb(uv_write_t *req, int status)
static void write_cb(struct transport *transport, int status)
{
struct transport *t = req->data;
munit_assert_ptr_not_null(t);
struct fixture *f = t->data;
struct fixture *f = transport->data;
f->write.invoked = true;
f->write.status = status;
free(req);
}

static void *setup(const MunitParameter params[], void *user_data)
Expand Down
Loading