Skip to content

Commit

Permalink
sqlite: fix inconsistent read-after-write (#3354)
Browse files Browse the repository at this point in the history
* sqlite: fix inconsistent read-after-write

fetch_one/fetch_optional

* try pushing fetch_optional early-return into worker

* run cargo fmt

* fix "it_can_execute_multiple_statements" test failure

* use Option<usize> instead of bespoke enum for rows returned
  • Loading branch information
ckampfe authored Aug 1, 2024
1 parent 572e2a4 commit ff0252d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
4 changes: 2 additions & 2 deletions sqlx-sqlite/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl AnyConnectionBackend for SqliteConnection {

Box::pin(
self.worker
.execute(query, args, self.row_channel_size, persistent)
.execute(query, args, self.row_channel_size, persistent, None)
.map_ok(flume::Receiver::into_stream)
.try_flatten_stream()
.map(
Expand All @@ -107,7 +107,7 @@ impl AnyConnectionBackend for SqliteConnection {
Box::pin(async move {
let stream = self
.worker
.execute(query, args, self.row_channel_size, persistent)
.execute(query, args, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream)
.await?;
futures_util::pin_mut!(stream);
Expand Down
4 changes: 2 additions & 2 deletions sqlx-sqlite/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {

Box::pin(
self.worker
.execute(sql, arguments, self.row_channel_size, persistent)
.execute(sql, arguments, self.row_channel_size, persistent, None)
.map_ok(flume::Receiver::into_stream)
.try_flatten_stream(),
)
Expand All @@ -58,7 +58,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
Box::pin(async move {
let stream = self
.worker
.execute(sql, arguments, self.row_channel_size, persistent)
.execute(sql, arguments, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream)
.try_flatten_stream();

Expand Down
36 changes: 32 additions & 4 deletions sqlx-sqlite/src/connection/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum Command {
arguments: Option<SqliteArguments<'static>>,
persistent: bool,
tx: flume::Sender<Result<Either<SqliteQueryResult, SqliteRow>, Error>>,
limit: Option<usize>,
},
Begin {
tx: rendezvous_oneshot::Sender<Result<(), Error>>,
Expand Down Expand Up @@ -136,6 +137,7 @@ impl ConnectionWorker {
arguments,
persistent,
tx,
limit
} => {
let iter = match execute::iter(&mut conn, &query, arguments, persistent)
{
Expand All @@ -146,10 +148,34 @@ impl ConnectionWorker {
}
};

for res in iter {
if tx.send(res).is_err() {
break;
}
match limit {
None => {
for res in iter {
if tx.send(res).is_err() {
break;
}
}
},
Some(limit) => {
let mut iter = iter;
let mut rows_returned = 0;

while let Some(res) = iter.next() {
if let Ok(ok) = &res {
if ok.is_right() {
rows_returned += 1;
if rows_returned >= limit {
drop(iter);
let _ = tx.send(res);
break;
}
}
}
if tx.send(res).is_err() {
break;
}
}
},
}

update_cached_statements_size(&conn, &shared.cached_statements_size);
Expand Down Expand Up @@ -284,6 +310,7 @@ impl ConnectionWorker {
args: Option<SqliteArguments<'_>>,
chan_size: usize,
persistent: bool,
limit: Option<usize>,
) -> Result<flume::Receiver<Result<Either<SqliteQueryResult, SqliteRow>, Error>>, Error> {
let (tx, rx) = flume::bounded(chan_size);

Expand All @@ -294,6 +321,7 @@ impl ConnectionWorker {
arguments: args.map(SqliteArguments::into_static),
persistent,
tx,
limit,
},
Span::current(),
))
Expand Down

0 comments on commit ff0252d

Please sign in to comment.