Add integration test for erroring when memory limits are hit (#4406)
* Add test for runtime memory limiting

* Update datafusion/core/tests/memory_limit.rs
alamb authored Nov 29, 2022
1 parent 66c95e7 commit 49166ea
Showing 10 changed files with 165 additions and 65 deletions.
7 changes: 4 additions & 3 deletions datafusion/core/src/execution/memory_manager/proxy.rs
@@ -88,9 +88,10 @@ impl MemoryConsumer for MemoryConsumerProxy {
     }

     async fn spill(&self) -> Result<usize, DataFusionError> {
-        Err(DataFusionError::ResourcesExhausted(
-            "Cannot spill AggregationState".to_owned(),
-        ))
+        Err(DataFusionError::ResourcesExhausted(format!(
+            "Cannot spill {}",
+            self.name
+        )))
     }

     fn mem_used(&self) -> usize {
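The spill() error above now carries the consumer's name, which is what the new memory_limit.rs test below asserts on. A minimal sketch (hypothetical caller, not part of this commit) of how the enriched message surfaces:

    use datafusion::error::DataFusionError;

    /// Hypothetical helper: render a query error, highlighting memory exhaustion.
    fn describe_error(e: &DataFusionError) -> String {
        match e {
            // After this commit the message names the failing consumer,
            // e.g. "Cannot spill GroupBy Hash Accumulators"
            DataFusionError::ResourcesExhausted(msg) => {
                format!("memory budget exceeded: {msg}")
            }
            other => format!("query failed: {other}"),
        }
    }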
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -135,7 +135,7 @@ impl GroupedHashAggregateStream {
             aggregate_expressions,
             accumulators: Accumulators {
                 memory_consumer: MemoryConsumerProxy::new(
-                    "Accumulators",
+                    "GroupBy Hash Accumulators",
                     MemoryConsumerId::new(partition),
                     Arc::clone(&context.runtime_env().memory_manager),
                 ),
@@ -73,7 +73,7 @@ impl AggregateStream {
         let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
         let accumulators = create_accumulators(&aggr_expr)?;
         let memory_consumer = MemoryConsumerProxy::new(
-            "AggregationState",
+            "GroupBy None Accumulators",
             MemoryConsumerId::new(partition),
             Arc::clone(&context.runtime_env().memory_manager),
         );
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -144,7 +144,7 @@ impl GroupedHashAggregateStreamV2 {

         let aggr_state = AggregationState {
             memory_consumer: MemoryConsumerProxy::new(
-                "AggregationState",
+                "GroupBy Hash (Row) AggregationState",
                 MemoryConsumerId::new(partition),
                 Arc::clone(&context.runtime_env().memory_manager),
             ),
15 changes: 8 additions & 7 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -118,6 +118,7 @@ impl ExternalSorter {
     ) -> Result<()> {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
+            debug!("Inserting {} rows of {} bytes", input.num_rows(), size);
             self.try_grow(size).await?;
             self.metrics.mem_used().add(size);
             let mut in_mem_batches = self.in_mem_batches.lock().await;
@@ -272,6 +273,13 @@ impl MemoryConsumer for ExternalSorter {
     }

     async fn spill(&self) -> Result<usize> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
         debug!(
             "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)",
             self.name(),
@@ -280,13 +288,6 @@ impl MemoryConsumer for ExternalSorter {
             self.spill_count()
         );

-        let partition = self.partition_id();
-        let mut in_mem_batches = self.in_mem_batches.lock().await;
-        // we could always get a chance to free some memory as long as we are holding some
-        if in_mem_batches.len() == 0 {
-            return Ok(0);
-        }
-
         let tracking_metrics = self
             .metrics_set
             .new_intermediate_tracking(partition, self.runtime.clone());
19 changes: 4 additions & 15 deletions datafusion/core/tests/join_fuzz.rs
@@ -21,8 +21,7 @@ use arrow::array::{ArrayRef, Int32Array};
 use arrow::compute::SortOptions;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
-use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;

 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::expressions::Column;
@@ -31,7 +30,7 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion_expr::JoinType;

 use datafusion::prelude::{SessionConfig, SessionContext};
-use test_utils::add_empty_batches;
+use test_utils::stagger_batch_with_seed;

 #[tokio::test]
 async fn test_inner_join_1k() {
@@ -200,24 +199,14 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     let input4 = Int32Array::from_iter_values(input4.into_iter());

     // split into several record batches
-    let mut remainder = RecordBatch::try_from_iter(vec![
+    let batch = RecordBatch::try_from_iter(vec![
         ("a", Arc::new(input1) as ArrayRef),
         ("b", Arc::new(input2) as ArrayRef),
         ("x", Arc::new(input3) as ArrayRef),
         ("y", Arc::new(input4) as ArrayRef),
     ])
     .unwrap();

-    let mut batches = vec![];
-
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(42);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, 42)
 }
112 changes: 112 additions & 0 deletions datafusion/core/tests/memory_limit.rs
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module contains tests for limiting memory at runtime in DataFusion

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_common::assert_contains;

use datafusion::prelude::{SessionConfig, SessionContext};
use test_utils::{stagger_batch, AccessLogGenerator};

#[cfg(test)]
#[ctor::ctor]
fn init() {
    let _ = env_logger::try_init();
}

#[tokio::test]
async fn oom_sort() {
    run_limit_test(
        "select * from t order by host DESC",
        "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
    )
    .await
}

#[tokio::test]
async fn group_by_none() {
    run_limit_test(
        "select median(image) from t",
        "Resources exhausted: Cannot spill GroupBy None Accumulators",
    )
    .await
}

#[tokio::test]
async fn group_by_row_hash() {
    run_limit_test(
        "select count(*) from t GROUP BY response_bytes",
        "Resources exhausted: Cannot spill GroupBy Hash (Row) AggregationState",
    )
    .await
}

#[tokio::test]
async fn group_by_hash() {
    run_limit_test(
        // group by dict column
        "select count(*) from t GROUP BY service, host, pod, container",
        "Resources exhausted: Cannot spill GroupBy Hash Accumulators",
    )
    .await
}

/// 50 byte memory limit
const MEMORY_LIMIT_BYTES: usize = 50;
const MEMORY_FRACTION: f64 = 0.95;

/// runs the specified query against 1000 rows with a 50
/// byte memory limit and no disk manager enabled.
async fn run_limit_test(query: &str, expected_error: &str) {
    let generator = AccessLogGenerator::new().with_row_limit(Some(1000));

    let batches: Vec<RecordBatch> = generator
        // split up into more than one batch, as the size limit in sort is not enforced until the second batch
        .flat_map(stagger_batch)
        .collect();

    let table = MemTable::try_new(batches[0].schema(), vec![batches]).unwrap();

    let rt_config = RuntimeConfig::new()
        // do not allow spilling
        .with_disk_manager(DiskManagerConfig::Disabled)
        // Only allow 50 bytes
        .with_memory_limit(MEMORY_LIMIT_BYTES, MEMORY_FRACTION);

    let runtime = RuntimeEnv::new(rt_config).unwrap();

    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));
    ctx.register_table("t", Arc::new(table))
        .expect("registering table");

    let df = ctx.sql(query).await.expect("Planning query");

    match df.collect().await {
        Ok(_batches) => {
            panic!("Unexpected success when running, expected memory limit failure")
        }
        Err(e) => {
            assert_contains!(e.to_string(), expected_error);
        }
    }
}
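One detail of run_limit_test worth spelling out (a hedged reading of RuntimeConfig's behavior, not stated in this commit): with_memory_limit(max, fraction) caps the pool at roughly max * fraction, so the effective budget here is about 47 bytes rather than 50:

    #[test]
    fn effective_memory_budget() {
        // Assumption: the pool size is computed as max_memory * memory_fraction,
        // truncated to whole bytes (50 * 0.95 = 47.5 -> 47).
        let effective = (50_f64 * 0.95) as usize;
        assert_eq!(effective, 47); // reservations beyond this should error
    }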
18 changes: 3 additions & 15 deletions datafusion/core/tests/merge_fuzz.rs
@@ -30,8 +30,7 @@ use datafusion::physical_plan::{
     sorts::sort_preserving_merge::SortPreservingMergeExec,
 };
 use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::{prelude::StdRng, Rng, SeedableRng};
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};

 #[tokio::test]
 async fn test_merge_2() {
@@ -151,21 +150,10 @@ fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
     let input: Int32Array = (low..high).map(Some).collect();

     // split into several record batches
-    let mut remainder =
+    let batch =
         RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();

-    let mut batches = vec![];
-
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(seed);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, seed)
 }

 fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
21 changes: 4 additions & 17 deletions datafusion/core/tests/order_spill_fuzz.rs
@@ -29,10 +29,9 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::prelude::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;
 use std::sync::Arc;
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};

 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
@@ -116,19 +115,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     let input = Int32Array::from_iter_values(input.into_iter());

     // split into several record batches
-    let mut remainder =
+    let batch =
         RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();

-    let mut batches = vec![];
-
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(42);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, 42)
 }
32 changes: 27 additions & 5 deletions test-utils/src/lib.rs
@@ -19,7 +19,7 @@
 use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_int32_array;
 use rand::prelude::StdRng;
-use rand::Rng;
+use rand::{Rng, SeedableRng};

 mod data_gen;

@@ -50,10 +50,7 @@ pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
 }

 /// Adds a random number of empty record batches into the stream
-pub fn add_empty_batches(
-    batches: Vec<RecordBatch>,
-    rng: &mut StdRng,
-) -> Vec<RecordBatch> {
+fn add_empty_batches(batches: Vec<RecordBatch>, rng: &mut StdRng) -> Vec<RecordBatch> {
     let schema = batches[0].schema();

     batches
@@ -68,3 +65,28 @@ pub fn add_empty_batches(
         })
         .collect()
 }
+
+/// "stagger" batches: split the batches into random sized batches
+pub fn stagger_batch(batch: RecordBatch) -> Vec<RecordBatch> {
+    let seed = 42;
+    stagger_batch_with_seed(batch, seed)
+}
+
+/// "stagger" batches: split the batches into random sized batches
+/// using the specified value for a rng seed
+pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> Vec<RecordBatch> {
+    let mut batches = vec![];
+
+    // use a random number generator to pick a random sized output
+    let mut rng = StdRng::seed_from_u64(seed);
+
+    let mut remainder = batch;
+    while remainder.num_rows() > 0 {
+        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+
+        batches.push(remainder.slice(0, batch_size));
+        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+    }
+
+    add_empty_batches(batches, &mut rng)
+}
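For reference, a small usage sketch of the promoted helper (assumes the arrow and test_utils crates exactly as the fuzz tests above use them; the concrete values are illustrative):

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::record_batch::RecordBatch;
    use std::sync::Arc;
    use test_utils::stagger_batch_with_seed;

    fn main() {
        let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let batch = RecordBatch::try_from_iter(vec![("x", input)]).unwrap();

        // Same seed -> same split, which keeps the fuzz tests reproducible.
        let staggered = stagger_batch_with_seed(batch, 42);

        // Rows are preserved; only batch boundaries (and empty batches) change.
        let total: usize = staggered.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total, 5);
    }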
