Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shut down statement worker in Sqlite Connection::close #1453

Merged
merged 4 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions sqlx-core/src/sqlite/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ impl Connection for SqliteConnection {

type Options = SqliteConnectOptions;

fn close(self) -> BoxFuture<'static, Result<(), Error>> {
// nothing explicit to do; connection will close in drop
Box::pin(future::ok(()))
fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
let shutdown = self.worker.shutdown();
// Drop the statement worker and any outstanding statements, which should
// cover all references to the connection handle outside of the worker thread
drop(self);
// Ensure the worker thread has terminated
shutdown.await
})
}

fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Expand Down
31 changes: 31 additions & 0 deletions sqlx-core/src/sqlite/statement/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ enum StatementWorkerCommand {
statement: Weak<StatementHandle>,
tx: oneshot::Sender<()>,
},
Shutdown {
tx: oneshot::Sender<()>,
},
}

impl StatementWorker {
Expand Down Expand Up @@ -72,6 +75,13 @@ impl StatementWorker {
let _ = tx.send(());
}
}
StatementWorkerCommand::Shutdown { tx } => {
// drop the connection reference before sending confirmation
// and ending the command loop
drop(conn);
let _ = tx.send(());
return;
}
}
}

Expand Down Expand Up @@ -127,4 +137,25 @@ impl StatementWorker {
rx.await.map_err(|_| Error::WorkerCrashed)
}
}

/// Send a command to the worker to shut down the processing thread.
///
/// A `WorkerCrashed` error may be returned if the thread has already stopped.
/// Subsequent calls to `step()`, `reset()`, or this method will fail with
/// `WorkerCrashed`. Ensure that any associated statements are dropped first.
pub(crate) fn shutdown(&mut self) -> impl Future<Output = Result<(), Error>> {
let (tx, rx) = oneshot::channel();

let send_res = self
.tx
.send(StatementWorkerCommand::Shutdown { tx })
.map_err(|_| Error::WorkerCrashed);

async move {
send_res?;

// wait for the response
rx.await.map_err(|_| Error::WorkerCrashed)
}
}
}
6 changes: 4 additions & 2 deletions tests/sqlite/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ async fn it_executes_with_pool() -> anyhow::Result<()> {
async fn it_opens_in_memory() -> anyhow::Result<()> {
// If the filename is ":memory:", then a private, temporary in-memory database
// is created for the connection.
let _ = SqliteConnection::connect(":memory:").await?;
let conn = SqliteConnection::connect(":memory:").await?;
conn.close().await?;

Ok(())
}
Expand All @@ -215,7 +216,8 @@ async fn it_opens_in_memory() -> anyhow::Result<()> {
async fn it_opens_temp_on_disk() -> anyhow::Result<()> {
// If the filename is an empty string, then a private, temporary on-disk database will
// be created.
let _ = SqliteConnection::connect("").await?;
let conn = SqliteConnection::connect("").await?;
conn.close().await?;

Ok(())
}
Expand Down