Skip to content

Commit

Permalink
PostgreSQL Bugfix: Ensure connection is usable after failed COPY insi…
Browse files Browse the repository at this point in the history
…de a transaction (launchbadge#3138)

* Include test case for regular subtransactions

While using COPY and subtransactions I kept running into errors.
This test case documents that error, it currently fails with:

    Error: encountered unexpected or invalid data: expecting ParseComplete but received CommandComplete

* PostgreSQL Copy: Consume ReadyForQuery on error

When a COPY statement was in error inside a subtransaction,
a Protocol Error used to be raised. By consuming the ReadyForQuery
message when there is an error, we no longer have this issue.
  • Loading branch information
feikesteenbergen authored and jayy-lmao committed Jun 6, 2024
1 parent cd4042c commit 0b62ac1
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
21 changes: 13 additions & 8 deletions sqlx-postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,16 +342,21 @@ async fn pg_begin_copy_out<'c, C: DerefMut<Target = PgConnection> + Send + 'c>(

let stream: TryAsyncStream<'c, Bytes> = try_stream! {
loop {
let msg = conn.stream.recv().await?;
match msg.format {
MessageFormat::CopyData => r#yield!(msg.decode::<CopyData<Bytes>>()?.0),
MessageFormat::CopyDone => {
let _ = msg.decode::<CopyDone>()?;
conn.stream.recv_expect(MessageFormat::CommandComplete).await?;
match conn.stream.recv().await {
Err(e) => {
conn.stream.recv_expect(MessageFormat::ReadyForQuery).await?;
return Ok(())
return Err(e);
},
_ => return Err(err_protocol!("unexpected message format during copy out: {:?}", msg.format))
Ok(msg) => match msg.format {
MessageFormat::CopyData => r#yield!(msg.decode::<CopyData<Bytes>>()?.0),
MessageFormat::CopyDone => {
let _ = msg.decode::<CopyDone>()?;
conn.stream.recv_expect(MessageFormat::CommandComplete).await?;
conn.stream.recv_expect(MessageFormat::ReadyForQuery).await?;
return Ok(())
},
_ => return Err(err_protocol!("unexpected message format during copy out: {:?}", msg.format))
}
}
}
};
Expand Down
66 changes: 65 additions & 1 deletion tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use futures::{StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};

use sqlx::postgres::types::Oid;
use sqlx::postgres::{
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgListener,
PgPoolOptions, PgRow, PgSeverity, Postgres,
};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_core::bytes::Bytes;
use sqlx_test::{new, pool, setup_if_needed};
use std::env;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -382,6 +385,67 @@ async fn it_can_query_all_scalar() -> anyhow::Result<()> {
Ok(())
}

#[sqlx_macros::test]
async fn copy_can_work_with_failed_transactions() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;

// We're using a (local) statement_timeout to simulate a runtime failure, as opposed to
// a parse/plan failure.
let mut tx = conn.begin().await?;
let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)")
.bind("statement_timeout")
.bind("1ms")
.execute(tx.as_mut())
.await?;

let mut copy_out: Pin<
Box<dyn Stream<Item = Result<Bytes, sqlx::Error>> + Send>,
> = (&mut tx)
.copy_out_raw("COPY (SELECT nspname FROM pg_catalog.pg_namespace WHERE pg_sleep(0.001) IS NULL) TO STDOUT")
.await?;

while copy_out.try_next().await.is_ok() {}
drop(copy_out);

tx.rollback().await?;

// conn should be usable again, as we explictly rolled back the transaction
let got: i32 = sqlx::query_scalar("SELECT 1")
.fetch_one(conn.as_mut())
.await?;
assert_eq!(1, got);

Ok(())
}

#[sqlx_macros::test]
async fn it_can_work_with_failed_transactions() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;

// We're using a (local) statement_timeout to simulate a runtime failure, as opposed to
// a parse/plan failure.
let mut tx = conn.begin().await?;
let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)")
.bind("statement_timeout")
.bind("1ms")
.execute(tx.as_mut())
.await?;

assert!(sqlx::query("SELECT 1 WHERE pg_sleep(0.30) IS NULL")
.fetch_one(tx.as_mut())
.await
.is_err());
tx.rollback().await?;

// conn should be usable again, as we explictly rolled back the transaction
let got: i32 = sqlx::query_scalar("SELECT 1")
.fetch_one(conn.as_mut())
.await?;
assert_eq!(1, got);

Ok(())
}

#[sqlx_macros::test]
async fn it_can_work_with_transactions() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;
Expand Down

0 comments on commit 0b62ac1

Please sign in to comment.