From b014ca60edf1e70de3482ce1312dbbb8851faa19 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sun, 20 Oct 2024 15:22:07 +0800 Subject: [PATCH] feat(adapter/kv): support async iterating on scan results --- core/Cargo.lock | 45 ++++++++++++++++ core/Cargo.toml | 5 +- core/src/raw/adapters/kv/api.rs | 13 ++++- core/src/raw/adapters/kv/backend.rs | 62 +++++++++++++++++----- core/src/raw/adapters/kv/mod.rs | 1 + core/src/services/atomicserver/backend.rs | 2 + core/src/services/cacache/backend.rs | 2 + core/src/services/cloudflare_kv/backend.rs | 15 +++++- core/src/services/d1/backend.rs | 2 + core/src/services/etcd/backend.rs | 11 ++-- core/src/services/foundationdb/backend.rs | 2 + core/src/services/gridfs/backend.rs | 2 + core/src/services/libsql/backend.rs | 2 + core/src/services/memcached/backend.rs | 2 + core/src/services/mongodb/backend.rs | 2 + core/src/services/mysql/backend.rs | 2 + core/src/services/nebula_graph/backend.rs | 10 ++-- core/src/services/persy/backend.rs | 2 + core/src/services/postgresql/backend.rs | 2 + core/src/services/redb/backend.rs | 2 + core/src/services/redis/backend.rs | 2 + core/src/services/rocksdb/backend.rs | 12 +++-- core/src/services/sled/backend.rs | 12 +++-- core/src/services/sqlite/backend.rs | 55 +++++++++++++++---- core/src/services/surrealdb/backend.rs | 2 + core/src/services/tikv/backend.rs | 2 + 26 files changed, 232 insertions(+), 39 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 800575eb3dd9..9ff51b5632ba 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -80,6 +80,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "aligned-array" version = "1.0.1" @@ -5036,6 +5042,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "ouroboros", "percent-encoding", "persy", "pretty_assertions", @@ -5368,6 +5375,31 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "ouroboros" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "944fa20996a25aded6b4795c6d63f10014a7a83f8be9828a11860b08c5fc4a67" +dependencies = [ + "aliasable", + "ouroboros_macro", + "static_assertions", +] + +[[package]] +name = "ouroboros_macro" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39b0deead1528fd0e5947a8546a9642a9777c25f6e1e26f34c97b204bbb465bd" +dependencies = [ + "heck 0.4.1", + "itertools 0.12.1", + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.79", +] + [[package]] name = "outref" version = "0.5.1" @@ -5908,6 +5940,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "version_check", + "yansi", +] + [[package]] name = "procfs" version = "0.16.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index bb01c7a7d1f3..ae341a83370b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -194,7 +194,7 @@ services-s3 = [ services-seafile = [] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] services-sled = ["dep:sled", "internal-tokio-rt"] -services-sqlite = ["dep:sqlx", "sqlx?/sqlite"] +services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"] services-supabase = [] services-surrealdb = ["dep:surrealdb"] services-swift = [] @@ -277,6 +277,9 @@ sqlx = { version = "0.8.0", features = [ # For http based services. reqsign = { version = "0.16", default-features = false, optional = true } +# for self-referencing structs +ouroboros = { version = "0.18.4", optional = true } + # for services-atomic-server atomic_lib = { version = "0.39.0", optional = true } # for services-cacache diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs index acf449d58bdc..7d1adb390e14 100644 --- a/core/src/raw/adapters/kv/api.rs +++ b/core/src/raw/adapters/kv/api.rs @@ -18,17 +18,28 @@ use std::fmt::Debug; use std::future::ready; +use futures::stream::Empty; use futures::Future; +use futures::Stream; use crate::raw::*; use crate::Capability; use crate::Scheme; use crate::*; +/// A noop placeholder for Adapter::ScanIter +pub type EmptyScanIter = Empty>; + /// KvAdapter is the adapter to underlying kv services. /// /// By implement this trait, any kv service can work as an OpenDAL Service. pub trait Adapter: Send + Sync + Debug + Unpin + 'static { + /// async iterator type for Adapter::scan() + /// + /// TODO: consider to replace it with std::async_iter::AsyncIterator after stablized + /// TODO: use default associate type `= EmptyScanIter` after stablized + type ScanIter: Stream> + Send + Unpin; + /// Return the metadata of this key value accessor. fn metadata(&self) -> Metadata; @@ -81,7 +92,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static { } /// Scan a key prefix to get all keys that start with this key. - fn scan(&self, path: &str) -> impl Future>> + MaybeSend { + fn scan(&self, path: &str) -> impl Future> + MaybeSend { let _ = path; ready(Err(Error::new( diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 3a7ea3525d94..17c3eaf51a66 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -18,6 +18,9 @@ use std::sync::Arc; use std::vec::IntoIter; +use futures::lock::Mutex; +use futures::{Stream, StreamExt}; + use super::Adapter; use crate::raw::oio::HierarchyLister; use crate::raw::oio::QueueBuf; @@ -68,8 +71,8 @@ impl Access for Backend { type BlockingReader = Buffer; type Writer = KvWriter; type BlockingWriter = KvWriter; - type Lister = HierarchyLister; - type BlockingLister = HierarchyLister; + type Lister = HierarchyLister>; + type BlockingLister = HierarchyLister; fn info(&self) -> Arc { let mut am: AccessorInfo = self.kv.metadata().into(); @@ -182,19 +185,60 @@ impl Access for Backend { fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { let p = build_abs_path(&self.root, path); let res = self.kv.blocking_scan(&p)?; - let lister = KvLister::new(&self.root, res); + let lister = BlockingKvLister::new(&self.root, res); let lister = HierarchyLister::new(lister, path, args.recursive()); Ok((RpList::default(), lister)) } } -pub struct KvLister { +pub struct KvLister { + root: String, + inner: Mutex, +} + +impl KvLister +where + Iter: Stream> + Send + Unpin, +{ + fn new(root: &str, inner: Iter) -> Self { + Self { + root: root.to_string(), + inner: Mutex::new(inner), + } + } + + async fn inner_next(&mut self) -> Result> { + Ok(self.inner.lock().await.next().await.transpose()?.map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + let mut path = build_rel_path(&self.root, &v); + if path.is_empty() { + path = "/".to_string(); + } + oio::Entry::new(&path, Metadata::new(mode)) + })) + } +} + +impl oio::List for KvLister +where + Iter: Stream> + Send + Unpin, +{ + async fn next(&mut self) -> Result> { + self.inner_next().await + } +} + +pub struct BlockingKvLister { root: String, inner: IntoIter, } -impl KvLister { +impl BlockingKvLister { fn new(root: &str, inner: Vec) -> Self { Self { root: root.to_string(), @@ -218,13 +262,7 @@ impl KvLister { } } -impl oio::List for KvLister { - async fn next(&mut self) -> Result> { - Ok(self.inner_next()) - } -} - -impl oio::BlockingList for KvLister { +impl oio::BlockingList for BlockingKvLister { fn next(&mut self) -> Result> { Ok(self.inner_next()) } diff --git a/core/src/raw/adapters/kv/mod.rs b/core/src/raw/adapters/kv/mod.rs index facb6efe1c59..dbc59520bfa2 100644 --- a/core/src/raw/adapters/kv/mod.rs +++ b/core/src/raw/adapters/kv/mod.rs @@ -21,6 +21,7 @@ mod api; pub use api::Adapter; +pub use api::EmptyScanIter; pub use api::Metadata; mod backend; diff --git a/core/src/services/atomicserver/backend.rs b/core/src/services/atomicserver/backend.rs index ac5655bead01..8cbadc45101f 100644 --- a/core/src/services/atomicserver/backend.rs +++ b/core/src/services/atomicserver/backend.rs @@ -351,6 +351,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Atomicserver, diff --git a/core/src/services/cacache/backend.rs b/core/src/services/cacache/backend.rs index 85914d864fd1..39f4cead87a7 100644 --- a/core/src/services/cacache/backend.rs +++ b/core/src/services/cacache/backend.rs @@ -85,6 +85,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Cacache, diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 6a15187c3fa1..8de7ffded46c 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -17,8 +17,11 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::vec; use bytes::Buf; +use futures::stream; +use futures::stream::iter; use http::header; use http::Request; use http::StatusCode; @@ -181,6 +184,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = stream::Iter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::CloudflareKv, @@ -240,7 +245,7 @@ impl kv::Adapter for Adapter { } } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let mut url = format!("{}/keys", self.url_prefix); if !path.is_empty() { url = format!("{}?prefix={}", url, path); @@ -261,7 +266,13 @@ impl kv::Adapter for Adapter { format!("failed to parse error response: {}", e), ) })?; - Ok(response.result.into_iter().map(|r| r.name).collect()) + Ok(iter( + response + .result + .into_iter() + .map(|r| Ok(r.name)) + .collect::>(), + )) } _ => Err(parse_error(resp)), } diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs index f50fd6747425..acf7bdbd2113 100644 --- a/core/src/services/d1/backend.rs +++ b/core/src/services/d1/backend.rs @@ -258,6 +258,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::D1, diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index 103e7c7abb6b..59a4aa34460b 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::vec; use bb8::PooledConnection; use bb8::RunError; @@ -27,6 +28,8 @@ use etcd_client::Error as EtcdError; use etcd_client::GetOptions; use etcd_client::Identity; use etcd_client::TlsOptions; +use futures::stream; +use futures::stream::iter; use tokio::sync::OnceCell; use crate::raw::adapters::kv; @@ -271,6 +274,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = stream::Iter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Etcd, @@ -310,7 +315,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let mut client = self.conn().await?; let get_options = Some(GetOptions::new().with_prefix().with_keys_only()); let resp = client @@ -323,10 +328,10 @@ impl kv::Adapter for Adapter { Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string") .set_source(err) })?; - res.push(v); + res.push(Ok(v)); } - Ok(res) + Ok(iter(res)) } } diff --git a/core/src/services/foundationdb/backend.rs b/core/src/services/foundationdb/backend.rs index 4d4adfa52fd2..4eecb1abcf4f 100644 --- a/core/src/services/foundationdb/backend.rs +++ b/core/src/services/foundationdb/backend.rs @@ -110,6 +110,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Foundationdb, diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs index db2bf34456ed..7d1dd9a1897d 100644 --- a/core/src/services/gridfs/backend.rs +++ b/core/src/services/gridfs/backend.rs @@ -212,6 +212,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Gridfs, diff --git a/core/src/services/libsql/backend.rs b/core/src/services/libsql/backend.rs index ff7b9551e6a8..03cddf1a1a66 100644 --- a/core/src/services/libsql/backend.rs +++ b/core/src/services/libsql/backend.rs @@ -305,6 +305,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Libsql, diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index c89b81ba704a..0627175abd4e 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -197,6 +197,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Memcached, diff --git a/core/src/services/mongodb/backend.rs b/core/src/services/mongodb/backend.rs index 24abe6fb03ef..ee7d21d93214 100644 --- a/core/src/services/mongodb/backend.rs +++ b/core/src/services/mongodb/backend.rs @@ -226,6 +226,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Mongodb, diff --git a/core/src/services/mysql/backend.rs b/core/src/services/mysql/backend.rs index 0b1481ef01ce..2b357286a3cb 100644 --- a/core/src/services/mysql/backend.rs +++ b/core/src/services/mysql/backend.rs @@ -188,6 +188,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Mysql, diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs index d03dd5bf2b13..1189b2664d6f 100644 --- a/core/src/services/nebula_graph/backend.rs +++ b/core/src/services/nebula_graph/backend.rs @@ -19,10 +19,12 @@ use std::fmt::Debug; #[cfg(feature = "tests")] use std::time::Duration; +use std::vec; use base64::engine::general_purpose::STANDARD as BASE64; use base64::engine::Engine as _; use bb8::{PooledConnection, RunError}; +use futures::stream::{self, iter}; use rust_nebula::{ graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager, }; @@ -269,6 +271,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = stream::Iter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::NebulaGraph, @@ -359,7 +363,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let path = path.replace("'", "\\'").replace('"', "\\\""); let query = format!( "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};", @@ -381,9 +385,9 @@ impl kv::Adapter for Adapter { .map_err(parse_nebulagraph_dataset_error)?; let sub_path = value.as_string().map_err(parse_nebulagraph_dataset_error)?; - res_vec.push(sub_path); + res_vec.push(Ok(sub_path)); } - Ok(res_vec) + Ok(iter(res_vec)) } } diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 10b48db08137..766f1d79bf2b 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -152,6 +152,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Persy, diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs index 8b4ea56eb223..dc10edd6f8ed 100644 --- a/core/src/services/postgresql/backend.rs +++ b/core/src/services/postgresql/backend.rs @@ -187,6 +187,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Postgresql, diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/backend.rs index d6dc290e758e..647dabc4e2c9 100644 --- a/core/src/services/redb/backend.rs +++ b/core/src/services/redb/backend.rs @@ -111,6 +111,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Redb, diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index bf8cff201856..aaba3e461d2c 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -327,6 +327,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Redis, diff --git a/core/src/services/rocksdb/backend.rs b/core/src/services/rocksdb/backend.rs index 5ed0f38ec6bc..19156bcc0ecb 100644 --- a/core/src/services/rocksdb/backend.rs +++ b/core/src/services/rocksdb/backend.rs @@ -19,6 +19,8 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use futures::stream::iter; +use futures::Stream; use rocksdb::DB; use tokio::task; @@ -108,6 +110,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = Box> + Send + Unpin>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Rocksdb, @@ -164,13 +168,15 @@ impl kv::Adapter for Adapter { self.db.delete(path).map_err(parse_rocksdb_error) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) + let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) .await - .map_err(new_task_join_error)? + .map_err(new_task_join_error)??; + + Ok(Box::new(iter(res.into_iter().map(Ok)))) } /// TODO: we only need key here. diff --git a/core/src/services/sled/backend.rs b/core/src/services/sled/backend.rs index d1d1e7f18453..4b72f08af806 100644 --- a/core/src/services/sled/backend.rs +++ b/core/src/services/sled/backend.rs @@ -19,6 +19,8 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::str; +use futures::stream::iter; +use futures::Stream; use tokio::task; use crate::raw::adapters::kv; @@ -137,6 +139,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = Box> + Send + Unpin>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Sled, @@ -199,13 +203,15 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) + let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) .await - .map_err(new_task_join_error)? + .map_err(new_task_join_error)??; + + Ok(Box::new(iter(res.into_iter().map(Ok)))) } fn blocking_scan(&self, path: &str) -> Result> { diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index 0b5c556f8cb3..4b56fda054c4 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -17,8 +17,15 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::pin::Pin; use std::str::FromStr; +use std::task::Context; +use std::task::Poll; +use futures::stream::BoxStream; +use futures::Stream; +use futures::StreamExt; +use ouroboros::self_referencing; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; use tokio::sync::OnceCell; @@ -188,7 +195,27 @@ impl Adapter { } } +#[self_referencing] +pub struct SqlStream { + pool: SqlitePool, + query: String, + + #[borrows(pool, query)] + #[covariant] + stream: BoxStream<'this, Result>, +} + +impl Stream for SqlStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.with_stream_mut(|s| s.poll_next_unpin(cx)) + } +} + impl kv::Adapter for Adapter { + type ScanIter = SqlStream; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Sqlite, @@ -249,19 +276,25 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let pool = self.get_client().await?; + let stream = SqlStreamBuilder { + pool: pool.clone(), + query: format!( + "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1", + self.key_field, self.table, self.key_field + ), + stream_builder: |pool, query| { + sqlx::query_scalar(query) + .bind(format!("{path}%")) + .fetch(pool) + .map(|v| v.map_err(parse_sqlite_error)) + .boxed() + }, + } + .build(); - let value = sqlx::query_scalar(&format!( - "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1", - self.key_field, self.table, self.key_field - )) - .bind(format!("{path}%")) - .fetch_all(pool) - .await - .map_err(parse_sqlite_error)?; - - Ok(value) + Ok(stream) } } diff --git a/core/src/services/surrealdb/backend.rs b/core/src/services/surrealdb/backend.rs index a7c098a0ad8e..4d5d26576117 100644 --- a/core/src/services/surrealdb/backend.rs +++ b/core/src/services/surrealdb/backend.rs @@ -283,6 +283,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Surrealdb, diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 6f23f156ee70..1d34847990d7 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -185,6 +185,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Tikv,