diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f0f8f0f5b..5be609c0b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,6 +71,9 @@ jobs: - name: Test run: cargo test --no-fail-fast --all-targets --all-features --workspace - + + - name: Async-std Test + run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-fs" --workspace + - name: Doc Test run: cargo test --no-fail-fast --doc --all-features --workspace diff --git a/Cargo.toml b/Cargo.toml index 106aa8af0..ce4d3003d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ arrow-select = { version = "52" } arrow-string = { version = "52" } async-stream = "0.3.5" async-trait = "0.1" +async-std = "1.12.0" aws-config = "1.1.8" aws-sdk-glue = "1.21.0" bimap = "0.6" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6cad4adbe..c43f54f77 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,12 +29,15 @@ license = { workspace = true } keywords = ["iceberg"] [features] -default = ["storage-fs", "storage-s3"] +default = ["storage-fs", "storage-s3", "tokio"] storage-all = ["storage-fs", "storage-s3"] storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] +async-std = ["dep:async-std"] +tokio = ["dep:tokio"] + [dependencies] anyhow = { workspace = true } apache-avro = { workspace = true } @@ -45,6 +48,7 @@ arrow-ord = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } +async-std = { workspace = true, optional = true, features = ["attributes"] } async-stream = { workspace = true } async-trait = { workspace = true } bimap = { workspace = true } @@ -71,6 +75,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } serde_repr = { workspace = true } serde_with = { workspace = true } +tokio = { workspace = true, optional = true } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } @@ -81,4 +86,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] } pretty_assertions = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } -tokio = { workspace = true } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 475a0584a..3985884c0 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -50,5 +50,7 @@ pub mod expr; pub mod transaction; pub mod transform; +mod runtime; + pub mod arrow; pub mod writer; diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs new file mode 100644 index 000000000..453b1564a --- /dev/null +++ b/crates/iceberg/src/runtime/mod.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the async runtime abstraction for iceberg. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub enum JoinHandle { + #[cfg(feature = "tokio")] + Tokio(tokio::task::JoinHandle), + #[cfg(all(feature = "async-std", not(feature = "tokio")))] + AsyncStd(async_std::task::JoinHandle), +} + +impl Future for JoinHandle { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut() { + #[cfg(feature = "tokio")] + JoinHandle::Tokio(handle) => Pin::new(handle) + .poll(cx) + .map(|h| h.expect("tokio spawned task failed")), + #[cfg(all(feature = "async-std", not(feature = "tokio")))] + JoinHandle::AsyncStd(handle) => Pin::new(handle).poll(cx), + } + } +} + +#[allow(dead_code)] +pub fn spawn(f: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + #[cfg(feature = "tokio")] + return JoinHandle::Tokio(tokio::task::spawn(f)); + + #[cfg(all(feature = "async-std", not(feature = "tokio")))] + return JoinHandle::AsyncStd(async_std::task::spawn(f)); +} + +#[allow(dead_code)] +pub fn spawn_blocking(f: F) -> JoinHandle +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + #[cfg(feature = "tokio")] + return JoinHandle::Tokio(tokio::task::spawn_blocking(f)); + + #[cfg(all(feature = "async-std", not(feature = "tokio")))] + return JoinHandle::AsyncStd(async_std::task::spawn_blocking(f)); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn test_tokio_spawn() { + let handle = spawn(async { 1 + 1 }); + assert_eq!(handle.await, 2); + } + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn test_tokio_spawn_blocking() { + let handle = spawn_blocking(|| 1 + 1); + assert_eq!(handle.await, 2); + } + + #[cfg(all(feature = "async-std", not(feature = "tokio")))] + #[async_std::test] + async fn test_async_std_spawn() { + let handle = spawn(async { 1 + 1 }); + assert_eq!(handle.await, 2); + } + + #[cfg(all(feature = "async-std", not(feature = "tokio")))] + #[async_std::test] + async fn test_async_std_spawn_blocking() { + let handle = spawn_blocking(|| 1 + 1); + assert_eq!(handle.await, 2); + } +}