From 49166ea55f317722ab7a37fbfc253bcd497c1672 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 29 Nov 2022 15:07:10 -0500
Subject: [PATCH] Add integration test for erroring when memory limits are hit
 (#4406)

* Add test for runtime memory limiting

* Update datafusion/core/tests/memory_limit.rs
---
 .../src/execution/memory_manager/proxy.rs     |   7 +-
 .../core/src/physical_plan/aggregates/hash.rs |   2 +-
 .../physical_plan/aggregates/no_grouping.rs   |   2 +-
 .../src/physical_plan/aggregates/row_hash.rs  |   2 +-
 .../core/src/physical_plan/sorts/sort.rs      |  15 +--
 datafusion/core/tests/join_fuzz.rs            |  19 +--
 datafusion/core/tests/memory_limit.rs         | 112 ++++++++++++++++++
 datafusion/core/tests/merge_fuzz.rs           |  18 +--
 datafusion/core/tests/order_spill_fuzz.rs     |  21 +---
 test-utils/src/lib.rs                         |  32 ++++-
 10 files changed, 165 insertions(+), 65 deletions(-)
 create mode 100644 datafusion/core/tests/memory_limit.rs

diff --git a/datafusion/core/src/execution/memory_manager/proxy.rs b/datafusion/core/src/execution/memory_manager/proxy.rs
index 6ea52e909ae3..2a5bd2507357 100644
--- a/datafusion/core/src/execution/memory_manager/proxy.rs
+++ b/datafusion/core/src/execution/memory_manager/proxy.rs
@@ -88,9 +88,10 @@ impl MemoryConsumer for MemoryConsumerProxy {
     }
 
     async fn spill(&self) -> Result<usize> {
-        Err(DataFusionError::ResourcesExhausted(
-            "Cannot spill AggregationState".to_owned(),
-        ))
+        Err(DataFusionError::ResourcesExhausted(format!(
+            "Cannot spill {}",
+            self.name
+        )))
     }
 
     fn mem_used(&self) -> usize {
diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs b/datafusion/core/src/physical_plan/aggregates/hash.rs
index d3d5a337e02f..8bf929630f05 100644
--- a/datafusion/core/src/physical_plan/aggregates/hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -135,7 +135,7 @@ impl GroupedHashAggregateStream {
             aggregate_expressions,
             accumulators: Accumulators {
                 memory_consumer: MemoryConsumerProxy::new(
-                    "Accumulators",
+                    "GroupBy Hash Accumulators",
                     MemoryConsumerId::new(partition),
                     Arc::clone(&context.runtime_env().memory_manager),
                 ),
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 8c3556bb6f21..64cc4f569c8c 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -73,7 +73,7 @@ impl AggregateStream {
         let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
         let accumulators = create_accumulators(&aggr_expr)?;
         let memory_consumer = MemoryConsumerProxy::new(
-            "AggregationState",
+            "GroupBy None Accumulators",
             MemoryConsumerId::new(partition),
             Arc::clone(&context.runtime_env().memory_manager),
         );
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index f0311f0885e6..c73fa3da0c2e 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -144,7 +144,7 @@ impl GroupedHashAggregateStreamV2 {
 
         let aggr_state = AggregationState {
             memory_consumer: MemoryConsumerProxy::new(
-                "AggregationState",
+                "GroupBy Hash (Row) AggregationState",
                 MemoryConsumerId::new(partition),
                 Arc::clone(&context.runtime_env().memory_manager),
             ),
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index bfc33a954730..0b3be090672d 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -118,6 +118,7 @@ impl ExternalSorter {
     ) -> Result<()> {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
+            debug!("Inserting {} rows of {} bytes", input.num_rows(), size);
             self.try_grow(size).await?;
             self.metrics.mem_used().add(size);
             let mut in_mem_batches = self.in_mem_batches.lock().await;
@@ -272,6 +273,13 @@ impl MemoryConsumer for ExternalSorter {
     }
 
     async fn spill(&self) -> Result<usize> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
         debug!(
             "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)",
             self.name(),
@@ -280,13 +288,6 @@ impl MemoryConsumer for ExternalSorter {
             self.spill_count()
         );
 
-        let partition = self.partition_id();
-        let mut in_mem_batches = self.in_mem_batches.lock().await;
-        // we could always get a chance to free some memory as long as we are holding some
-        if in_mem_batches.len() == 0 {
-            return Ok(0);
-        }
-
         let tracking_metrics = self
             .metrics_set
             .new_intermediate_tracking(partition, self.runtime.clone());
diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs
index 1204b442819a..40966843cec4 100644
--- a/datafusion/core/tests/join_fuzz.rs
+++ b/datafusion/core/tests/join_fuzz.rs
@@ -21,8 +21,7 @@ use arrow::array::{ArrayRef, Int32Array};
 use arrow::compute::SortOptions;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
-use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;
 
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::expressions::Column;
@@ -31,7 +30,7 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion_expr::JoinType;
 
 use datafusion::prelude::{SessionConfig, SessionContext};
-use test_utils::add_empty_batches;
+use test_utils::stagger_batch_with_seed;
 
 #[tokio::test]
 async fn test_inner_join_1k() {
@@ -200,7 +199,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     let input4 = Int32Array::from_iter_values(input4.into_iter());
 
     // split into several record batches
-    let mut remainder = RecordBatch::try_from_iter(vec![
+    let batch = RecordBatch::try_from_iter(vec![
         ("a", Arc::new(input1) as ArrayRef),
         ("b", Arc::new(input2) as ArrayRef),
         ("x", Arc::new(input3) as ArrayRef),
@@ -208,16 +207,6 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
         ("y", Arc::new(input4) as ArrayRef),
     ])
     .unwrap();
 
-    let mut batches = vec![];
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(42);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, 42)
 }
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
new file mode 100644
index 000000000000..20ad555d66e1
--- /dev/null
+++ b/datafusion/core/tests/memory_limit.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains tests for limiting memory at runtime in DataFusion
+
+use std::sync::Arc;
+
+use arrow::record_batch::RecordBatch;
+use datafusion::datasource::MemTable;
+use datafusion::execution::disk_manager::DiskManagerConfig;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_common::assert_contains;
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use test_utils::{stagger_batch, AccessLogGenerator};
+
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+    let _ = env_logger::try_init();
+}
+
+#[tokio::test]
+async fn oom_sort() {
+    run_limit_test(
+        "select * from t order by host DESC",
+        "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn group_by_none() {
+    run_limit_test(
+        "select median(image) from t",
+        "Resources exhausted: Cannot spill GroupBy None Accumulators",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn group_by_row_hash() {
+    run_limit_test(
+        "select count(*) from t GROUP BY response_bytes",
+        "Resources exhausted: Cannot spill GroupBy Hash (Row) AggregationState",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn group_by_hash() {
+    run_limit_test(
+        // group by dict column
+        "select count(*) from t GROUP BY service, host, pod, container",
+        "Resources exhausted: Cannot spill GroupBy Hash Accumulators",
+    )
+    .await
+}
+
+/// 50 byte memory limit
+const MEMORY_LIMIT_BYTES: usize = 50;
+const MEMORY_FRACTION: f64 = 0.95;
+
+/// runs the specified query against 1000 rows with a 50
+/// byte memory limit and no disk manager enabled.
+async fn run_limit_test(query: &str, expected_error: &str) {
+    let generator = AccessLogGenerator::new().with_row_limit(Some(1000));
+
+    let batches: Vec<RecordBatch> = generator
+        // split up into more than one batch, as the size limit in sort is not enforced until the second batch
+        .flat_map(stagger_batch)
+        .collect();
+
+    let table = MemTable::try_new(batches[0].schema(), vec![batches]).unwrap();
+
+    let rt_config = RuntimeConfig::new()
+        // do not allow spilling
+        .with_disk_manager(DiskManagerConfig::Disabled)
+        // Only allow 50 bytes
+        .with_memory_limit(MEMORY_LIMIT_BYTES, MEMORY_FRACTION);
+
+    let runtime = RuntimeEnv::new(rt_config).unwrap();
+
+    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));
+    ctx.register_table("t", Arc::new(table))
+        .expect("registering table");
+
+    let df = ctx.sql(query).await.expect("Planning query");
+
+    match df.collect().await {
+        Ok(_batches) => {
+            panic!("Unexpected success when running, expected memory limit failure")
+        }
+        Err(e) => {
+            assert_contains!(e.to_string(), expected_error);
+        }
+    }
+}
diff --git a/datafusion/core/tests/merge_fuzz.rs b/datafusion/core/tests/merge_fuzz.rs
index 2280cdeb6029..64738c3ffaa3 100644
--- a/datafusion/core/tests/merge_fuzz.rs
+++ b/datafusion/core/tests/merge_fuzz.rs
@@ -30,8 +30,7 @@ use datafusion::physical_plan::{
     sorts::sort_preserving_merge::SortPreservingMergeExec,
 };
 use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::{prelude::StdRng, Rng, SeedableRng};
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};
 
 #[tokio::test]
 async fn test_merge_2() {
@@ -151,21 +150,10 @@ fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
     let input: Int32Array = (low..high).map(Some).collect();
 
     // split into several record batches
-    let mut remainder =
+    let batch =
         RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
-
-    let mut batches = vec![];
-
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(seed);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+
+    stagger_batch_with_seed(batch, seed)
 }
 
 fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index ad39630afd8c..cc700d5d2cb7 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -29,10 +29,9 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::prelude::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;
 use std::sync::Arc;
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
@@ -116,19 +115,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     let input = Int32Array::from_iter_values(input.into_iter());
 
     // split into several record batches
-    let mut remainder =
+    let batch =
         RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
-
-    let mut batches = vec![];
-
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(42);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, 42)
 }
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index 5c3b64574a68..4002a49cf585 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -19,7 +19,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_int32_array;
 use rand::prelude::StdRng;
-use rand::Rng;
+use rand::{Rng, SeedableRng};
 
 mod data_gen;
@@ -50,10 +50,7 @@ pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<
 }
 
 /// Adds a random number of empty record batches into the stream
-pub fn add_empty_batches(
-    batches: Vec<RecordBatch>,
-    rng: &mut StdRng,
-) -> Vec<RecordBatch> {
+fn add_empty_batches(batches: Vec<RecordBatch>, rng: &mut StdRng) -> Vec<RecordBatch> {
     let schema = batches[0].schema();
 
     batches
@@ -68,3 +65,28 @@ pub fn add_empty_batches(
         })
         .collect()
 }
+
+/// "stagger" batches: split the batches into random sized batches
+pub fn stagger_batch(batch: RecordBatch) -> Vec<RecordBatch> {
+    let seed = 42;
+    stagger_batch_with_seed(batch, seed)
+}
+
+/// "stagger" batches: split the batches into random sized batches
+/// using the specified value for a rng seed
+pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> Vec<RecordBatch> {
+    let mut batches = vec![];
+
+    // use a random number generator to pick a random sized output
+    let mut rng = StdRng::seed_from_u64(seed);
+
+    let mut remainder = batch;
+    while remainder.num_rows() > 0 {
+        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+
+        batches.push(remainder.slice(0, batch_size));
+        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+    }
+
+    add_empty_batches(batches, &mut rng)
+}
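
Note for reviewers (not part of the patch): a minimal sketch of how the memory
limit exercised by this test is expected to surface to an end user. It assumes
a table `t` has already been registered (registration is elided here); the
50-byte limit, 0.95 fraction, and the query are placeholders copied from
`run_limit_test` above. The config and error APIs (`RuntimeConfig::with_memory_limit`,
`DiskManagerConfig::Disabled`, `SessionContext::with_config_rt`,
`DataFusionError::ResourcesExhausted`) are the ones used in the diff itself;
the import paths are the usual datafusion crate paths.

    use std::sync::Arc;
    use datafusion::error::DataFusionError;
    use datafusion::execution::disk_manager::DiskManagerConfig;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use datafusion::prelude::{SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> Result<(), DataFusionError> {
        // cap the memory pool at 50 bytes and disable spilling, as in run_limit_test
        let rt_config = RuntimeConfig::new()
            .with_disk_manager(DiskManagerConfig::Disabled)
            .with_memory_limit(50, 0.95);
        let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
        let ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);
        // ... register a table `t` here, e.g. via ctx.register_table(...) ...

        // any operator that buffers its input (sort, group by) should now fail
        // with a ResourcesExhausted error instead of silently over-allocating
        let err = ctx
            .sql("select * from t order by host DESC")
            .await?
            .collect()
            .await
            .unwrap_err();
        assert!(matches!(err, DataFusionError::ResourcesExhausted(_)));
        Ok(())
    }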