Commit

Ignore last test, fix cargo clippy, format and pass integration tests (apache#10)

* Fix tests

* Ignore last test, fix clippy, fmt and enable integration

* more clippy fix
yjshen authored Sep 28, 2021
1 parent f9504e7 commit 33b6931
Showing 22 changed files with 78 additions and 71 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
@@ -31,6 +31,5 @@ members = [
 exclude = ["python"]
 
 [patch.crates-io]
-arrow2 = { path = "/home/houqp/Documents/code/arrow/arrow2" }
-arrow-flight = { path = "/home/houqp/Documents/code/arrow/arrow2/arrow-flight" }
-parquet2 = { path = "/home/houqp/Documents/code/arrow/parquet2" }
+arrow2 = { git = "https://github.com/houqp/arrow2.git", branch = "qp_ord" }
+arrow-flight = { git = "https://github.com/houqp/arrow2.git", branch = "qp_ord" }

2 changes: 1 addition & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -507,7 +507,7 @@ mod tests {
         fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> {
             self.fields()
                 .iter()
-                .position(|c| c.name() == &column_name)
+                .position(|c| c.name() == column_name)
                 .map(|pos| self.values()[pos].borrow())
         }
     }

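The one-character change above is a typical clippy fix in this commit: `column_name` is already a `&str`, so borrowing it again only forces the comparison through an extra `&&str`. A minimal stand-alone sketch of the same pattern, using a hypothetical `Field` type rather than the arrow2 one:

```rust
// Hypothetical stand-in for a schema field; only the lint pattern matters here.
struct Field {
    name: String,
}

impl Field {
    fn name(&self) -> &String {
        &self.name
    }
}

fn position_of(fields: &[Field], column_name: &str) -> Option<usize> {
    // Before: `.position(|c| c.name() == &column_name)` compared `&String`
    // with `&&str`, which clippy flags as a needlessly taken reference.
    // Dropping the `&` compares `&String` with `&str` directly.
    fields.iter().position(|c| c.name() == column_name)
}

fn main() {
    let fields = vec![Field { name: "a".to_string() }, Field { name: "b".to_string() }];
    assert_eq!(position_of(&fields, "b"), Some(1));
}
```
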
7 changes: 4 additions & 3 deletions benchmarks/src/bin/tpch.rs
@@ -37,6 +37,9 @@ use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
 
 use arrow::io::parquet::write::{Compression, Version, WriteOptions};
+use ballista::prelude::{
+    BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+};
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -179,7 +182,7 @@ async fn main() -> Result<()> {
     env_logger::init();
     match TpchOpt::from_args() {
         TpchOpt::Benchmark(BallistaBenchmark(opt)) => {
-            todo!() //benchmark_ballista(opt).await.map(|_| ())
+            benchmark_ballista(opt).await.map(|_| ())
         }
         TpchOpt::Benchmark(DataFusionBenchmark(opt)) => {
             benchmark_datafusion(opt).await.map(|_| ())
@@ -239,7 +242,6 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     Ok(result)
 }
 
-/*
 async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     println!("Running benchmarks with the following options: {:?}", opt);
 
@@ -316,7 +318,6 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
 
     Ok(())
 }
-*/
 
 fn get_query_sql(query: usize) -> Result<String> {
     if query > 0 && query < 23 {

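Re-enabling `benchmark_ballista` is why the `ballista::prelude` import was added above. As a rough sketch of how those three items typically fit together (the host, port, and partition count are placeholders, and the exact `builder`/`remote` signatures are assumed from the Ballista API of this period rather than shown in this diff):

```rust
use ballista::prelude::{
    BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
};

fn connect_to_scheduler() -> Result<BallistaContext, String> {
    // Assumed API: a key/value config builder plus a `remote` constructor
    // pointing at a running Ballista scheduler; all values are placeholders.
    let config = BallistaConfig::builder()
        .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "16")
        .build()
        .map_err(|e| format!("{:?}", e))?;
    Ok(BallistaContext::remote("localhost", 50050, &config))
}
```
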
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udf.rs
@@ -21,10 +21,10 @@ use datafusion::arrow::{
     record_batch::RecordBatch,
 };
 
+use arrow::array::Array;
 use datafusion::prelude::*;
 use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
 use std::sync::Arc;
-use arrow::array::Array;
 
 // create local execution context with an in-memory table
 fn create_context() -> Result<ExecutionContext> {

4 changes: 2 additions & 2 deletions datafusion/benches/data_utils/mod.rs
@@ -122,8 +122,8 @@ fn create_record_batch(
         vec![
             Arc::new(Utf8Array::<i32>::from_slice(keys)),
             Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
-            Arc::new(Float64Array::from_slice(values)),
-            Arc::new(UInt64Array::from_slice(integer_values_wide)),
+            Arc::new(Float64Array::from(values)),
+            Arc::new(UInt64Array::from(integer_values_wide)),
             Arc::new(UInt64Array::from_slice(integer_values_narrow)),
         ],
     )

14 changes: 7 additions & 7 deletions datafusion/benches/physical_plan.rs
@@ -21,10 +21,10 @@ use criterion::{BatchSize, Criterion};
 extern crate arrow;
 extern crate datafusion;
 
-use std::{iter::FromIterator, sync::Arc};
+use std::sync::Arc;
 
 use arrow::{
-    array::{ArrayRef, Int64Array, StringArray},
+    array::{ArrayRef, Int64Array, Utf8Array},
     record_batch::RecordBatch,
 };
 use tokio::runtime::Runtime;
@@ -39,7 +39,7 @@ use datafusion::physical_plan::{
 // Initialise the operator using the provided record batches and the sort key
 // as inputs. All record batches must have the same schema.
 fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
-    let schema = batches[0].schema();
+    let schema = batches[0].schema().clone();
 
     let sort = sort
         .iter()
@@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
 
     let exec = MemoryExec::try_new(
         &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
-        schema.clone(),
+        schema,
         None,
     )
     .unwrap();
@@ -104,9 +104,9 @@ fn batches(
     col_b.sort();
     col_c.sort();
 
-    let col_a: ArrayRef = Arc::new(StringArray::from_iter(col_a));
-    let col_b: ArrayRef = Arc::new(StringArray::from_iter(col_b));
-    let col_c: ArrayRef = Arc::new(StringArray::from_iter(col_c));
+    let col_a: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_a));
+    let col_b: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_b));
+    let col_c: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_c));
     let col_d: ArrayRef = Arc::new(Int64Array::from(col_d));
 
     let rb = RecordBatch::try_from_iter(vec![

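The `StringArray` → `Utf8Array::<i32>` swap above is the recurring arrow-rs → arrow2 migration pattern in this commit: arrow2 makes the offset width a type parameter (`i32` for Utf8, `i64` for LargeUtf8) instead of providing two separate array types. A minimal sketch of building one, assuming arrow2's `From` impl for vectors of optional strings and its `from_slice` helper (inside this repository the crate is aliased to `arrow`; here it is named `arrow2`):

```rust
use arrow2::array::Utf8Array;

fn main() {
    // Nullable column: the assumed From impl takes Option-wrapped values,
    // mirroring how `col_a`/`col_b`/`col_c` are built in the benchmark above.
    let nullable = Utf8Array::<i32>::from(vec![Some("a"), None, Some("c")]);

    // Dense column with no validity bitmap, as used for `keys` elsewhere
    // in this commit.
    let dense = Utf8Array::<i32>::from_slice(["x", "y", "z"]);

    assert_eq!(nullable.len(), 3);
    assert_eq!(dense.len(), 3);
}
```
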
2 changes: 1 addition & 1 deletion datafusion/src/arrow_temporal_util.rs
@@ -211,7 +211,7 @@ mod tests {
         // Note: Use chrono APIs that are different than
         // naive_datetime_to_timestamp to compute the utc offset to
         // try and double check the logic
-        let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) {
+        let utc_offset_secs = match Local.offset_from_local_datetime(naive_datetime) {
            LocalResult::Single(local_offset) => {
                local_offset.fix().local_minus_utc() as i64
            }

6 changes: 4 additions & 2 deletions datafusion/src/execution/dataframe_impl.rs
@@ -160,13 +160,15 @@ impl DataFrame for DataFrameImpl {
     /// Print results.
     async fn show(&self) -> Result<()> {
         let results = self.collect().await?;
-        Ok(print::print(&results))
+        print::print(&results);
+        Ok(())
     }
 
     /// Print results and limit rows.
     async fn show_limit(&self, num: usize) -> Result<()> {
         let results = self.limit(num)?.collect().await?;
-        Ok(print::print(&results))
+        print::print(&results);
+        Ok(())
     }
 
     /// Convert the logical plan represented by this DataFrame into a physical plan and

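Both `show` methods had the same clippy::unit_arg complaint: `print::print` returns `()`, so wrapping the call in `Ok(...)` passes a unit value as an argument. A small self-contained sketch of the pattern, with a hypothetical print helper in place of DataFusion's:

```rust
// Hypothetical stand-in for `print::print`; it returns `()`.
fn print_all(lines: &[String]) {
    for line in lines {
        println!("{}", line);
    }
}

fn show(lines: &[String]) -> Result<(), String> {
    // Before: `Ok(print_all(lines))` wrapped the unit return value, which
    // clippy flags as `unit_arg`. Calling first and returning `Ok(())`
    // keeps the behavior and reads more clearly.
    print_all(lines);
    Ok(())
}

fn main() {
    show(&["one".to_string(), "two".to_string()]).unwrap();
}
```
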
11 changes: 5 additions & 6 deletions datafusion/src/physical_plan/array_expressions.rs
@@ -19,9 +19,7 @@
 
 use crate::error::{DataFusionError, Result};
 use arrow::array::*;
-use arrow::compute::concat;
 use arrow::datatypes::DataType;
-use std::sync::Arc;
 
 use super::ColumnarValue;
 
@@ -35,7 +33,10 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
 
     macro_rules! array {
         ($PRIMITIVE: ty, $ARRAY: ty, $DATA_TYPE: path) => {{
-            let array = MutablePrimitiveArray::<$PRIMITIVE>::with_capacity_from(first.len() * size, $DATA_TYPE);
+            let array = MutablePrimitiveArray::<$PRIMITIVE>::with_capacity_from(
+                first.len() * size,
+                $DATA_TYPE,
+            );
             let mut array = MutableFixedSizeListArray::new(array, size);
             // for each entry in the array
             for index in 0..first.len() {
@@ -73,7 +74,6 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
         }};
     }
 
-
     match first.data_type() {
         DataType::Boolean => {
             let array = MutableBooleanArray::with_capacity(first.len() * size);
@@ -91,7 +91,7 @@
                 }
             }
             Ok(array.as_arc())
-        },
+        }
         DataType::UInt8 => array!(u8, PrimitiveArray<u8>, DataType::UInt8),
         DataType::UInt16 => array!(u16, PrimitiveArray<u16>, DataType::UInt16),
         DataType::UInt32 => array!(u32, PrimitiveArray<u32>, DataType::UInt32),
@@ -109,7 +109,6 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
             data_type
         ))),
     }
-
 }
 
 /// put values in an array.

5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/csv.rs
@@ -308,17 +308,18 @@ impl CsvExec {
         filenames: &[String],
         options: &CsvReadOptions,
     ) -> Result<Schema> {
-        Ok(infer_schema_from_files(
+        infer_schema_from_files(
             filenames,
             options.delimiter,
             Some(options.schema_infer_max_records),
             options.has_header,
-        )?)
+        )
     }
 }
 
 type Payload = ArrowResult<RecordBatch>;
 
+#[allow(clippy::too_many_arguments)]
 fn producer_task<R: Read>(
     reader: R,
     response_tx: Sender<Payload>,

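Two independent clippy fixes land here: `Ok(expr?)` is collapsed to `expr` (clippy::needless_question_mark), and `producer_task`'s long parameter list is explicitly allowed rather than refactored. A stand-alone sketch of the first pattern, with a placeholder parser instead of `infer_schema_from_files`:

```rust
use std::num::ParseIntError;

// Placeholder for a fallible helper such as `infer_schema_from_files`.
fn parse_count(s: &str) -> Result<i64, ParseIntError> {
    s.trim().parse()
}

fn parse_count_wrapped(s: &str) -> Result<i64, ParseIntError> {
    // Before: `Ok(parse_count(s)?)` unwrapped with `?` only to rewrap in
    // `Ok`, which clippy flags as needless_question_mark. When the error
    // types already match, returning the inner Result is enough.
    parse_count(s)
}

fn main() {
    assert_eq!(parse_count_wrapped(" 42 ").unwrap(), 42);
}
```
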
7 changes: 2 additions & 5 deletions datafusion/src/physical_plan/expressions/binary.rs
@@ -259,11 +259,8 @@ fn evaluate_scalar(
                 Ok(None)
             }
         }
-    } else if matches!(op, Or) {
-        // TODO: optimize scalar Or
-        Ok(None)
-    } else if matches!(op, And) {
-        // TODO: optimize scalar And
+    } else if matches!(op, Or | And) {
+        // TODO: optimize scalar Or | And
         Ok(None)
     } else {
         match (lhs.data_type(), op) {

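Merging the `Or` and `And` branches relies on an or-pattern inside `matches!`, so the two identical arms collapse into one. A minimal stand-alone sketch, with a hypothetical `Op` enum standing in for DataFusion's `Operator`:

```rust
// Hypothetical stand-in for the Operator enum.
enum Op {
    And,
    Or,
    Plus,
}

fn is_logical_connective(op: &Op) -> bool {
    // Before the change, two `else if matches!(op, ...)` branches returned
    // the same `Ok(None)`; the or-pattern folds them into a single check.
    matches!(op, Op::Or | Op::And)
}

fn main() {
    assert!(is_logical_connective(&Op::And));
    assert!(is_logical_connective(&Op::Or));
    assert!(!is_logical_connective(&Op::Plus));
}
```
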
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/filter.rs
@@ -30,16 +30,16 @@ use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
 };
 
-use arrow::array::{BooleanArray, Array};
+use arrow::array::{Array, BooleanArray};
 use arrow::compute::filter::filter_record_batch;
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
 use async_trait::async_trait;
 
-use futures::stream::{Stream, StreamExt};
 use arrow::compute::boolean::{and, is_not_null};
+use futures::stream::{Stream, StreamExt};
 
 /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
 /// include in its output batches.

2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/functions.rs
@@ -587,7 +587,7 @@ pub fn create_physical_fun(
             ))),
         }),
         BuiltinScalarFunction::BitLength => Arc::new(|args| match &args[0] {
-            ColumnarValue::Array(v) => todo!(),
+            ColumnarValue::Array(_v) => todo!(),
             ColumnarValue::Scalar(v) => match v {
                 ScalarValue::Utf8(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int32(
                     v.as_ref().map(|x| (x.len() * 8) as i32),

3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/hash_aggregate.rs
@@ -880,6 +880,7 @@ impl RecordBatchStream for HashAggregateStream {
 
 /// Given Vec<Vec<ArrayRef>>, concatenates the inners `Vec<ArrayRef>` into `ArrayRef`, returning `Vec<ArrayRef>`
 /// This assumes that `arrays` is not empty.
+#[allow(dead_code)]
 fn concatenate(arrays: Vec<Vec<ArrayRef>>) -> ArrowResult<Vec<ArrayRef>> {
     (0..arrays[0].len())
         .map(|column| {
@@ -968,7 +969,7 @@ fn create_batch_from_map(
         .zip(output_schema.fields().iter())
         .map(|(col, desired_field)| {
             arrow::compute::cast::cast(col.as_ref(), desired_field.data_type())
-                .map(|v| Arc::from(v))
+                .map(Arc::from)
         })
         .collect::<ArrowResult<Vec<_>>>()?;
 

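`.map(|v| Arc::from(v))` → `.map(Arc::from)` is clippy's redundant_closure fix; the same change shows up again in parquet.rs and both window expression files below. A small stand-alone sketch of the pattern:

```rust
use std::sync::Arc;

fn shared_strings(values: Vec<String>) -> Vec<Arc<str>> {
    values
        .into_iter()
        // Before: `.map(|v| Arc::from(v))`; the closure only forwards its
        // argument, so clippy suggests passing the function path directly.
        .map(Arc::from)
        .collect()
}

fn main() {
    let shared = shared_strings(vec!["a".to_string(), "b".to_string()]);
    assert_eq!(shared.len(), 2);
}
```
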
11 changes: 7 additions & 4 deletions datafusion/src/physical_plan/parquet.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading Parquet files
 
+/// FIXME: https://github.com/apache/arrow-datafusion/issues/1058
 use fmt::Debug;
 use std::fmt;
 use std::fs::File;
@@ -47,7 +48,7 @@ use log::debug;
 use parquet::statistics::{
     BinaryStatistics as ParquetBinaryStatistics,
     BooleanStatistics as ParquetBooleanStatistics,
-    PrimitiveStatistics as ParquetPrimitiveStatistics, Statistics as ParquetStatistics,
+    PrimitiveStatistics as ParquetPrimitiveStatistics,
 };
 
 use tokio::{
@@ -294,6 +295,7 @@ impl ParquetFileMetrics {
 
 type Payload = ArrowResult<RecordBatch>;
 
+#[allow(dead_code)]
 fn producer_task(
     path: &str,
     response_tx: Sender<Payload>,
@@ -416,6 +418,7 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
+#[allow(dead_code)]
 fn send_result(
     response_tx: &Sender<ArrowResult<RecordBatch>>,
     result: ArrowResult<RecordBatch>,
@@ -520,7 +523,7 @@ macro_rules! get_min_max_values {
            .collect();
 
         // ignore errors converting to arrays (e.g. different types)
-        ScalarValue::iter_to_array(scalar_values).ok().map(|v| Arc::from(v))
+        ScalarValue::iter_to_array(scalar_values).ok().map(Arc::from)
    }}
 }
 
@@ -575,7 +578,7 @@ fn read_partition(
     metrics: ExecutionPlanMetricsSet,
     projection: &[usize],
     predicate_builder: &Option<PruningPredicate>,
-    batch_size: usize,
+    _batch_size: usize,
     response_tx: Sender<ArrowResult<RecordBatch>>,
     limit: Option<usize>,
 ) -> Result<()> {
@@ -593,7 +596,7 @@
        )?;
 
        if let Some(predicate_builder) = predicate_builder {
-           let file_metadata = reader.metadata();
+           let _file_metadata = reader.metadata();
            reader.set_groups_filter(Arc::new(build_row_group_predicate(
                predicate_builder,
                file_metrics,

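The parquet changes silence warnings rather than delete code: helpers that the arrow2-based reader no longer calls get `#[allow(dead_code)]`, and parameters that are still accepted but unused are renamed with a leading underscore (see also the FIXME issue link added at the top of the file). A small sketch of both patterns, with placeholder names:

```rust
// Kept for reference while the reader is reworked; the attribute silences
// the dead-code warning instead of removing the function.
#[allow(dead_code)]
fn legacy_producer_task() {
    // ...
}

// `_batch_size` is still part of the signature but currently unused by the
// new code path; the leading underscore marks that as intentional.
fn read_partition_stub(_batch_size: usize, limit: Option<usize>) -> usize {
    limit.unwrap_or(0)
}

fn main() {
    assert_eq!(read_partition_stub(8192, Some(10)), 10);
}
```
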
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/repartition.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
 use arrow::record_batch::RecordBatch;
 use arrow::{
-    array::{Array, ArrayRef, UInt32Array, UInt64Array, Utf8Array},
+    array::{Array, UInt64Array},
     error::Result as ArrowResult,
 };
 use arrow::{compute::take, datatypes::SchemaRef};
@@ -462,6 +462,7 @@ mod tests {
         physical_plan::{expressions::col, memory::MemoryExec},
         test::exec::{BarrierExec, ErrorExec, MockExec},
     };
+    use arrow::array::{ArrayRef, UInt32Array, Utf8Array};
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::error::ArrowError;
     use arrow::record_batch::RecordBatch;

4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -898,11 +898,11 @@ mod tests {
         let schema = partitions[0][0].schema();
         let sort = vec![
             PhysicalSortExpr {
-                expr: col("b", &schema).unwrap(),
+                expr: col("b", schema).unwrap(),
                 options: Default::default(),
             },
             PhysicalSortExpr {
-                expr: col("c", &schema).unwrap(),
+                expr: col("c", schema).unwrap(),
                 options: Default::default(),
             },
         ];

2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/windows/aggregate.rs
@@ -95,7 +95,7 @@ impl AggregateWindowExpr {
             .collect::<Vec<ArrayRef>>();
         let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
         concat::concatenate(&results)
-            .map(|x| ArrayRef::from(x))
+            .map(ArrayRef::from)
             .map_err(DataFusionError::ArrowError)
     }
 

2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/windows/built_in.rs
@@ -99,7 +99,7 @@ impl WindowExpr for BuiltInWindowExpr {
         };
         let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
         concat::concatenate(&results)
-            .map(|x| ArrayRef::from(x))
+            .map(ArrayRef::from)
             .map_err(DataFusionError::ArrowError)
     }
 }