diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs new file mode 100644 index 0000000000000..fee72c337fa98 --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs @@ -0,0 +1,31 @@ +use datafusion::prelude::SessionContext; +use rand::thread_rng; + +#[derive(Debug, Clone)] +pub struct SessionContextGeneratorBuilder { + total_rows_num: usize, + sort_keys: Vec>, +} + +pub struct SessionContextGenerator { + /// Used in generate the random `batch_size` + /// + /// The generated `batch_size` is between (0, total_rows_num] + total_rows_num: usize, + + sort_keys: Vec>, + + +} + +struct SkipPartialParams { + +} + +impl SessionContextGenerator { + pub fn generate(&self) -> SessionContext { + let mut rng = thread_rng(); + } +} + + diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index ae55d270e8edc..a708557b357d2 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -1,8 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use arrow::array::ArrayBuilder; use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; +use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; use datafusion_physical_plan::sorts::sort::sort_batch; use rand::{ rngs::{StdRng, ThreadRng}, @@ -11,7 +29,7 @@ use rand::{ use rand_distr::Alphanumeric; use test_utils::{ array_gen::{PrimitiveArrayGenerator, StringArrayGenerator}, - StringBatchGenerator, + stagger_batch, StringBatchGenerator, }; /// Config for Data sets generator @@ -30,44 +48,106 @@ use test_utils::{ /// will be returned /// #[derive(Debug, Clone)] -struct DataSetsGeneratorConfig { - // Descriptions of columns in datasets - columns: Vec, +pub struct DatasetsGeneratorBuilder { + // Descriptions of columns in datasets, it's `required` + columns: Option>, + + // Rows num range of the generated datasets, it's `required` + rows_num_range: Option<(usize, usize)>, + + // Sort keys used to generate the sorted data set, it's optional + sort_keys_set: Vec>, +} + +impl DatasetsGeneratorBuilder { + pub fn build(self) -> DataSetsGenerator { + let columns = self.columns.expect("columns is required"); + let rows_num_range = self.rows_num_range.expect("rows_num_range is required"); - // Rows num range of the generated datasets - rows_num_range: (usize, usize), + let batch_generator = + RecordBatchGenerator::new(rows_num_range.0, rows_num_range.1, columns); - // Sort keys used to generate the sorted data set - sort_keys: Vec>, + DataSetsGenerator { + sort_keys_set: self.sort_keys_set, + batch_generator, + } + } + + pub fn columns(mut self, columns: Vec) -> Self { + self.columns = Some(columns); + self + } + + pub fn rows_num_range(mut self, rows_num_range: (usize, usize)) -> Self { + self.rows_num_range = Some(rows_num_range); + self + } + + pub fn sort_keys_set(mut self, sort_keys_set: Vec>) -> Self { + self.sort_keys_set = sort_keys_set; + self + } } /// Data sets generator /// +/// It will generate one `dataset`s when `generate` function is called. +/// +/// The generation logic in `generate`: +/// +/// - Randomly generate a base record from `batch_generator` firstly. +/// And `columns`, `rows_num_range` in `config`(detail can see `DataSetsGeneratorConfig`), +/// will be used in generation. +/// +/// - Sort the batch according to `sort_keys` in `config` to generator another +/// `len(sort_keys)` sorted batches. +/// +/// - Split each batch to multiple batches which each sub-batch in has the randomly `rows num`, +/// and this multiple batches will be used to create the `Dataset`. +/// struct DataSetsGenerator { - config: DataSetsGeneratorConfig, + sort_keys_set: Vec>, batch_generator: RecordBatchGenerator, } impl DataSetsGenerator { - fn new(config: DataSetsGeneratorConfig) -> Self { - let batch_generator = RecordBatchGenerator::new( - config.rows_num_range.0, - config.rows_num_range.1, - config.columns.clone(), - ); + fn generate(&self) -> Vec { + let mut datasets = Vec::with_capacity(self.config.sort_keys_set.len() + 1); - Self { - config, - batch_generator, + // Generate the base batch + let base_batch = self.batch_generator.generate(); + let batches = stagger_batch(base_batch.clone()); + let dataset = DataSet { + batches, + sorted_key: None, + }; + datasets.push(dataset); + + // Generate the related sorted batches + let schema = base_batch.schema_ref(); + for sort_keys in self.config.sort_keys_set.clone() { + let sort_exprs = sort_keys + .iter() + .map(|key| { + let col_expr = col(&key, &schema) + .expect( + &format!("sort key must be valid, invalid sort key:{key}, schema:{schema:?}") + ); + PhysicalSortExpr::new_default(col_expr) + }) + .collect::>(); + let sorted_batch = sort_batch(&base_batch, &sort_exprs, None) + .expect("sort batch should not fail"); + + let batches = stagger_batch(sorted_batch); + let dataset = DataSet { + batches, + sorted_key: None, + }; + datasets.push(dataset); } - } - fn generate(&self) -> Vec { - let batch = self.batch_generator.generate(); - vec![DataSet { - batches: vec![batch], - sorted_key: None, - }] + datasets } } @@ -294,30 +374,39 @@ impl RecordBatchGenerator { #[cfg(test)] mod test { - use arrow::util::pretty::pretty_format_batches; - - use super::*; - - #[test] - fn simple_test() { - let config = DataSetsGeneratorConfig { - columns: vec![ - ColumnDescr { - name: "a".to_string(), - column_type: DataType::Utf8, - }, - ColumnDescr { - name: "b".to_string(), - column_type: DataType::UInt32, - }, - ], - min_rows_num: 16, - max_rows_num: 32, - sort_keys: Vec::new(), - }; - - let gen = DataSetsGenerator::new(config); - let data_sets = gen.generate(); - println!("{}", pretty_format_batches(&data_sets[0].batches).unwrap()); - } + // use arrow::util::pretty::pretty_format_batches; + + // use super::*; + + // #[test] + // fn simple_test() { + // let config = DatasetsGeneratorBuilder { + // columns: vec![ + // ColumnDescr { + // name: "a".to_string(), + // column_type: DataType::Utf8, + // }, + // ColumnDescr { + // name: "b".to_string(), + // column_type: DataType::UInt32, + // }, + // ], + // rows_num_range: (16, 32), + // sort_keys_set: vec![vec!["b".to_string()]], + // }; + + // let gen = DataSetsGenerator::new(config); + // let datasets = gen.generate(); + + // for (round, dataset) in datasets.into_iter().enumerate() { + // println!("### round:{round} ###"); + // let num_rows = dataset + // .batches + // .iter() + // .map(|batch| batch.num_rows()) + // .collect::>(); + // println!("num_rows:{num_rows:?}"); + // println!("{}", pretty_format_batches(&dataset.batches).unwrap()); + // } + // } } diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs index 173f86bbe8971..78595f76ac45e 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + mod data_generator; struct Test {}