diff --git a/sqlx-core/src/query_as.rs b/sqlx-core/src/query_as.rs index 03a23c2d1d..7a15150cbc 100644 --- a/sqlx-core/src/query_as.rs +++ b/sqlx-core/src/query_as.rs @@ -110,18 +110,14 @@ where O: 'e, A: 'e, { - Box::pin(try_stream! { - let mut s = executor.fetch_many(self.inner); - - while let Some(v) = s.try_next().await? { - r#yield!(match v { - Either::Left(v) => Either::Left(v), - Either::Right(row) => Either::Right(O::from_row(&row)?), - }); - } - - Ok(()) - }) + executor + .fetch_many(self.inner) + .map(|v| match v { + Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right), + Ok(Either::Left(v)) => Ok(Either::Left(v)), + Err(e) => Err(e), + }) + .boxed() } /// Execute the query and return all the generated results, collected into a [`Vec`]. diff --git a/tests/any/any.rs b/tests/any/any.rs index 8624413e2f..85d165e2a4 100644 --- a/tests/any/any.rs +++ b/tests/any/any.rs @@ -38,6 +38,35 @@ async fn it_executes_with_pool() -> anyhow::Result<()> { Ok(()) } +#[sqlx_macros::test] +async fn it_does_not_stop_stream_after_decoding_error() -> anyhow::Result<()> { + use futures::stream::StreamExt; + // see https://github.com/launchbadge/sqlx/issues/1884 + let pool = sqlx_test::pool::().await?; + + #[derive(Debug, PartialEq)] + struct MyType; + impl<'a> sqlx::FromRow<'a, AnyRow> for MyType { + fn from_row(row: &'a AnyRow) -> sqlx::Result { + let n = row.try_get::(0)?; + if n == 1 { + Err(sqlx::Error::RowNotFound) + } else { + Ok(MyType) + } + } + } + + let rows = sqlx::query_as("SELECT 0 UNION ALL SELECT 1 UNION ALL SELECT 2") + .fetch(&pool) + .map(|r| r.ok()) + .collect::>() + .await; + + assert_eq!(rows, vec![Some(MyType), None, Some(MyType)]); + Ok(()) +} + #[sqlx_macros::test] async fn it_gets_by_name() -> anyhow::Result<()> { let mut conn = new::().await?;