Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use tracked-consumers memory pool be the default. #11949

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ async fn group_by_none() {
TestCase::new()
.with_query("select median(request_bytes) from t")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"AggregateStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: AggregateStream"
])
.with_memory_limit(2_000)
.run()
Expand All @@ -89,8 +88,7 @@ async fn group_by_row_hash() {
TestCase::new()
.with_query("select count(*) from t GROUP BY response_bytes")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"GroupedHashAggregateStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream"
])
.with_memory_limit(2_000)
.run()
Expand All @@ -103,8 +101,7 @@ async fn group_by_hash() {
// group by dict column
.with_query("select count(*) from t GROUP BY service, host, pod, container")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"GroupedHashAggregateStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream"
])
.with_memory_limit(1_000)
.run()
Expand All @@ -117,8 +114,7 @@ async fn join_by_key_multiple_partitions() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"HashJoinInput[0]",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[0]",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -132,8 +128,7 @@ async fn join_by_key_single_partition() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"HashJoinInput",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -146,8 +141,7 @@ async fn join_by_expression() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"NestedLoopJoinLoad[0]",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]",
])
.with_memory_limit(1_000)
.run()
Expand All @@ -159,8 +153,7 @@ async fn cross_join() {
TestCase::new()
.with_query("select t1.* from t t1 CROSS JOIN t t2")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"CrossJoinExec",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec",
])
.with_memory_limit(1_000)
.run()
Expand Down Expand Up @@ -216,8 +209,7 @@ async fn symmetric_hash_join() {
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"SymmetricHashJoinStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SymmetricHashJoinStream",
])
.with_memory_limit(1_000)
.with_scenario(Scenario::AccessLogStreaming)
Expand All @@ -235,8 +227,7 @@ async fn sort_preserving_merge() {
// so only a merge is needed
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"SortPreservingMergeExec",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SortPreservingMergeExec",
])
// provide insufficient memory to merge
.with_memory_limit(partition_size / 2)
Expand Down Expand Up @@ -313,8 +304,7 @@ async fn sort_spill_reservation() {

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"ExternalSorterMerge", // merging in sort fails
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge",
])
.with_config(config)
.run()
Expand Down Expand Up @@ -343,8 +333,7 @@ async fn oom_recursive_cte() {
SELECT * FROM nodes;",
)
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"RecursiveQuery",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: RecursiveQuery",
])
.with_memory_limit(2_000)
.run()
Expand Down Expand Up @@ -396,7 +385,7 @@ async fn oom_with_tracked_consumer_pool() {
.with_expected_errors(vec![
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
"Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)"
"Additional allocation failed with top memory consumers (across reservations) as: ParquetSink(ArrowColumnWriter)"
])
.with_memory_pool(Arc::new(
TrackConsumersPool::new(
Expand Down
16 changes: 8 additions & 8 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ fn provide_top_memory_consumers_to_error_msg(
error_msg: String,
top_consumers: String,
) -> String {
format!("Resources exhausted with top memory consumers (across reservations) are: {}. Error: {}", top_consumers, error_msg)
format!("Additional allocation failed with top memory consumers (across reservations) as: {}. Error: {}", top_consumers, error_msg)
}

#[cfg(test)]
Expand Down Expand Up @@ -501,7 +501,7 @@ mod tests {
// Test: reports if new reservation causes error
// using the previously set sizes for other consumers
let mut r5 = MemoryConsumer::new("r5").register(&pool);
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
let res = r5.try_grow(150);
assert!(
matches!(
Expand All @@ -524,7 +524,7 @@ mod tests {

// Test: see error message when no consumers recorded yet
let mut r0 = MemoryConsumer::new(same_name).register(&pool);
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -543,7 +543,7 @@ mod tests {
let mut r1 = new_consumer_same_name.clone().register(&pool);
// TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
// a followup PR will clarify this message "0 bytes already allocated for this reservation"
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
let res = r1.try_grow(150);
assert!(
matches!(
Expand All @@ -555,7 +555,7 @@ mod tests {

// Test: will accumulate size changes per consumer, not per reservation
r1.grow(20);
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let res = r1.try_grow(150);
assert!(
matches!(
Expand All @@ -570,7 +570,7 @@ mod tests {
let consumer_with_same_name_but_different_hash =
MemoryConsumer::new(same_name).with_can_spill(true);
let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let res = r2.try_grow(150);
assert!(
matches!(
Expand All @@ -590,7 +590,7 @@ mod tests {
let r1_consumer = MemoryConsumer::new("r1");
let mut r1 = r1_consumer.clone().register(&pool);
r1.grow(20);
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -604,7 +604,7 @@ mod tests {
// Test: unregister one
// only the remaining one should be listed
pool.unregister(&r1_consumer);
let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes";
let expected_consumers = "Additional allocation failed with top memory consumers (across reservations) as: r0 consumed 10 bytes";
let res = r0.try_grow(150);
assert!(
matches!(
Expand Down
14 changes: 11 additions & 3 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

use crate::{
disk_manager::{DiskManager, DiskManagerConfig},
memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool},
memory_pool::{
GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
},
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};

use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
use datafusion_common::{DataFusionError, Result};
use object_store::ObjectStore;
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::sync::Arc;
use std::{
fmt::{Debug, Formatter},
num::NonZeroUsize,
};
use url::Url;

#[derive(Clone)]
Expand Down Expand Up @@ -213,7 +218,10 @@ impl RuntimeConfig {
/// Note DataFusion does not yet respect this limit in all cases.
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
let pool_size = (max_memory as f64 * memory_fraction) as usize;
self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
self.with_memory_pool(Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(pool_size),
NonZeroUsize::new(5).unwrap(),
)))
}

/// Use the specified path to create any needed temporary files
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,9 +693,8 @@ mod tests {

assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
);
assert_contains!(err.to_string(), "CrossJoinExec");

Ok(())
}
Expand Down
13 changes: 5 additions & 8 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3821,13 +3821,11 @@ mod tests {
let stream = join.execute(0, task_ctx)?;
let err = common::collect(stream).await.unwrap_err();

// Asserting that operator-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
);

// Asserting that operator-level reservation attempting to overallocate
assert_contains!(err.to_string(), "HashJoinInput");
}

Ok(())
Expand Down Expand Up @@ -3902,13 +3900,12 @@ mod tests {
let stream = join.execute(1, task_ctx)?;
let err = common::collect(stream).await.unwrap_err();

// Asserting that stream-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
);
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"

// Asserting that stream-level reservation attempting to overallocate
assert_contains!(err.to_string(), "HashJoinInput[1]");
);
}

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,9 +1039,8 @@ mod tests {

assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
);
assert_contains!(err.to_string(), "NestedLoopJoinLoad[0]");
}

Ok(())
Expand Down