Skip to content

Commit

Permalink
Merge pull request #80 from constellation-rs/groupby2
Browse files Browse the repository at this point in the history
group_by 2.0
  • Loading branch information
alecmocatta authored Jul 6, 2020
2 parents 0ae116d + 33cd62f commit f847ead
Show file tree
Hide file tree
Showing 86 changed files with 2,651 additions and 2,642 deletions.
23 changes: 12 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.2.5"
version = "0.3.0"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -14,7 +14,7 @@ parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common cr
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.2.5"
documentation = "https://docs.rs/amadeus/0.3.0"
readme = "README.md"
edition = "2018"

Expand All @@ -35,14 +35,15 @@ json = ["amadeus-serde", "amadeus-derive/serde"]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.2.5", path = "amadeus-core" }
amadeus-derive = { version = "=0.2.5", path = "amadeus-derive" }
amadeus-types = { version = "=0.2.5", path = "amadeus-types" }
amadeus-aws = { version = "=0.2.5", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.2.5", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.2.5", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.2.5", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.2.5", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.3.0", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.0", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.0", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.0", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.0", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.0", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.0", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.0", path = "amadeus-serde", optional = true }
async-std = { version = "1.6", features = ["unstable"] }
constellation-rs = { version = "0.1", default-features = false, optional = true }
derive-new = "0.5"
doc-comment = "0.3"
Expand All @@ -52,7 +53,7 @@ pin-project = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.2"
serde_traitobject = { version = "0.2", optional = true }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util"] }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] }

[dev-dependencies]
either = { version = "1.5", features = ["serde"] }
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</p>

<p align="center">
<a href="https://docs.rs/amadeus/0.2.3/amadeus/">📖 Docs</a> | <a href="https://constellation.rs/amadeus">🌐 Home</a> | <a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬 Chat</a>
<a href="https://docs.rs/amadeus">📖 Docs</a> | <a href="https://constellation.rs/amadeus">🌐 Home</a> | <a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬 Chat</a>
</p>

## Amadeus provides:
Expand Down Expand Up @@ -50,7 +50,7 @@ We aim to create a community that is welcoming and helpful to anyone that is int

Amadeus has deep, pluggable, integration with various file formats, databases and interfaces:

| Data format | [`Source`](https://docs.rs/amadeus/0.2.5/amadeus/trait.Source.html) | [`Destination`](https://docs.rs/amadeus/0.2.5/amadeus/trait.Destination.html) |
| Data format | [`Source`](https://docs.rs/amadeus/0.3/amadeus/trait.Source.html) | [`Destination`](https://docs.rs/amadeus/0.3/amadeus/trait.Destination.html) |
|---|---|---|
| CSV |||
| JSON |||
Expand Down Expand Up @@ -149,7 +149,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let row = row.ok()?.into_group().ok()?;
row.get("uri")?.clone().into_url().ok()
})
.filter(|row| futures::future::ready(row.is_some()))
.filter(|row| row.is_some())
.map(Option::unwrap)
.most_frequent(&pool, 100, 0.99, 0.002)
.await;
Expand Down
9 changes: 5 additions & 4 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-aws"
version = "0.2.5"
version = "0.3.0"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.2.5"
documentation = "https://docs.rs/amadeus/0.3.0"
readme = "README.md"
edition = "2018"

Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" }
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.2.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.2.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.0", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.0", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
async-trait = "0.1"
chrono = { version = "0.4", default-features = false }
Expand All @@ -35,6 +35,7 @@ serde_closure = "0.2"
serde = { version = "1.0", features = ["derive"] }
tokio = "0.2"
url = { version = "2.1", features = ["serde"] }
vec-utils = "0.2"

# dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation
[target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies]
Expand Down
15 changes: 8 additions & 7 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![allow(unused_qualifications)]

use async_compression::futures::bufread::GzipDecoder;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use futures::{future, io::BufReader, AsyncBufReadExt, FutureExt, StreamExt, TryStreamExt};
use http::{Method, StatusCode};
use rusoto_s3::{GetObjectRequest, Object, S3Client, S3};
use serde::{Deserialize, Serialize};
use serde_closure::*;
use serde_closure::FnMut;
use std::{
convert::identity, io::{self}, time::Duration
};
use vec_utils::VecExt;

use amadeus_core::{
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source
Expand Down Expand Up @@ -39,9 +42,7 @@ impl Cloudfront {

let objects = list(&client, &bucket, &prefix)
.await?
.into_iter()
.map(|object: Object| object.key.unwrap())
.collect();
.map(|object: Object| object.key.unwrap());

Ok(Self {
region,
Expand Down Expand Up @@ -277,7 +278,7 @@ mod http_serde {
use http::{Method, StatusCode};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub struct Serde<T>(T);
pub(crate) struct Serde<T>(T);

impl Serialize for Serde<&Method> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
Expand Down Expand Up @@ -325,14 +326,14 @@ mod http_serde {
}
}

pub fn serialize<T, S>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
pub(crate) fn serialize<T, S>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
for<'a> Serde<&'a T>: Serialize,
S: Serializer,
{
Serde(t).serialize(serializer)
}
pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
Serde<T>: Deserialize<'de>,
D: Deserializer<'de>,
Expand Down
4 changes: 2 additions & 2 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Directory for S3Directory {
|| (current_path.depth() > 0
&& current_path.last().unwrap() != path[current_path.depth() - 1])
{
current_path.pop().unwrap();
let _ = current_path.pop().unwrap();
}
while path.len() > current_path.depth() {
current_path.push(path[current_path.depth()]);
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Page for S3Page {
let mut buf_ = vec![0; len].into_boxed_slice();
let mut buf = &mut *buf_;
let len: u64 = len.try_into().unwrap();
let mut pos = 0u64;
let mut pos = 0_u64;
let mut errors: usize = 0;
let end = offset + len - 1;
while !buf.is_empty() {
Expand Down
53 changes: 41 additions & 12 deletions amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.2.5")]
//! Harmonious distributed data processing & analysis in Rust.
//!
//! <p style="font-family: 'Fira Sans',sans-serif;padding:0.3em 0"><strong>
//! <a href="https://crates.io/crates/amadeus">📦&nbsp;&nbsp;Crates.io</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://github.com/constellation-rs/amadeus">📑&nbsp;&nbsp;GitHub</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬&nbsp;&nbsp;Chat</a>
//! </strong></p>
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exported in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.0")]
#![feature(type_alias_impl_trait)]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
// missing_docs,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications,
unused_results,
unreachable_pub,
clippy::pedantic,
)]
#![allow(
clippy::module_name_repetitions,
clippy::if_not_else,
clippy::too_many_lines,
clippy::must_use_candidate,
clippy::type_repetition_in_bounds,
clippy::filter_map,
clippy::missing_errors_doc
)]
#![deny(unsafe_code)]

mod cloudfront;
mod file;
Expand Down Expand Up @@ -144,7 +173,7 @@ pub enum AwsError {
NoSuchBucket(String),
NoSuchKey(String),
HttpDispatch(rusoto_core::request::HttpDispatchError),
Credentials(rusoto_credential::CredentialsError),
Credentials(CredentialsError),
Validation(String),
ParseError(String),
Unknown(rusoto_core::request::BufferedHttpResponse),
Expand All @@ -156,8 +185,8 @@ impl Clone for AwsError {
Self::NoSuchBucket(err) => Self::NoSuchBucket(err.clone()),
Self::NoSuchKey(err) => Self::NoSuchKey(err.clone()),
Self::HttpDispatch(err) => Self::HttpDispatch(err.clone()),
Self::Credentials(rusoto_credential::CredentialsError { message }) => {
Self::Credentials(rusoto_credential::CredentialsError {
Self::Credentials(CredentialsError { message }) => {
Self::Credentials(CredentialsError {
message: message.clone(),
})
}
Expand All @@ -179,12 +208,12 @@ impl Clone for AwsError {
impl PartialEq for AwsError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::NoSuchBucket(a), Self::NoSuchBucket(b)) => a == b,
(Self::NoSuchKey(a), Self::NoSuchKey(b)) => a == b,
(Self::NoSuchBucket(a), Self::NoSuchBucket(b))
| (Self::NoSuchKey(a), Self::NoSuchKey(b))
| (Self::Validation(a), Self::Validation(b))
| (Self::ParseError(a), Self::ParseError(b)) => a == b,
(Self::HttpDispatch(a), Self::HttpDispatch(b)) => a == b,
(Self::Credentials(a), Self::Credentials(b)) => a == b,
(Self::Validation(a), Self::Validation(b)) => a == b,
(Self::ParseError(a), Self::ParseError(b)) => a == b,
(Self::Unknown(a), Self::Unknown(b)) => a == b,
(Self::Io(a), Self::Io(b)) => a == b,
_ => false,
Expand All @@ -195,12 +224,12 @@ impl error::Error for AwsError {}
impl Display for AwsError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::NoSuchBucket(err) => err.fmt(f),
Self::NoSuchKey(err) => err.fmt(f),
Self::NoSuchBucket(err)
| Self::NoSuchKey(err)
| Self::Validation(err)
| Self::ParseError(err) => err.fmt(f),
Self::HttpDispatch(err) => err.fmt(f),
Self::Credentials(err) => err.fmt(f),
Self::Validation(err) => err.fmt(f),
Self::ParseError(err) => err.fmt(f),
Self::Unknown(err) => fmt::Debug::fmt(err, f),
Self::Io(err) => err.fmt(f),
}
Expand Down
8 changes: 4 additions & 4 deletions amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-commoncrawl"
version = "0.2.5"
version = "0.3.0"
license = "MIT OR Apache-2.0"
authors = ["Stephen Becker IV <github@deathbyescalator.com>", "Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.2.5"
documentation = "https://docs.rs/amadeus/0.3.0"
readme = "README.md"
edition = "2018"

Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" }
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.2.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.2.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.0", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.0", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
futures = "0.3"
nom = "4.2.3"
Expand Down
12 changes: 6 additions & 6 deletions amadeus-commoncrawl/src/commoncrawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const CHOMP: usize = 2 << 13; // 16 KiB

#[pin_project]
#[derive(Clone, Debug)]
pub struct WarcParser<I> {
pub(crate) struct WarcParser<I> {
#[pin]
input: I,
state: WarcParserState,
Expand All @@ -30,7 +30,7 @@ enum WarcParserState {
Done,
}
impl<I> WarcParser<I> {
pub fn new(input: I) -> WarcParser<I> {
pub(crate) fn new(input: I) -> WarcParser<I> {
WarcParser {
input,
state: WarcParserState::Info,
Expand All @@ -43,7 +43,7 @@ impl<I> WarcParser<I>
where
I: Read,
{
pub fn next_borrowed(&mut self) -> Result<Option<Webpage<'_>>, io::Error> {
pub(crate) fn next_borrowed(&mut self) -> Result<Option<Webpage<'_>>, io::Error> {
if let WarcParserState::Done = self.state {
return Ok(None);
}
Expand All @@ -68,7 +68,7 @@ where
}

loop {
self.res.splice(..self.offset, iter::empty());
let _ = self.res.splice(..self.offset, iter::empty());
self.offset = 0;
if self.offset == self.res.len() {
continue 'chomp;
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<I> WarcParser<I>
where
I: AsyncRead,
{
pub fn poll_next_borrowed(
pub(crate) fn poll_next_borrowed(
self: Pin<&mut Self>, cx: &mut Context,
) -> Poll<Result<Option<Webpage<'_>>, io::Error>> {
let mut self_ = self.project();
Expand All @@ -149,7 +149,7 @@ where
}

loop {
self_.res.splice(..*self_.offset, iter::empty());
let _ = self_.res.splice(..*self_.offset, iter::empty());
*self_.offset = 0;
if *self_.offset == self_.res.len() {
continue 'chomp;
Expand Down
29 changes: 27 additions & 2 deletions amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,38 @@
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.2.5")]
//! Harmonious distributed data processing & analysis in Rust.
//!
//! <p style="font-family: 'Fira Sans',sans-serif;padding:0.3em 0"><strong>
//! <a href="https://crates.io/crates/amadeus">📦&nbsp;&nbsp;Crates.io</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://github.com/constellation-rs/amadeus">📑&nbsp;&nbsp;GitHub</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬&nbsp;&nbsp;Chat</a>
//! </strong></p>
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exported in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.0")]
#![feature(type_alias_impl_trait)]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
// missing_docs,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications,
unused_results,
unreachable_pub,
clippy::pedantic,
)]
#![allow(
clippy::doc_markdown,
clippy::inline_always,
clippy::missing_errors_doc
)]
#![deny(unsafe_code)]

mod commoncrawl;
mod parser;

use async_compression::futures::bufread::GzipDecoder; // TODO: use stream or https://github.com/alexcrichton/flate2-rs/pull/214
use futures::{io::BufReader, AsyncBufReadExt, FutureExt, StreamExt, TryStreamExt};
use reqwest_resume::ClientExt;
use serde_closure::*;
use serde_closure::FnMut;
use std::{io, time};

use amadeus_core::{
Expand Down
Loading

0 comments on commit f847ead

Please sign in to comment.