Skip to content

Commit

Permalink
Include test case for regular subtransactions
Browse files Browse the repository at this point in the history
While using COPY and subtransactions I kept running into errors.
This test case documents that error, it currently fails with:

    Error: encountered unexpected or invalid data: expecting ParseComplete but received CommandComplete
  • Loading branch information
feikesteenbergen committed Mar 21, 2024
1 parent e0a1f16 commit c6fc47f
Showing 1 changed file with 98 additions and 1 deletion.
99 changes: 98 additions & 1 deletion tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use futures::{StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};

use sqlx::postgres::types::Oid;
use sqlx::postgres::{
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgListener,
PgPoolOptions, PgRow, PgSeverity, Postgres,
};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_core::bytes::Bytes;
use sqlx_test::{new, pool, setup_if_needed};
use std::env;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -382,6 +385,100 @@ async fn it_can_query_all_scalar() -> anyhow::Result<()> {
Ok(())
}

#[sqlx_macros::test]
async fn copy_can_work_with_failed_subtransactions() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;


// We're using a (local) statement_timeout to simulate a runtime failure, as opposed to
// a parse/plan failure.
let mut tx = conn.begin().await?;
let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)")
.bind("statement_timeout")
.bind("1ms")
.execute(tx.as_mut())
.await?;

let mut sub_tx = tx.begin().await?;

let mut copy_out: Pin<
Box<dyn Stream<Item = Result<Bytes, sqlx::Error>> + Send>,
> = (&mut sub_tx)
.copy_out_raw("COPY (SELECT nspname FROM pg_catalog.pg_namespace WHERE pg_sleep(0.001) IS NULL) TO STDOUT")
.await?;

while copy_out.try_next().await.is_ok() {}
drop(copy_out);

sub_tx.rollback().await?;

// tx should be usable again, as we explictly rolled back the sub transaction
let got: i64 = sqlx::query_scalar("SELECT 1")
.fetch_one(tx.as_mut())
.await?;
assert_eq!(1, got);

Ok(())
}

#[sqlx_macros::test]
async fn it_can_work_with_failed_subtransactions() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;

let mut tx = conn.begin().await?;

// We're using a (local) statement_timeout to simulate a runtime failure, as opposed to
// a parse/plan failure.
const KEY: &str = "statement_timeout";
const OUTER: &str = "10ms";
const INNER: &str = "1ms";
// By setting a local configuration parameter, we should test that we only
// rollback the subtx, not the outer tx.
let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)")
.bind(KEY)
.bind(OUTER)
.execute(tx.as_mut())
.await?;
let got: String = sqlx::query_scalar("SELECT current_setting($1)")
.bind(KEY)
.fetch_one(tx.as_mut())
.await?;

assert_eq!(OUTER, got.as_str());

let mut sub_tx = tx.begin().await?;
let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)")
.bind(KEY)
.bind(INNER)
.execute(sub_tx.as_mut())
.await?;

let got: String = sqlx::query_scalar("SELECT current_setting($1)")
.bind(KEY)
.fetch_one(sub_tx.as_mut())
.await?;

assert_eq!(INNER, got.as_str());

let a = sqlx::query("SELECT pg_sleep(0.02)")
.fetch_one(sub_tx.as_mut())
.await;
assert!(a.is_err());
// sub tx is unusable currently
let a = sqlx::query("SELECT 1").fetch_one(sub_tx.as_mut()).await;
assert!(a.is_err());
sub_tx.rollback().await?;

let got: String = sqlx::query_scalar("SELECT current_setting($1)")
.bind(KEY)
.fetch_one(tx.as_mut())
.await?;
assert_eq!(OUTER, got.as_str());
tx.rollback().await?;

Ok(())
}

#[sqlx_macros::test]
async fn it_can_work_with_transactions() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;
Expand Down

0 comments on commit c6fc47f

Please sign in to comment.