diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 5e422655bf..01600d9931 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -83,7 +83,7 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin( self.worker - .execute(query, args, self.row_channel_size, persistent) + .execute(query, args, self.row_channel_size, persistent, None) .map_ok(flume::Receiver::into_stream) .try_flatten_stream() .map( @@ -107,7 +107,7 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin(async move { let stream = self .worker - .execute(query, args, self.row_channel_size, persistent) + .execute(query, args, self.row_channel_size, persistent, Some(1)) .map_ok(flume::Receiver::into_stream) .await?; futures_util::pin_mut!(stream); diff --git a/sqlx-sqlite/src/connection/executor.rs b/sqlx-sqlite/src/connection/executor.rs index ebc2908c88..541a4f7d4d 100644 --- a/sqlx-sqlite/src/connection/executor.rs +++ b/sqlx-sqlite/src/connection/executor.rs @@ -32,7 +32,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin( self.worker - .execute(sql, arguments, self.row_channel_size, persistent) + .execute(sql, arguments, self.row_channel_size, persistent, None) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(), ) @@ -58,7 +58,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin(async move { let stream = self .worker - .execute(sql, arguments, self.row_channel_size, persistent) + .execute(sql, arguments, self.row_channel_size, persistent, Some(1)) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(); diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index 18e34aae86..a01de2419c 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -52,6 +52,7 @@ enum Command { arguments: Option>, persistent: bool, tx: flume::Sender, Error>>, + limit: Option, }, Begin { tx: rendezvous_oneshot::Sender>, @@ -136,6 +137,7 @@ impl ConnectionWorker { arguments, persistent, tx, + limit } => { let iter = match execute::iter(&mut conn, &query, arguments, persistent) { @@ -146,10 +148,34 @@ impl ConnectionWorker { } }; - for res in iter { - if tx.send(res).is_err() { - break; - } + match limit { + None => { + for res in iter { + if tx.send(res).is_err() { + break; + } + } + }, + Some(limit) => { + let mut iter = iter; + let mut rows_returned = 0; + + while let Some(res) = iter.next() { + if let Ok(ok) = &res { + if ok.is_right() { + rows_returned += 1; + if rows_returned >= limit { + drop(iter); + let _ = tx.send(res); + break; + } + } + } + if tx.send(res).is_err() { + break; + } + } + }, } update_cached_statements_size(&conn, &shared.cached_statements_size); @@ -284,6 +310,7 @@ impl ConnectionWorker { args: Option>, chan_size: usize, persistent: bool, + limit: Option, ) -> Result, Error>>, Error> { let (tx, rx) = flume::bounded(chan_size); @@ -294,6 +321,7 @@ impl ConnectionWorker { arguments: args.map(SqliteArguments::into_static), persistent, tx, + limit, }, Span::current(), ))