feat: runtime module #233

Merged
merged 21 commits on Jul 5, 2024
Changes from 6 commits
10 changes: 8 additions & 2 deletions Cargo.toml
@@ -17,7 +17,12 @@

[workspace]
resolver = "2"
members = ["crates/catalog/*", "crates/examples", "crates/iceberg", "crates/test_utils"]
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/test_utils",
]

[workspace.package]
version = "0.2.0"
@@ -36,10 +41,11 @@ arrow-array = { version = ">=46" }
arrow-schema = { version = ">=46" }
async-stream = "0.3.5"
async-trait = "0.1"
async-std = "1.12.0"
bimap = "0.6"
bitvec = "1.0.1"
bytes = "1.5"
chrono = "0.4"
chrono = "0.4.34"
derive_builder = "0.20.0"
either = "1"
env_logger = "0.11.0"
8 changes: 7 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -28,12 +28,18 @@ repository = { workspace = true }
license = { workspace = true }
keywords = ["iceberg"]

[features]
async-std = ["dep:async-std"]
tokio = ["dep:tokio"]
default = ["tokio"]

[dependencies]
anyhow = { workspace = true }
apache-avro = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-std = { workspace = true, optional = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
@@ -59,7 +65,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, optional = true }
typed-builder = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
2 changes: 2 additions & 0 deletions crates/iceberg/src/lib.rs
@@ -52,5 +52,7 @@ pub mod expr;
pub mod transaction;
pub mod transform;

mod runtime;

pub mod arrow;
pub mod writer;
24 changes: 24 additions & 0 deletions crates/iceberg/src/runtime/async_std_backend.rs
@@ -0,0 +1,24 @@
use crate::runtime::{JoinHandle, JoinHandleExt};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pub fn spawn<F>(future: F) -> JoinHandle<F::Output, async_std::task::JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
JoinHandle {
inner: async_std::task::spawn(future),
_marker: Default::default(),
}
}

impl<T> JoinHandleExt for async_std::task::JoinHandle<T> {
type Output = T;
fn poll_join(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().poll(cx)
}
}
36 changes: 36 additions & 0 deletions crates/iceberg/src/runtime/mod.rs
@@ -0,0 +1,36 @@
use std::future::Future;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(feature = "tokio")]
pub mod tokio_backend;
#[cfg(feature = "tokio")]
pub use tokio_backend::*;

#[cfg(all(feature = "async-std", not(feature = "tokio"),))]
pub mod async_std_backend;
#[cfg(all(feature = "async-std", not(feature = "tokio"),))]
pub use async_std_backend::*;

pub trait JoinHandleExt {
type Output;
fn poll_join(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub struct JoinHandle<T, J> {
inner: J,
_marker: PhantomData<T>,
}

impl<T, J> Future for JoinHandle<T, J>
where
T: Send + 'static + Unpin,
J: JoinHandleExt<Output = T> + Unpin,
{
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner).poll_join(cx)
}
}
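
For orientation, a minimal sketch (not part of the diff) of how crate-internal code can use this abstraction: with the default `tokio` feature the tokio backend is compiled in, and the async-std backend is only built when `tokio` is disabled (e.g. `--no-default-features --features async-std`). The function and values below are illustrative only.

// Illustrative only (not in the PR): per-item work spawned through the
// runtime abstraction and awaited via the backend-agnostic JoinHandle.
use crate::runtime;

async fn double_all(values: Vec<u64>) -> Vec<u64> {
    let handles: Vec<_> = values
        .into_iter()
        .map(|v| runtime::spawn(async move { v * 2 }))
        .collect();

    let mut out = Vec::with_capacity(handles.len());
    for handle in handles {
        // `JoinHandle<T, J>` implements `Future<Output = T>`, so it can be
        // awaited directly no matter which backend produced it.
        out.push(handle.await);
    }
    out
}
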
26 changes: 26 additions & 0 deletions crates/iceberg/src/runtime/tokio_backend.rs
@@ -0,0 +1,26 @@
use crate::runtime::{JoinHandle, JoinHandleExt};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pub fn spawn<F>(future: F) -> JoinHandle<F::Output, tokio::task::JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
JoinHandle {
inner: tokio::spawn(future),
_marker: Default::default(),
}
}

impl<T> JoinHandleExt for tokio::task::JoinHandle<T> {
type Output = T;
fn poll_join(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut()
.poll(cx)
.map(|res| res.expect("tokio spawned task crashed"))
}
}
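
The two backends differ in what their native join handles resolve to, which is why only the tokio adapter unwraps the result. A sketch of the underlying shapes, assuming both runtimes are available as dependencies side by side (in this crate the cfg gating makes them mutually exclusive, with tokio taking priority):

// Illustrative only (not in the PR): native join semantics of the two runtimes.
async fn join_shapes() {
    // tokio: the handle resolves to Result<T, JoinError>, so poll_join above
    // unwraps it and panics if the spawned task failed.
    let t: Result<u32, tokio::task::JoinError> = tokio::spawn(async { 1u32 }).await;

    // async-std: the handle resolves to T directly, so no unwrapping is needed.
    let a: u32 = async_std::task::spawn(async { 1u32 }).await;

    assert_eq!(t.expect("tokio task failed"), a);
}
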
27 changes: 21 additions & 6 deletions crates/iceberg/src/scan.rs
@@ -19,12 +19,17 @@

use crate::arrow::ArrowReaderBuilder;
use crate::io::FileIO;
use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
use crate::spec::{
DataContentType, Manifest, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind};
use crate::{runtime, Error, ErrorKind};
use arrow_array::RecordBatch;
use futures::future::try_join_all;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;
use itertools::Itertools;
use std::sync::Arc;

/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
@@ -150,10 +155,21 @@ impl TableScan {

// Generate data file stream
let mut file_scan_tasks = Vec::with_capacity(manifest_list.entries().len());
for manifest_list_entry in manifest_list.entries().iter() {
// Data file
let manifest = manifest_list_entry.load_manifest(&self.file_io).await?;

let tasks = manifest_list
.entries()
.iter()
.map(|manifest| {
let cloned_manifest = Arc::new(manifest.clone());
let file_io = Arc::new(self.file_io.clone());
async move { cloned_manifest.load_manifest(file_io).await }
})
.map(runtime::spawn)
.collect_vec();

let manifests: Vec<Manifest> = try_join_all(tasks).await?;

for manifest in manifests {
for manifest_entry in manifest.entries().iter().filter(|e| e.is_alive()) {
match manifest_entry.content_type() {
DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
@@ -172,7 +188,6 @@
}
}
}

Ok(iter(file_scan_tasks).boxed())
}

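
A note on the Arc clones in the hunk above: `runtime::spawn` requires a `'static` future, so the closure cannot borrow `self.file_io` across the spawn; cloning into an `Arc` gives each task an owned handle. A stripped-down sketch of the same pattern, using placeholder names and types rather than crate APIs:

// Illustrative only: spawn one task per entry, then join them all, failing
// fast on the first error. Mirrors the scan.rs change above in miniature.
use futures::future::try_join_all;
use std::sync::Arc;

async fn lengths(paths: Vec<String>, shared: Arc<String>) -> Result<Vec<usize>, std::io::Error> {
    let tasks: Vec<_> = paths
        .into_iter()
        .map(|p| {
            let shared = Arc::clone(&shared);
            // The future owns `p` and an Arc clone, so it is 'static.
            crate::runtime::spawn(async move { Ok::<_, std::io::Error>(p.len() + shared.len()) })
        })
        .collect();

    // Each JoinHandle resolves to a Result, so try_join_all short-circuits
    // on the first Err, just like the manifest loading above.
    try_join_all(tasks).await
}
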
3 changes: 2 additions & 1 deletion crates/iceberg/src/spec/manifest_list.rs
@@ -17,6 +17,7 @@

//! ManifestList for Iceberg.

use std::sync::Arc;
use std::{collections::HashMap, str::FromStr};

use crate::io::FileIO;
@@ -634,7 +635,7 @@ impl ManifestFile {
/// Load [`Manifest`].
///
/// This method will also initialize inherited values of [`ManifestEntry`], such as `sequence_number`.
pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
pub async fn load_manifest(&self, file_io: Arc<FileIO>) -> Result<Manifest> {
let mut avro = Vec::new();
file_io
.new_input(&self.manifest_path)?
99 changes: 57 additions & 42 deletions crates/iceberg/src/spec/values.rs
@@ -106,7 +106,7 @@ impl Display for Datum {
(_, PrimitiveLiteral::TimestampTZ(val)) => {
write!(f, "{}", microseconds_to_datetimetz(*val))
}
(_, PrimitiveLiteral::String(val)) => write!(f, "{}", val),
(_, PrimitiveLiteral::String(val)) => write!(f, r#""{}""#, val),
(_, PrimitiveLiteral::UUID(val)) => write!(f, "{}", val),
(_, PrimitiveLiteral::Fixed(val)) => display_bytes(val, f),
(_, PrimitiveLiteral::Binary(val)) => display_bytes(val, f),
@@ -401,10 +401,10 @@ impl Datum {
/// ```
pub fn time_from_hms_micro(hour: u32, min: u32, sec: u32, micro: u32) -> Result<Self> {
let t = NaiveTime::from_hms_micro_opt(hour, min, sec, micro)
.ok_or_else(|| Error::new(
ErrorKind::DataInvalid,
format!("Can't create time from hour: {hour}, min: {min}, second: {sec}, microsecond: {micro}"),
))?;
.ok_or_else(|| Error::new(
ErrorKind::DataInvalid,
format!("Can't create time from hour: {hour}, min: {min}, second: {sec}, microsecond: {micro}"),
))?;
Ok(Self::time_from_naive_time(t))
}

@@ -529,7 +529,7 @@ impl Datum {
/// use iceberg::spec::Datum;
/// let t = Datum::string("ss");
///
/// assert_eq!(&format!("{t}"), "ss");
/// assert_eq!(&format!("{t}"), r#""ss""#);
/// ```
pub fn string<S: ToString>(s: S) -> Self {
Self {
@@ -658,6 +658,21 @@ impl Datum {
unreachable!("Decimal type must be primitive.")
}
}

/// Convert the datum to `target_type`.
pub fn to(self, target_type: &Type) -> Result<Datum> {
// TODO: We should allow more type conversions
match target_type {
Type::Primitive(typ) if typ == &self.r#type => Ok(self),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Can't convert datum from {} type to {} type.",
self.r#type, target_type
),
)),
}
}
}

/// Values present in iceberg type
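
For context, a minimal usage sketch of the new `Datum::to` (not part of the diff): as the TODO notes, only same-type conversions currently succeed, and anything else returns a `DataInvalid` error. This assumes `Datum::long` and the spec types are imported as shown.

// Illustrative only; constructors and paths are assumptions, not from the PR.
use iceberg::spec::{Datum, PrimitiveType, Type};

fn datum_to_example() {
    // Identity conversion succeeds.
    assert!(Datum::long(3).to(&Type::Primitive(PrimitiveType::Long)).is_ok());
    // Cross-type conversion is not implemented yet and errors out.
    assert!(Datum::long(3).to(&Type::Primitive(PrimitiveType::String)).is_err());
}
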
@@ -872,10 +887,10 @@ impl Literal {
/// ```
pub fn time_from_hms_micro(hour: u32, min: u32, sec: u32, micro: u32) -> Result<Self> {
let t = NaiveTime::from_hms_micro_opt(hour, min, sec, micro)
.ok_or_else(|| Error::new(
ErrorKind::DataInvalid,
format!("Can't create time from hour: {hour}, min: {min}, second: {sec}, microsecond: {micro}"),
))?;
.ok_or_else(|| Error::new(
ErrorKind::DataInvalid,
format!("Can't create time from hour: {hour}, min: {min}, second: {sec}, microsecond: {micro}"),
))?;
Ok(Self::time_from_naive_time(t))
}

@@ -1964,32 +1979,32 @@ mod _serde {
})
} else {
let list = map.into_iter().map(|(k,v)| {
let raw_k =
RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?;
let raw_v = v
.map(|v| {
RawLiteralEnum::try_from(v, &map_ty.value_field.field_type)
})
.transpose()?;
if map_ty.value_field.required {
Ok(Some(RawLiteralEnum::Record(Record {
required: vec![
(MAP_KEY_FIELD_NAME.to_string(), raw_k),
(MAP_VALUE_FIELD_NAME.to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?),
],
optional: vec![],
})))
} else {
Ok(Some(RawLiteralEnum::Record(Record {
required: vec![
(MAP_KEY_FIELD_NAME.to_string(), raw_k),
],
optional: vec![
(MAP_VALUE_FIELD_NAME.to_string(), raw_v)
],
})))
}
}).collect::<Result<_, Error>>()?;
let raw_k =
RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?;
let raw_v = v
.map(|v| {
RawLiteralEnum::try_from(v, &map_ty.value_field.field_type)
})
.transpose()?;
if map_ty.value_field.required {
Ok(Some(RawLiteralEnum::Record(Record {
required: vec![
(MAP_KEY_FIELD_NAME.to_string(), raw_k),
(MAP_VALUE_FIELD_NAME.to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?),
],
optional: vec![],
})))
} else {
Ok(Some(RawLiteralEnum::Record(Record {
required: vec![
(MAP_KEY_FIELD_NAME.to_string(), raw_k),
],
optional: vec![
(MAP_VALUE_FIELD_NAME.to_string(), raw_v)
],
})))
}
}).collect::<Result<_, Error>>()?;
RawLiteralEnum::List(List {
list,
required: true,
@@ -2009,12 +2024,12 @@
pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> {
let invalid_err = |v: &str| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Unable to convert raw literal ({}) fail convert to type {} for: type mismatch",
v, ty
),
)
ErrorKind::DataInvalid,
format!(
"Unable to convert raw literal ({}) fail convert to type {} for: type mismatch",
v, ty
),
)
};
let invalid_err_with_reason = |v: &str, reason: &str| {
Error::new(