Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed Oct 21, 2024
1 parent b014ca6 commit 241a1e7
Show file tree
Hide file tree
Showing 24 changed files with 111 additions and 69 deletions.
69 changes: 59 additions & 10 deletions core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,77 @@

use std::fmt::Debug;
use std::future::ready;
use std::ops::DerefMut;

use futures::stream::Empty;
use futures::Future;
use futures::Stream;

use crate::raw::*;
use crate::Capability;
use crate::Scheme;
use crate::*;

/// A noop placeholder for Adapter::ScanIter
pub type EmptyScanIter = Empty<Result<String>>;
/// ScanIter is the async iterator returned by `Adapter::scan`.
pub trait Scan: Send + Sync + Unpin {
/// Fetch the next key in the current key prefix
///
/// `None` means no further key will be returned
fn next(&mut self) -> impl Future<Output = Option<Result<String>>> + MaybeSend;
}

/// A noop implementation of Scan
impl Scan for () {
async fn next(&mut self) -> Option<Result<String>> {
None
}
}

/// A ScanIterator implementation for all trivial non-async iterators
pub struct ScanStdIter<I>(I);

impl<I> ScanStdIter<I>
where
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
{
/// Create a new ScanStdIter from an Iterator
pub fn new(inner: I) -> Self {
Self(inner)
}
}

impl<I> Scan for ScanStdIter<I>
where
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
{
async fn next(&mut self) -> Option<Result<String>> {
self.0.next()
}
}

/// A type-erased wrapper of Scan
pub type Scanner = Box<dyn ScanDyn>;

pub trait ScanDyn: Unpin + Send + Sync {
fn next_dyn(&mut self) -> BoxedFuture<Option<Result<String>>>;
}

impl<T: Scan + ?Sized> ScanDyn for T {
fn next_dyn(&mut self) -> BoxedFuture<Option<Result<String>>> {
Box::pin(self.next())
}
}

impl<T: ScanDyn + ?Sized> Scan for Box<T> {
async fn next(&mut self) -> Option<Result<String>> {
self.deref_mut().next_dyn().await
}
}

/// KvAdapter is the adapter to underlying kv services.
///
/// By implement this trait, any kv service can work as an OpenDAL Service.
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
/// async iterator type for Adapter::scan()
///
/// TODO: consider to replace it with std::async_iter::AsyncIterator after stablized
/// TODO: use default associate type `= EmptyScanIter` after stablized
type ScanIter: Stream<Item = Result<String>> + Send + Unpin;
/// TODO: use default associate type `= ()` after stablized
type Scanner: Scan;

/// Return the metadata of this key value accessor.
fn metadata(&self) -> Metadata;
Expand Down Expand Up @@ -92,7 +141,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
}

/// Scan a key prefix to get all keys that start with this key.
fn scan(&self, path: &str) -> impl Future<Output = Result<Self::ScanIter>> + MaybeSend {
fn scan(&self, path: &str) -> impl Future<Output = Result<Self::Scanner>> + MaybeSend {
let _ = path;

ready(Err(Error::new(
Expand Down
17 changes: 7 additions & 10 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
use std::sync::Arc;
use std::vec::IntoIter;

use futures::lock::Mutex;
use futures::{Stream, StreamExt};

use super::Adapter;
use super::{Adapter, Scan};
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::QueueBuf;
use crate::raw::*;
Expand Down Expand Up @@ -71,7 +68,7 @@ impl<S: Adapter> Access for Backend<S> {
type BlockingReader = Buffer;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Lister = HierarchyLister<KvLister<S::ScanIter>>;
type Lister = HierarchyLister<KvLister<S::Scanner>>;
type BlockingLister = HierarchyLister<BlockingKvLister>;

fn info(&self) -> Arc<AccessorInfo> {
Expand Down Expand Up @@ -194,22 +191,22 @@ impl<S: Adapter> Access for Backend<S> {

pub struct KvLister<Iter> {
root: String,
inner: Mutex<Iter>,
inner: Iter,
}

impl<Iter> KvLister<Iter>
where
Iter: Stream<Item = Result<String>> + Send + Unpin,
Iter: Scan,
{
fn new(root: &str, inner: Iter) -> Self {
Self {
root: root.to_string(),
inner: Mutex::new(inner),
inner: inner,
}
}

async fn inner_next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner.lock().await.next().await.transpose()?.map(|v| {
Ok(self.inner.next().await.transpose()?.map(|v| {
let mode = if v.ends_with('/') {
EntryMode::DIR
} else {
Expand All @@ -226,7 +223,7 @@ where

impl<Iter> oio::List for KvLister<Iter>
where
Iter: Stream<Item = Result<String>> + Send + Unpin,
Iter: Scan,
{
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner_next().await
Expand Down
4 changes: 3 additions & 1 deletion core/src/raw/adapters/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
mod api;
pub use api::Adapter;
pub use api::EmptyScanIter;
pub use api::Metadata;
pub use api::Scan;
pub use api::ScanStdIter;
pub use api::Scanner;

mod backend;
pub use backend::Backend;
2 changes: 1 addition & 1 deletion core/src/services/atomicserver/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/cacache/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Debug for Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
17 changes: 5 additions & 12 deletions core/src/services/cloudflare_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::vec;

use bytes::Buf;
use futures::stream;
use futures::stream::iter;
use http::header;
use http::Request;
use http::StatusCode;
Expand Down Expand Up @@ -184,7 +181,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = stream::Iter<vec::IntoIter<Result<String>>>;
type Scanner = kv::Scanner;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down Expand Up @@ -245,7 +242,7 @@ impl kv::Adapter for Adapter {
}
}

async fn scan(&self, path: &str) -> Result<Self::ScanIter> {
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let mut url = format!("{}/keys", self.url_prefix);
if !path.is_empty() {
url = format!("{}?prefix={}", url, path);
Expand All @@ -266,13 +263,9 @@ impl kv::Adapter for Adapter {
format!("failed to parse error response: {}", e),
)
})?;
Ok(iter(
response
.result
.into_iter()
.map(|r| Ok(r.name))
.collect::<Vec<_>>(),
))
Ok(Box::new(kv::ScanStdIter::new(
response.result.into_iter().map(|r| Ok(r.name)),
)))
}
_ => Err(parse_error(resp)),
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/d1/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
8 changes: 3 additions & 5 deletions core/src/services/etcd/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use etcd_client::Error as EtcdError;
use etcd_client::GetOptions;
use etcd_client::Identity;
use etcd_client::TlsOptions;
use futures::stream;
use futures::stream::iter;
use tokio::sync::OnceCell;

use crate::raw::adapters::kv;
Expand Down Expand Up @@ -274,7 +272,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = stream::Iter<vec::IntoIter<Result<String>>>;
type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down Expand Up @@ -315,7 +313,7 @@ impl kv::Adapter for Adapter {
Ok(())
}

async fn scan(&self, path: &str) -> Result<Self::ScanIter> {
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let mut client = self.conn().await?;
let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
let resp = client
Expand All @@ -331,7 +329,7 @@ impl kv::Adapter for Adapter {
res.push(Ok(v));
}

Ok(iter(res))
Ok(kv::ScanStdIter::new(res.into_iter()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/foundationdb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl Debug for Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/gridfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/libsql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/memcached/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/mongodb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/mysql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
7 changes: 3 additions & 4 deletions core/src/services/nebula_graph/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::vec;
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::engine::Engine as _;
use bb8::{PooledConnection, RunError};
use futures::stream::{self, iter};
use rust_nebula::{
graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager,
};
Expand Down Expand Up @@ -271,7 +270,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = stream::Iter<vec::IntoIter<Result<String>>>;
type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down Expand Up @@ -363,7 +362,7 @@ impl kv::Adapter for Adapter {
Ok(())
}

async fn scan(&self, path: &str) -> Result<Self::ScanIter> {
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let path = path.replace("'", "\\'").replace('"', "\\\"");
let query = format!(
"LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};",
Expand All @@ -387,7 +386,7 @@ impl kv::Adapter for Adapter {

res_vec.push(Ok(sub_path));
}
Ok(iter(res_vec))
Ok(kv::ScanStdIter::new(res_vec.into_iter()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/persy/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl Debug for Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/postgresql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/redb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Debug for Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/redis/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type ScanIter = kv::EmptyScanIter;
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down
Loading

0 comments on commit 241a1e7

Please sign in to comment.