Skip to content

Commit

Permalink
switch from feature = "nightly" to autodetection due to rust-lang/car…
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jul 14, 2020
1 parent 0badd8b commit e0da6b2
Show file tree
Hide file tree
Showing 35 changed files with 419 additions and 371 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
build = "src/build.rs"

[badges]
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
Expand All @@ -30,10 +31,9 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"]
postgres = ["amadeus-postgres", "amadeus-derive/postgres"]
csv = ["amadeus-serde", "amadeus-derive/serde"]
json = ["amadeus-serde", "amadeus-derive/serde"]
nightly = ["amadeus-core/nightly"]

[package.metadata.docs.rs]
features = ["nightly", "constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.3.1", path = "amadeus-core" }
Expand Down Expand Up @@ -63,6 +63,9 @@ serde_json = "1.0"
streaming_algorithms = "0.2"
tokio = { version = "0.2", features = ["macros", "time"] }

[build-dependencies]
rustversion = "1.0"

[patch.crates-io]
vec-utils = { version = "*", git = "https://github.com/alecmocatta/vec-utils", branch = "stable" }
streaming_algorithms = { version = "*", git = "https://github.com/alecmocatta/streaming_algorithms", branch = "stable" }
Expand Down
7 changes: 4 additions & 3 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
build = "src/build.rs"

[badges]
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
maintenance = { status = "actively-developed" }

[features]
nightly = []

[dependencies]
amadeus-core = { version = "=0.3.1", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.1", path = "../amadeus-types" }
Expand All @@ -43,3 +41,6 @@ vec-utils = "0.2"
# dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation
[target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies]
openssl = { version = "0.10", features = ["vendored"] }

[build-dependencies]
rustversion = "1.0"
1 change: 1 addition & 0 deletions amadeus-aws/src/build.rs
10 changes: 5 additions & 5 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl Cloudfront {
}
}

#[cfg(not(feature = "nightly"))]
#[cfg(not(nightly))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<CloudfrontRow, AwsError>> + Send>>;
#[cfg(feature = "nightly")]
#[cfg(nightly)]
type Output = impl Stream<Item = Result<CloudfrontRow, AwsError>> + Send;

FnMutNamed! {
Expand Down Expand Up @@ -103,7 +103,7 @@ FnMutNamed! {
}
.flatten_stream()
.map(|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(identity));
#[cfg(not(feature = "nightly"))]
#[cfg(not(nightly))]
let ret = ret.boxed();
ret
}
Expand All @@ -114,13 +114,13 @@ impl Source for Cloudfront {
type Error = AwsError;

type ParStream = DistParStream<Self::DistStream>;
#[cfg(not(feature = "nightly"))]
#[cfg(not(nightly))]
#[allow(clippy::type_complexity)]
type DistStream = amadeus_core::par_stream::FlatMap<
amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
Closure,
>;
#[cfg(feature = "nightly")]
#[cfg(nightly)]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
Expand Down
2 changes: 1 addition & 1 deletion amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.1")]
#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
Expand Down
7 changes: 4 additions & 3 deletions amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
build = "src/build.rs"

[badges]
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
maintenance = { status = "actively-developed" }

[features]
nightly = []

[dependencies]
amadeus-core = { version = "=0.3.1", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.1", path = "../amadeus-types" }
Expand All @@ -37,3 +35,6 @@ url = { version = "2.1", features = ["serde"] }
# dependency of reqwest/native-tls; ensure it's vendored to simplify cross-compilation
[target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies]
openssl = { version = "0.10", features = ["vendored"] }

[build-dependencies]
rustversion = "1.0"
1 change: 1 addition & 0 deletions amadeus-commoncrawl/src/build.rs
12 changes: 6 additions & 6 deletions amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.1")]
#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
Expand Down Expand Up @@ -80,9 +80,9 @@ impl CommonCrawl {
}
}

#[cfg(not(feature = "nightly"))]
#[cfg(not(nightly))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<Webpage<'static>, io::Error>> + Send>>;
#[cfg(feature = "nightly")]
#[cfg(nightly)]
type Output = impl Stream<Item = Result<Webpage<'static>, io::Error>> + Send;

FnMutNamed! {
Expand All @@ -99,7 +99,7 @@ FnMutNamed! {
WarcParser::new(body)
}
.flatten_stream();
#[cfg(not(feature = "nightly"))]
#[cfg(not(nightly))]
let ret = ret.boxed();
ret
}
Expand All @@ -110,13 +110,13 @@ impl Source for CommonCrawl {
type Error = io::Error;

type ParStream = DistParStream<Self::DistStream>;
#[cfg(not(feature = "nightly"))]
#[cfg(not(nightly))]
#[allow(clippy::type_complexity)]
type DistStream = amadeus_core::par_stream::FlatMap<
amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
Closure,
>;
#[cfg(feature = "nightly")]
#[cfg(nightly)]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
Expand Down
8 changes: 4 additions & 4 deletions amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
build = "src/build.rs"

[badges]
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
maintenance = { status = "actively-developed" }

[features]
# uses Fn*() sugar (better docs), and SIMD for streaming_algorithms
nightly = ["streaming_algorithms/nightly"]

[dependencies]
async-trait = "0.1"
derive-new = "0.5"
Expand All @@ -39,3 +36,6 @@ streaming_algorithms = "0.2"
sum = { version = "0.1", features = ["futures", "serde"] }
walkdir = "2.2"
widestring = "0.4"

[build-dependencies]
rustversion = "1.0"
1 change: 1 addition & 0 deletions amadeus-core/src/build.rs
2 changes: 1 addition & 1 deletion amadeus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/).

#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.1")]
#![cfg_attr(feature = "nightly", feature(unboxed_closures))]
#![cfg_attr(nightly, feature(unboxed_closures))]
#![recursion_limit = "25600"]
#![warn(
// missing_copy_implementations,
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,4 @@ macro_rules! pipe {
}

pipe!(ParallelPipe ParallelSink FromParallelStream Send ops assert_parallel_pipe assert_parallel_sink);
pipe!(DistributedPipe DistributedSink FromDistributedStream ProcessSend traits assert_distributed_pipe assert_distributed_sink cfg_attr(not(feature = "nightly"), serde_closure::desugar));
pipe!(DistributedPipe DistributedSink FromDistributedStream ProcessSend traits assert_distributed_pipe assert_distributed_sink cfg_attr(not(nightly), serde_closure::desugar));
2 changes: 1 addition & 1 deletion amadeus-core/src/par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallel
}
});

stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream IntoDistributedStream into_dist_stream DistStream ProcessPool ProcessSend traits assert_distributed_stream cfg_attr(not(feature = "nightly"), serde_closure::desugar) {
stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream IntoDistributedStream into_dist_stream DistStream ProcessPool ProcessSend traits assert_distributed_stream cfg_attr(not(nightly), serde_closure::desugar) {
async fn reduce<P, B, R1, R2, R3>(
mut self, pool: &P, reduce_a_factory: R1, reduce_b_factory: R2, reduce_c: R3,
) -> B
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_stream/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl_par_dist! {
mod workaround {
use super::*;

#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)]
#[cfg_attr(not(nightly), serde_closure::desugar)]
#[doc(hidden)]
impl Identity {
pub fn pipe<S>(self, sink: S) -> Pipe<Self, S> {
Expand Down
4 changes: 2 additions & 2 deletions amadeus-core/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl<T: ?Sized> ProcessSend for T where T: Send + Serialize + for<'de> Deseriali

type Result<T> = std::result::Result<T, Box<dyn Error + Send>>;

#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)]
#[cfg_attr(not(nightly), serde_closure::desugar)]
pub trait ProcessPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin {
type ThreadPool: ThreadPool + 'static;

Expand All @@ -31,7 +31,7 @@ pub trait ThreadPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin {
T: Send + 'static;
}

#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)]
#[cfg_attr(not(nightly), serde_closure::desugar)]
impl<P: ?Sized> ProcessPool for &P
where
P: ProcessPool,
Expand Down
8 changes: 5 additions & 3 deletions amadeus-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
build = "src/build.rs"

[badges]
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
maintenance = { status = "actively-developed" }

[features]
nightly = []

[dependencies]
amadeus-core = { version = "=0.3.1", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.1", path = "../amadeus-types" }
Expand All @@ -36,6 +34,7 @@ linked-hash-map = "0.5"
lz4 = "1.23"
num-bigint = "0.3"
quick-error = "1.2.2"
rustversion = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.3"
snap = "1.0"
Expand All @@ -46,6 +45,9 @@ zstd = "0.4"
[dev-dependencies]
rand = "0.7"

[build-dependencies]
rustversion = "1.0"

[[test]]
name = "derive"
test = false
1 change: 1 addition & 0 deletions amadeus-parquet/src/build.rs
58 changes: 29 additions & 29 deletions amadeus-parquet/src/internal/record/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,32 +181,32 @@ macro_rules! array {
}
}

// Specialize the implementation to avoid passing a potentially large array around
// TODO: Specialize the implementation to avoid passing a potentially large array around
// on the stack.
impl ParquetData for Box<[u8; $i]> {
type Schema = FixedByteArraySchema<[u8; $i]>;
type Reader = BoxFixedLenByteArrayReader<[u8; $i]>;

fn parse(
schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
<[u8; $i]>::parse(schema, predicate, repetition)
}

fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
let col_path = ColumnPath::new(path.to_vec());
let col_reader = paths.remove(&col_path).unwrap();
BoxFixedLenByteArrayReader::<[u8; $i]> {
column: TypedTripletIter::<FixedLenByteArrayType>::new(
def_level, rep_level, col_reader, batch_size,
),
marker: PhantomData,
}
}
}
// impl ParquetData for Box<[u8; $i]> {
// type Schema = FixedByteArraySchema<[u8; $i]>;
// type Reader = BoxFixedLenByteArrayReader<[u8; $i]>;

// fn parse(
// schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
// ) -> Result<(String, Self::Schema)> {
// <[u8; $i]>::parse(schema, predicate, repetition)
// }

// fn reader(
// _schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
// paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
// ) -> Self::Reader {
// let col_path = ColumnPath::new(path.to_vec());
// let col_reader = paths.remove(&col_path).unwrap();
// BoxFixedLenByteArrayReader::<[u8; $i]> {
// column: TypedTripletIter::<FixedLenByteArrayType>::new(
// def_level, rep_level, col_reader, batch_size,
// ),
// marker: PhantomData,
// }
// }
// }
)*};
}
amadeus_types::array!(array);
Expand All @@ -219,18 +219,18 @@ impl<T> ParquetData for Box<T>
where
T: ParquetData,
{
default type Schema = BoxSchema<T::Schema>;
default type Reader = BoxReader<T::Reader>;
type Schema = BoxSchema<T::Schema>;
type Reader = BoxReader<T::Reader>;
type Predicate = T::Predicate;

default fn parse(
fn parse(
schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
T::parse(schema, predicate, repetition)
.map(|(name, schema)| (name, type_coerce(BoxSchema(schema))))
}

default fn reader(
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
Expand Down
8 changes: 6 additions & 2 deletions amadeus-parquet/src/internal/util/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::{
cell::RefCell, cmp, fs::File, io::{self, BufWriter, Cursor, Initializer, Read, Seek, SeekFrom, Write}, rc::Rc
cell::RefCell, cmp, fs::File, io::{self, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, rc::Rc
};

use crate::internal::file::reader::ParquetReader;
Expand Down Expand Up @@ -57,7 +57,8 @@ impl<R: ParquetReader> Read for BufReader<R> {
self.offset += read as u64;
Ok(read)
}
unsafe fn initializer(&self) -> Initializer {
#[cfg(nightly)]
unsafe fn initializer(&self) -> std::io::Initializer {
self.inner.initializer()
}
}
Expand All @@ -68,6 +69,9 @@ impl<R: ParquetReader> Seek for BufReader<R> {
SeekFrom::Current(n) => n,
SeekFrom::End(n) => self.len as i64 + n - self.offset as i64,
};
#[cfg(not(nightly))]
let _ = self.inner.seek(SeekFrom::Current(offset))?;
#[cfg(nightly)]
self.inner.seek_relative(offset)?;
self.offset = (self.offset as i64 + offset) as u64;
Ok(self.offset)
Expand Down
Loading

0 comments on commit e0da6b2

Please sign in to comment.