Skip to content

Commit

Permalink
improve the data generator, and start to impl the session context gen…
Browse files Browse the repository at this point in the history
…erator.
  • Loading branch information
Rachelint committed Oct 1, 2024
1 parent e7f66ef commit b9cebff
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use datafusion::prelude::SessionContext;
use rand::thread_rng;

#[derive(Debug, Clone)]
pub struct SessionContextGeneratorBuilder {
total_rows_num: usize,
sort_keys: Vec<Vec<String>>,
}

pub struct SessionContextGenerator {
/// Used in generate the random `batch_size`
///
/// The generated `batch_size` is between (0, total_rows_num]
total_rows_num: usize,

sort_keys: Vec<Vec<String>>,


}

struct SkipPartialParams {

}

impl SessionContextGenerator {
pub fn generate(&self) -> SessionContext {
let mut rng = thread_rng();
}
}


191 changes: 140 additions & 51 deletions datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::ArrayBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_plan::sorts::sort::sort_batch;
use rand::{
rngs::{StdRng, ThreadRng},
Expand All @@ -11,7 +29,7 @@ use rand::{
use rand_distr::Alphanumeric;
use test_utils::{
array_gen::{PrimitiveArrayGenerator, StringArrayGenerator},
StringBatchGenerator,
stagger_batch, StringBatchGenerator,
};

/// Config for Data sets generator
Expand All @@ -30,44 +48,106 @@ use test_utils::{
/// will be returned
///
#[derive(Debug, Clone)]
struct DataSetsGeneratorConfig {
// Descriptions of columns in datasets
columns: Vec<ColumnDescr>,
pub struct DatasetsGeneratorBuilder {
// Descriptions of columns in datasets, it's `required`
columns: Option<Vec<ColumnDescr>>,

// Rows num range of the generated datasets, it's `required`
rows_num_range: Option<(usize, usize)>,

// Sort keys used to generate the sorted data set, it's optional
sort_keys_set: Vec<Vec<String>>,
}

impl DatasetsGeneratorBuilder {
pub fn build(self) -> DataSetsGenerator {
let columns = self.columns.expect("columns is required");
let rows_num_range = self.rows_num_range.expect("rows_num_range is required");

// Rows num range of the generated datasets
rows_num_range: (usize, usize),
let batch_generator =
RecordBatchGenerator::new(rows_num_range.0, rows_num_range.1, columns);

// Sort keys used to generate the sorted data set
sort_keys: Vec<Vec<String>>,
DataSetsGenerator {
sort_keys_set: self.sort_keys_set,
batch_generator,
}
}

pub fn columns(mut self, columns: Vec<ColumnDescr>) -> Self {
self.columns = Some(columns);
self
}

pub fn rows_num_range(mut self, rows_num_range: (usize, usize)) -> Self {
self.rows_num_range = Some(rows_num_range);
self
}

pub fn sort_keys_set(mut self, sort_keys_set: Vec<Vec<String>>) -> Self {
self.sort_keys_set = sort_keys_set;
self
}
}

/// Data sets generator
///
/// It will generate one `dataset`s when `generate` function is called.
///
/// The generation logic in `generate`:
///
/// - Randomly generate a base record from `batch_generator` firstly.
/// And `columns`, `rows_num_range` in `config`(detail can see `DataSetsGeneratorConfig`),
/// will be used in generation.
///
/// - Sort the batch according to `sort_keys` in `config` to generator another
/// `len(sort_keys)` sorted batches.
///
/// - Split each batch to multiple batches which each sub-batch in has the randomly `rows num`,
/// and this multiple batches will be used to create the `Dataset`.
///
struct DataSetsGenerator {
config: DataSetsGeneratorConfig,
sort_keys_set: Vec<Vec<String>>,
batch_generator: RecordBatchGenerator,
}

impl DataSetsGenerator {
fn new(config: DataSetsGeneratorConfig) -> Self {
let batch_generator = RecordBatchGenerator::new(
config.rows_num_range.0,
config.rows_num_range.1,
config.columns.clone(),
);
fn generate(&self) -> Vec<DataSet> {
let mut datasets = Vec::with_capacity(self.config.sort_keys_set.len() + 1);

Self {
config,
batch_generator,
// Generate the base batch
let base_batch = self.batch_generator.generate();
let batches = stagger_batch(base_batch.clone());
let dataset = DataSet {
batches,
sorted_key: None,
};
datasets.push(dataset);

// Generate the related sorted batches
let schema = base_batch.schema_ref();
for sort_keys in self.config.sort_keys_set.clone() {
let sort_exprs = sort_keys
.iter()
.map(|key| {
let col_expr = col(&key, &schema)
.expect(
&format!("sort key must be valid, invalid sort key:{key}, schema:{schema:?}")
);
PhysicalSortExpr::new_default(col_expr)
})
.collect::<Vec<_>>();
let sorted_batch = sort_batch(&base_batch, &sort_exprs, None)
.expect("sort batch should not fail");

let batches = stagger_batch(sorted_batch);
let dataset = DataSet {
batches,
sorted_key: None,
};
datasets.push(dataset);
}
}

fn generate(&self) -> Vec<DataSet> {
let batch = self.batch_generator.generate();
vec![DataSet {
batches: vec![batch],
sorted_key: None,
}]
datasets
}
}

Expand Down Expand Up @@ -294,30 +374,39 @@ impl RecordBatchGenerator {

#[cfg(test)]
mod test {
use arrow::util::pretty::pretty_format_batches;

use super::*;

#[test]
fn simple_test() {
let config = DataSetsGeneratorConfig {
columns: vec![
ColumnDescr {
name: "a".to_string(),
column_type: DataType::Utf8,
},
ColumnDescr {
name: "b".to_string(),
column_type: DataType::UInt32,
},
],
min_rows_num: 16,
max_rows_num: 32,
sort_keys: Vec::new(),
};

let gen = DataSetsGenerator::new(config);
let data_sets = gen.generate();
println!("{}", pretty_format_batches(&data_sets[0].batches).unwrap());
}
// use arrow::util::pretty::pretty_format_batches;

// use super::*;

// #[test]
// fn simple_test() {
// let config = DatasetsGeneratorBuilder {
// columns: vec![
// ColumnDescr {
// name: "a".to_string(),
// column_type: DataType::Utf8,
// },
// ColumnDescr {
// name: "b".to_string(),
// column_type: DataType::UInt32,
// },
// ],
// rows_num_range: (16, 32),
// sort_keys_set: vec![vec!["b".to_string()]],
// };

// let gen = DataSetsGenerator::new(config);
// let datasets = gen.generate();

// for (round, dataset) in datasets.into_iter().enumerate() {
// println!("### round:{round} ###");
// let num_rows = dataset
// .batches
// .iter()
// .map(|batch| batch.num_rows())
// .collect::<Vec<_>>();
// println!("num_rows:{num_rows:?}");
// println!("{}", pretty_format_batches(&dataset.batches).unwrap());
// }
// }
}
17 changes: 17 additions & 0 deletions datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod data_generator;

struct Test {}

0 comments on commit b9cebff

Please sign in to comment.