Skip to content

Commit

Permalink
feat(adapter/kv): support async iterating on scan results
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed Oct 20, 2024
1 parent 32ddf46 commit b014ca6
Show file tree
Hide file tree
Showing 26 changed files with 232 additions and 39 deletions.
45 changes: 45 additions & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ services-s3 = [
services-seafile = []
services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
services-sled = ["dep:sled", "internal-tokio-rt"]
services-sqlite = ["dep:sqlx", "sqlx?/sqlite"]
services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"]
services-supabase = []
services-surrealdb = ["dep:surrealdb"]
services-swift = []
Expand Down Expand Up @@ -277,6 +277,9 @@ sqlx = { version = "0.8.0", features = [
# For http based services.
reqsign = { version = "0.16", default-features = false, optional = true }

# for self-referencing structs
ouroboros = { version = "0.18.4", optional = true }

# for services-atomic-server
atomic_lib = { version = "0.39.0", optional = true }
# for services-cacache
Expand Down
13 changes: 12 additions & 1 deletion core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,28 @@
use std::fmt::Debug;
use std::future::ready;

use futures::stream::Empty;
use futures::Future;
use futures::Stream;

use crate::raw::*;
use crate::Capability;
use crate::Scheme;
use crate::*;

/// A noop placeholder for Adapter::ScanIter
pub type EmptyScanIter = Empty<Result<String>>;

/// KvAdapter is the adapter to underlying kv services.
///
/// By implement this trait, any kv service can work as an OpenDAL Service.
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
/// async iterator type for Adapter::scan()
///
/// TODO: consider to replace it with std::async_iter::AsyncIterator after stablized
/// TODO: use default associate type `= EmptyScanIter` after stablized
type ScanIter: Stream<Item = Result<String>> + Send + Unpin;

/// Return the metadata of this key value accessor.
fn metadata(&self) -> Metadata;

Expand Down Expand Up @@ -81,7 +92,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
}

/// Scan a key prefix to get all keys that start with this key.
fn scan(&self, path: &str) -> impl Future<Output = Result<Vec<String>>> + MaybeSend {
fn scan(&self, path: &str) -> impl Future<Output = Result<Self::ScanIter>> + MaybeSend {
let _ = path;

ready(Err(Error::new(
Expand Down
62 changes: 50 additions & 12 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
use std::sync::Arc;
use std::vec::IntoIter;

use futures::lock::Mutex;
use futures::{Stream, StreamExt};

use super::Adapter;
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::QueueBuf;
Expand Down Expand Up @@ -68,8 +71,8 @@ impl<S: Adapter> Access for Backend<S> {
type BlockingReader = Buffer;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Lister = HierarchyLister<KvLister>;
type BlockingLister = HierarchyLister<KvLister>;
type Lister = HierarchyLister<KvLister<S::ScanIter>>;
type BlockingLister = HierarchyLister<BlockingKvLister>;

fn info(&self) -> Arc<AccessorInfo> {
let mut am: AccessorInfo = self.kv.metadata().into();
Expand Down Expand Up @@ -182,19 +185,60 @@ impl<S: Adapter> Access for Backend<S> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.blocking_scan(&p)?;
let lister = KvLister::new(&self.root, res);
let lister = BlockingKvLister::new(&self.root, res);
let lister = HierarchyLister::new(lister, path, args.recursive());

Ok((RpList::default(), lister))
}
}

pub struct KvLister {
pub struct KvLister<Iter> {
root: String,
inner: Mutex<Iter>,
}

impl<Iter> KvLister<Iter>
where
Iter: Stream<Item = Result<String>> + Send + Unpin,
{
fn new(root: &str, inner: Iter) -> Self {
Self {
root: root.to_string(),
inner: Mutex::new(inner),
}
}

async fn inner_next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner.lock().await.next().await.transpose()?.map(|v| {
let mode = if v.ends_with('/') {
EntryMode::DIR
} else {
EntryMode::FILE
};
let mut path = build_rel_path(&self.root, &v);
if path.is_empty() {
path = "/".to_string();
}
oio::Entry::new(&path, Metadata::new(mode))
}))
}
}

impl<Iter> oio::List for KvLister<Iter>
where
Iter: Stream<Item = Result<String>> + Send + Unpin,
{
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner_next().await
}
}

pub struct BlockingKvLister {
root: String,
inner: IntoIter<String>,
}

impl KvLister {
impl BlockingKvLister {
fn new(root: &str, inner: Vec<String>) -> Self {
Self {
root: root.to_string(),
Expand All @@ -218,13 +262,7 @@ impl KvLister {
}
}

impl oio::List for KvLister {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner_next())
}
}

impl oio::BlockingList for KvLister {
impl oio::BlockingList for BlockingKvLister {
fn next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner_next())
}
Expand Down
1 change: 1 addition & 0 deletions core/src/raw/adapters/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
mod api;
pub use api::Adapter;
pub use api::EmptyScanIter;
pub use api::Metadata;

mod backend;
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/atomicserver/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Atomicserver,
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/cacache/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ impl Debug for Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Cacache,
Expand Down
15 changes: 13 additions & 2 deletions core/src/services/cloudflare_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::vec;

use bytes::Buf;
use futures::stream;
use futures::stream::iter;
use http::header;
use http::Request;
use http::StatusCode;
Expand Down Expand Up @@ -181,6 +184,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = stream::Iter<vec::IntoIter<Result<String>>>;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::CloudflareKv,
Expand Down Expand Up @@ -240,7 +245,7 @@ impl kv::Adapter for Adapter {
}
}

async fn scan(&self, path: &str) -> Result<Vec<String>> {
async fn scan(&self, path: &str) -> Result<Self::ScanIter> {
let mut url = format!("{}/keys", self.url_prefix);
if !path.is_empty() {
url = format!("{}?prefix={}", url, path);
Expand All @@ -261,7 +266,13 @@ impl kv::Adapter for Adapter {
format!("failed to parse error response: {}", e),
)
})?;
Ok(response.result.into_iter().map(|r| r.name).collect())
Ok(iter(
response
.result
.into_iter()
.map(|r| Ok(r.name))
.collect::<Vec<_>>(),
))
}
_ => Err(parse_error(resp)),
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/d1/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::D1,
Expand Down
11 changes: 8 additions & 3 deletions core/src/services/etcd/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::vec;

use bb8::PooledConnection;
use bb8::RunError;
Expand All @@ -27,6 +28,8 @@ use etcd_client::Error as EtcdError;
use etcd_client::GetOptions;
use etcd_client::Identity;
use etcd_client::TlsOptions;
use futures::stream;
use futures::stream::iter;
use tokio::sync::OnceCell;

use crate::raw::adapters::kv;
Expand Down Expand Up @@ -271,6 +274,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = stream::Iter<vec::IntoIter<Result<String>>>;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Etcd,
Expand Down Expand Up @@ -310,7 +315,7 @@ impl kv::Adapter for Adapter {
Ok(())
}

async fn scan(&self, path: &str) -> Result<Vec<String>> {
async fn scan(&self, path: &str) -> Result<Self::ScanIter> {
let mut client = self.conn().await?;
let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
let resp = client
Expand All @@ -323,10 +328,10 @@ impl kv::Adapter for Adapter {
Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
.set_source(err)
})?;
res.push(v);
res.push(Ok(v));
}

Ok(res)
Ok(iter(res))
}
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/services/foundationdb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ impl Debug for Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Foundationdb,
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/gridfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Gridfs,
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/libsql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Libsql,
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/memcached/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Memcached,
Expand Down
Loading

0 comments on commit b014ca6

Please sign in to comment.