Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

group_by 2.0 #80

Merged
merged 8 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.2.5"
version = "0.3.0"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -14,7 +14,7 @@ parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common cr
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.2.5"
documentation = "https://docs.rs/amadeus/0.3.0"
readme = "README.md"
edition = "2018"

Expand All @@ -35,14 +35,15 @@ json = ["amadeus-serde", "amadeus-derive/serde"]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.2.5", path = "amadeus-core" }
amadeus-derive = { version = "=0.2.5", path = "amadeus-derive" }
amadeus-types = { version = "=0.2.5", path = "amadeus-types" }
amadeus-aws = { version = "=0.2.5", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.2.5", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.2.5", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.2.5", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.2.5", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.3.0", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.0", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.0", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.0", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.0", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.0", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.0", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.0", path = "amadeus-serde", optional = true }
async-std = { version = "1.6", features = ["unstable"] }
constellation-rs = { version = "0.1", default-features = false, optional = true }
derive-new = "0.5"
doc-comment = "0.3"
Expand All @@ -52,7 +53,7 @@ pin-project = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.2"
serde_traitobject = { version = "0.2", optional = true }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util"] }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] }

[dev-dependencies]
either = { version = "1.5", features = ["serde"] }
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</p>

<p align="center">
<a href="https://docs.rs/amadeus/0.2.3/amadeus/">📖 Docs</a> | <a href="https://constellation.rs/amadeus">🌐 Home</a> | <a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬 Chat</a>
<a href="https://docs.rs/amadeus">📖 Docs</a> | <a href="https://constellation.rs/amadeus">🌐 Home</a> | <a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬 Chat</a>
</p>

## Amadeus provides:
Expand Down Expand Up @@ -50,7 +50,7 @@ We aim to create a community that is welcoming and helpful to anyone that is int

Amadeus has deep, pluggable integration with various file formats, databases and interfaces:

| Data format | [`Source`](https://docs.rs/amadeus/0.2.5/amadeus/trait.Source.html) | [`Destination`](https://docs.rs/amadeus/0.2.5/amadeus/trait.Destination.html) |
| Data format | [`Source`](https://docs.rs/amadeus/0.3/amadeus/trait.Source.html) | [`Destination`](https://docs.rs/amadeus/0.3/amadeus/trait.Destination.html) |
|---|---|---|
| CSV | ✔ | ✔ |
| JSON | ✔ | ✔ |
Expand Down Expand Up @@ -149,7 +149,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let row = row.ok()?.into_group().ok()?;
row.get("uri")?.clone().into_url().ok()
})
.filter(|row| futures::future::ready(row.is_some()))
.filter(|row| row.is_some())
.map(Option::unwrap)
.most_frequent(&pool, 100, 0.99, 0.002)
.await;
Expand Down
9 changes: 5 additions & 4 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-aws"
version = "0.2.5"
version = "0.3.0"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.2.5"
documentation = "https://docs.rs/amadeus/0.3.0"
readme = "README.md"
edition = "2018"

Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" }
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.2.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.2.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.0", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.0", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
async-trait = "0.1"
chrono = { version = "0.4", default-features = false }
Expand All @@ -35,6 +35,7 @@ serde_closure = "0.2"
serde = { version = "1.0", features = ["derive"] }
tokio = "0.2"
url = { version = "2.1", features = ["serde"] }
vec-utils = "0.2"

# dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation
[target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies]
Expand Down
15 changes: 8 additions & 7 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![allow(unused_qualifications)]

use async_compression::futures::bufread::GzipDecoder;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use futures::{future, io::BufReader, AsyncBufReadExt, FutureExt, StreamExt, TryStreamExt};
use http::{Method, StatusCode};
use rusoto_s3::{GetObjectRequest, Object, S3Client, S3};
use serde::{Deserialize, Serialize};
use serde_closure::*;
use serde_closure::FnMut;
use std::{
convert::identity, io::{self}, time::Duration
};
use vec_utils::VecExt;

use amadeus_core::{
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source
Expand Down Expand Up @@ -39,9 +42,7 @@ impl Cloudfront {

let objects = list(&client, &bucket, &prefix)
.await?
.into_iter()
.map(|object: Object| object.key.unwrap())
.collect();
.map(|object: Object| object.key.unwrap());

Ok(Self {
region,
Expand Down Expand Up @@ -277,7 +278,7 @@ mod http_serde {
use http::{Method, StatusCode};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub struct Serde<T>(T);
pub(crate) struct Serde<T>(T);

impl Serialize for Serde<&Method> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
Expand Down Expand Up @@ -325,14 +326,14 @@ mod http_serde {
}
}

pub fn serialize<T, S>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
pub(crate) fn serialize<T, S>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
for<'a> Serde<&'a T>: Serialize,
S: Serializer,
{
Serde(t).serialize(serializer)
}
pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
Serde<T>: Deserialize<'de>,
D: Deserializer<'de>,
Expand Down
4 changes: 2 additions & 2 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Directory for S3Directory {
|| (current_path.depth() > 0
&& current_path.last().unwrap() != path[current_path.depth() - 1])
{
current_path.pop().unwrap();
let _ = current_path.pop().unwrap();
}
while path.len() > current_path.depth() {
current_path.push(path[current_path.depth()]);
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Page for S3Page {
let mut buf_ = vec![0; len].into_boxed_slice();
let mut buf = &mut *buf_;
let len: u64 = len.try_into().unwrap();
let mut pos = 0u64;
let mut pos = 0_u64;
let mut errors: usize = 0;
let end = offset + len - 1;
while !buf.is_empty() {
Expand Down
53 changes: 41 additions & 12 deletions amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.2.5")]
//! Harmonious distributed data processing & analysis in Rust.
//!
//! <p style="font-family: 'Fira Sans',sans-serif;padding:0.3em 0"><strong>
//! <a href="https://crates.io/crates/amadeus">📦&nbsp;&nbsp;Crates.io</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://github.com/constellation-rs/amadeus">📑&nbsp;&nbsp;GitHub</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬&nbsp;&nbsp;Chat</a>
//! </strong></p>
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.0")]
#![feature(type_alias_impl_trait)]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
// missing_docs,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications,
unused_results,
unreachable_pub,
clippy::pedantic,
)]
#![allow(
clippy::module_name_repetitions,
clippy::if_not_else,
clippy::too_many_lines,
clippy::must_use_candidate,
clippy::type_repetition_in_bounds,
clippy::filter_map,
clippy::missing_errors_doc
)]
#![deny(unsafe_code)]

mod cloudfront;
mod file;
Expand Down Expand Up @@ -144,7 +173,7 @@ pub enum AwsError {
NoSuchBucket(String),
NoSuchKey(String),
HttpDispatch(rusoto_core::request::HttpDispatchError),
Credentials(rusoto_credential::CredentialsError),
Credentials(CredentialsError),
Validation(String),
ParseError(String),
Unknown(rusoto_core::request::BufferedHttpResponse),
Expand All @@ -156,8 +185,8 @@ impl Clone for AwsError {
Self::NoSuchBucket(err) => Self::NoSuchBucket(err.clone()),
Self::NoSuchKey(err) => Self::NoSuchKey(err.clone()),
Self::HttpDispatch(err) => Self::HttpDispatch(err.clone()),
Self::Credentials(rusoto_credential::CredentialsError { message }) => {
Self::Credentials(rusoto_credential::CredentialsError {
Self::Credentials(CredentialsError { message }) => {
Self::Credentials(CredentialsError {
message: message.clone(),
})
}
Expand All @@ -179,12 +208,12 @@ impl Clone for AwsError {
impl PartialEq for AwsError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::NoSuchBucket(a), Self::NoSuchBucket(b)) => a == b,
(Self::NoSuchKey(a), Self::NoSuchKey(b)) => a == b,
(Self::NoSuchBucket(a), Self::NoSuchBucket(b))
| (Self::NoSuchKey(a), Self::NoSuchKey(b))
| (Self::Validation(a), Self::Validation(b))
| (Self::ParseError(a), Self::ParseError(b)) => a == b,
(Self::HttpDispatch(a), Self::HttpDispatch(b)) => a == b,
(Self::Credentials(a), Self::Credentials(b)) => a == b,
(Self::Validation(a), Self::Validation(b)) => a == b,
(Self::ParseError(a), Self::ParseError(b)) => a == b,
(Self::Unknown(a), Self::Unknown(b)) => a == b,
(Self::Io(a), Self::Io(b)) => a == b,
_ => false,
Expand All @@ -195,12 +224,12 @@ impl error::Error for AwsError {}
impl Display for AwsError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::NoSuchBucket(err) => err.fmt(f),
Self::NoSuchKey(err) => err.fmt(f),
Self::NoSuchBucket(err)
| Self::NoSuchKey(err)
| Self::Validation(err)
| Self::ParseError(err) => err.fmt(f),
Self::HttpDispatch(err) => err.fmt(f),
Self::Credentials(err) => err.fmt(f),
Self::Validation(err) => err.fmt(f),
Self::ParseError(err) => err.fmt(f),
Self::Unknown(err) => fmt::Debug::fmt(err, f),
Self::Io(err) => err.fmt(f),
}
Expand Down
8 changes: 4 additions & 4 deletions amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-commoncrawl"
version = "0.2.5"
version = "0.3.0"
license = "MIT OR Apache-2.0"
authors = ["Stephen Becker IV <github@deathbyescalator.com>", "Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.2.5"
documentation = "https://docs.rs/amadeus/0.3.0"
readme = "README.md"
edition = "2018"

Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" }
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.2.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.2.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.0", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.0", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
futures = "0.3"
nom = "4.2.3"
Expand Down
12 changes: 6 additions & 6 deletions amadeus-commoncrawl/src/commoncrawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const CHOMP: usize = 2 << 13; // 16 KiB

#[pin_project]
#[derive(Clone, Debug)]
pub struct WarcParser<I> {
pub(crate) struct WarcParser<I> {
#[pin]
input: I,
state: WarcParserState,
Expand All @@ -30,7 +30,7 @@ enum WarcParserState {
Done,
}
impl<I> WarcParser<I> {
pub fn new(input: I) -> WarcParser<I> {
pub(crate) fn new(input: I) -> WarcParser<I> {
WarcParser {
input,
state: WarcParserState::Info,
Expand All @@ -43,7 +43,7 @@ impl<I> WarcParser<I>
where
I: Read,
{
pub fn next_borrowed(&mut self) -> Result<Option<Webpage<'_>>, io::Error> {
pub(crate) fn next_borrowed(&mut self) -> Result<Option<Webpage<'_>>, io::Error> {
if let WarcParserState::Done = self.state {
return Ok(None);
}
Expand All @@ -68,7 +68,7 @@ where
}

loop {
self.res.splice(..self.offset, iter::empty());
let _ = self.res.splice(..self.offset, iter::empty());
self.offset = 0;
if self.offset == self.res.len() {
continue 'chomp;
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<I> WarcParser<I>
where
I: AsyncRead,
{
pub fn poll_next_borrowed(
pub(crate) fn poll_next_borrowed(
self: Pin<&mut Self>, cx: &mut Context,
) -> Poll<Result<Option<Webpage<'_>>, io::Error>> {
let mut self_ = self.project();
Expand All @@ -149,7 +149,7 @@ where
}

loop {
self_.res.splice(..*self_.offset, iter::empty());
let _ = self_.res.splice(..*self_.offset, iter::empty());
*self_.offset = 0;
if *self_.offset == self_.res.len() {
continue 'chomp;
Expand Down
29 changes: 27 additions & 2 deletions amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,38 @@
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.2.5")]
//! Harmonious distributed data processing & analysis in Rust.
//!
//! <p style="font-family: 'Fira Sans',sans-serif;padding:0.3em 0"><strong>
//! <a href="https://crates.io/crates/amadeus">📦&nbsp;&nbsp;Crates.io</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://github.com/constellation-rs/amadeus">📑&nbsp;&nbsp;GitHub</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬&nbsp;&nbsp;Chat</a>
//! </strong></p>
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.0")]
#![feature(type_alias_impl_trait)]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
// missing_docs,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications,
unused_results,
unreachable_pub,
clippy::pedantic,
)]
#![allow(
clippy::doc_markdown,
clippy::inline_always,
clippy::missing_errors_doc
)]
#![deny(unsafe_code)]

mod commoncrawl;
mod parser;

use async_compression::futures::bufread::GzipDecoder; // TODO: use stream or https://github.com/alexcrichton/flate2-rs/pull/214
use futures::{io::BufReader, AsyncBufReadExt, FutureExt, StreamExt, TryStreamExt};
use reqwest_resume::ClientExt;
use serde_closure::*;
use serde_closure::FnMut;
use std::{io, time};

use amadeus_core::{
Expand Down
Loading