From c88785302044c75f0fa21d617c4fc76e950a8794 Mon Sep 17 00:00:00 2001 From: Feike Steenbergen Date: Thu, 21 Mar 2024 08:04:43 +0100 Subject: [PATCH] Include test case for regular subtransactions While using COPY and subtransactions I kept running into errors. This test case documents that error, it currently fails with: Error: encountered unexpected or invalid data: expecting ParseComplete but received CommandComplete --- tests/postgres/postgres.rs | 98 +++++++++++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs index 91728bde8d..ce615ac704 100644 --- a/tests/postgres/postgres.rs +++ b/tests/postgres/postgres.rs @@ -1,12 +1,15 @@ -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; + use sqlx::postgres::types::Oid; use sqlx::postgres::{ PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgListener, PgPoolOptions, PgRow, PgSeverity, Postgres, }; use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo}; +use sqlx_core::bytes::Bytes; use sqlx_test::{new, pool, setup_if_needed}; use std::env; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -382,6 +385,99 @@ async fn it_can_query_all_scalar() -> anyhow::Result<()> { Ok(()) } +#[sqlx_macros::test] +async fn copy_can_work_with_failed_subtransactions() -> anyhow::Result<()> { + let mut conn = new::().await?; + + // We're using a (local) statement_timeout to simulate a runtime failure, as opposed to + // a parse/plan failure. + let mut tx = conn.begin().await?; + let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)") + .bind("statement_timeout") + .bind("1ms") + .execute(tx.as_mut()) + .await?; + + let mut sub_tx = tx.begin().await?; + + let mut copy_out: Pin< + Box> + Send>, + > = (&mut sub_tx) + .copy_out_raw("COPY (SELECT nspname FROM pg_catalog.pg_namespace WHERE pg_sleep(0.001) IS NULL) TO STDOUT") + .await?; + + while copy_out.try_next().await.is_ok() {} + drop(copy_out); + + sub_tx.rollback().await?; + + // tx should be usable again, as we explictly rolled back the sub transaction + let got: i64 = sqlx::query_scalar("SELECT 1") + .fetch_one(tx.as_mut()) + .await?; + assert_eq!(1, got); + + Ok(()) +} + +#[sqlx_macros::test] +async fn it_can_work_with_failed_subtransactions() -> anyhow::Result<()> { + let mut conn = new::().await?; + + let mut tx = conn.begin().await?; + + // We're using a (local) statement_timeout to simulate a runtime failure, as opposed to + // a parse/plan failure. + const KEY: &str = "statement_timeout"; + const OUTER: &str = "10ms"; + const INNER: &str = "1ms"; + // By setting a local configuration parameter, we should test that we only + // rollback the subtx, not the outer tx. + let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)") + .bind(KEY) + .bind(OUTER) + .execute(tx.as_mut()) + .await?; + let got: String = sqlx::query_scalar("SELECT current_setting($1)") + .bind(KEY) + .fetch_one(tx.as_mut()) + .await?; + + assert_eq!(OUTER, got.as_str()); + + let mut sub_tx = tx.begin().await?; + let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)") + .bind(KEY) + .bind(INNER) + .execute(sub_tx.as_mut()) + .await?; + + let got: String = sqlx::query_scalar("SELECT current_setting($1)") + .bind(KEY) + .fetch_one(sub_tx.as_mut()) + .await?; + + assert_eq!(INNER, got.as_str()); + + let a = sqlx::query("SELECT pg_sleep(0.02)") + .fetch_one(sub_tx.as_mut()) + .await; + assert!(a.is_err()); + // sub tx is unusable currently + let a = sqlx::query("SELECT 1").fetch_one(sub_tx.as_mut()).await; + assert!(a.is_err()); + sub_tx.rollback().await?; + + let got: String = sqlx::query_scalar("SELECT current_setting($1)") + .bind(KEY) + .fetch_one(tx.as_mut()) + .await?; + assert_eq!(OUTER, got.as_str()); + tx.rollback().await?; + + Ok(()) +} + #[sqlx_macros::test] async fn it_can_work_with_transactions() -> anyhow::Result<()> { let mut conn = new::().await?;