Enforce sorting handle fetchable operators, add option to repartition based on row count estimates #11875

Merged 30 commits on Aug 10, 2024

Commits (30), changes from all commits:
510b16c  Tmp (mustafasrepo, Aug 7, 2024)
6ef4369  Minor changes (mustafasrepo, Aug 7, 2024)
c3efafc  Minor changes (mustafasrepo, Aug 7, 2024)
2bf220d  Minor changes (mustafasrepo, Aug 7, 2024)
eb83917  Implement top down recursion with delete check (mustafasrepo, Aug 7, 2024)
0b66b15  Minor changes (mustafasrepo, Aug 7, 2024)
c769f9f  Minor changes (mustafasrepo, Aug 7, 2024)
0ad7063  Address reviews (mustafasrepo, Aug 7, 2024)
3661f06  Update comments (mustafasrepo, Aug 7, 2024)
60967c1  Minor changes (mustafasrepo, Aug 7, 2024)
6b87c4c  Make test deterministic (mustafasrepo, Aug 7, 2024)
8dd7e0a  Add fetch info to the statistics (mustafasrepo, Aug 8, 2024)
15423ae  Enforce distribution use inexact count estimate also. (mustafasrepo, Aug 8, 2024)
94fb83d  Minor changes (mustafasrepo, Aug 8, 2024)
9053b9f  Minor changes (mustafasrepo, Aug 8, 2024)
1171584  Minor changes (mustafasrepo, Aug 8, 2024)
711038d  Do not add unnecessary hash partitioning (mustafasrepo, Aug 9, 2024)
7e598e5  Minor changes (mustafasrepo, Aug 9, 2024)
12ad2c2  Add config option to use inexact row number estimates during planning (mustafasrepo, Aug 9, 2024)
2e3cc5d  Update config (mustafasrepo, Aug 9, 2024)
34af8ba  Minor changes (mustafasrepo, Aug 9, 2024)
98760bc  Minor changes (mustafasrepo, Aug 9, 2024)
1e4dada  Final review (ozankabak, Aug 9, 2024)
9fc4f3d  Address reviews (mustafasrepo, Aug 9, 2024)
1116058  Add handling for sort removal with fetch (mustafasrepo, Aug 9, 2024)
44dc292  Fix linter errors (mustafasrepo, Aug 9, 2024)
c6d2de6  Minor changes (mustafasrepo, Aug 9, 2024)
c7c85f4  Update config (mustafasrepo, Aug 9, 2024)
7c8967d  Cleanup stats under fetch (ozankabak, Aug 9, 2024)
ed35660  Update SLT comment (ozankabak, Aug 10, 2024)
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
@@ -333,6 +333,14 @@ config_namespace! {
/// Number of input rows a partial aggregation partition should process before
/// checking the aggregation ratio and trying to switch to skipping-aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

/// Whether DataFusion should use row number estimates at the input to
/// decide if increasing parallelism is beneficial or not. By default,
/// only exact row numbers (not estimates) are used for this decision.
/// Setting this flag to `true` will likely produce better plans if the
/// source of statistics is accurate.
/// We plan to make this the default in the future.
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
}
}
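
For context, here is a minimal sketch of enabling the new option from Rust. It assumes the option is exposed under the `datafusion.execution` namespace like the neighboring settings above; check the generated configuration docs for the authoritative key.

    use datafusion::prelude::*;

    // Sketch: opt in to estimate-driven repartitioning decisions. The exact
    // config key is an assumption based on the namespace of this section.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.use_row_number_estimates_to_optimize_partitioning",
        true,
    );
    let ctx = SessionContext::new_with_config(config);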

122 changes: 104 additions & 18 deletions datafusion/common/src/stats.rs
@@ -19,9 +19,9 @@

use std::fmt::{self, Debug, Display};

-use crate::ScalarValue;
+use crate::{Result, ScalarValue};

-use arrow_schema::Schema;
+use arrow_schema::{Schema, SchemaRef};

/// Represents a value with a degree of certainty. `Precision` is used to
/// propagate information about the precision of statistical values.
@@ -247,21 +247,96 @@ impl Statistics {

/// If the exactness of a [`Statistics`] instance is lost, this function relaxes
/// the exactness of all information by converting them to [`Precision::Inexact`].
-    pub fn into_inexact(self) -> Self {
-        Statistics {
-            num_rows: self.num_rows.to_inexact(),
-            total_byte_size: self.total_byte_size.to_inexact(),
-            column_statistics: self
-                .column_statistics
-                .into_iter()
-                .map(|cs| ColumnStatistics {
-                    null_count: cs.null_count.to_inexact(),
-                    max_value: cs.max_value.to_inexact(),
-                    min_value: cs.min_value.to_inexact(),
-                    distinct_count: cs.distinct_count.to_inexact(),
-                })
-                .collect::<Vec<_>>(),
-        }
-    }
+    pub fn to_inexact(mut self) -> Self {
+        self.num_rows = self.num_rows.to_inexact();
+        self.total_byte_size = self.total_byte_size.to_inexact();
+        self.column_statistics = self
+            .column_statistics
+            .into_iter()
+            .map(|s| s.to_inexact())
+            .collect();
+        self
+    }

/// Calculates the statistics after `fetch` and `skip` operations are applied.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
pub fn with_fetch(
mut self,
schema: SchemaRef,
fetch: Option<usize>,
skip: usize,
n_partitions: usize,
) -> Result<Self> {
let fetch_val = fetch.unwrap_or(usize::MAX);

self.num_rows = match self {
Statistics {
num_rows: Precision::Exact(nr),
..
}
| Statistics {
num_rows: Precision::Inexact(nr),
..
} => {
// Here, the inexact case gives us an upper bound on the number of rows.
if nr <= skip {
// All input data will be skipped:
Precision::Exact(0)
} else if nr <= fetch_val && skip == 0 {
// If the input does not reach the `fetch` globally, and `skip`
// is zero (meaning the input and output are identical), return
// input stats as is.
// TODO: Can input stats still be used, but adjusted, when `skip`
// is non-zero?
return Ok(self);
} else if nr - skip <= fetch_val {
// After `skip` input rows are skipped, the remaining rows are
// less than or equal to the `fetch` values, so `num_rows` must
// equal the remaining rows.
check_num_rows(
(nr - skip).checked_mul(n_partitions),
// We know that we have an estimate for the number of rows:
self.num_rows.is_exact().unwrap(),
)
} else {
// At this point we know that we were given a `fetch` value
// as the `None` case would go into the branch above. Since
// the input has more rows than `fetch + skip`, the number
// of rows will be the `fetch`, but we won't be able to
// predict the other statistics.
check_num_rows(
fetch_val.checked_mul(n_partitions),
// We know that we have an estimate for the number of rows:
self.num_rows.is_exact().unwrap(),
)
}
}
Statistics {
num_rows: Precision::Absent,
..
} => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
};
self.column_statistics = Statistics::unknown_column(&schema);
self.total_byte_size = Precision::Absent;
Ok(self)
}
}

/// Creates an estimate of the number of rows in the output using the given
/// optional value and exactness flag.
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
if let Some(value) = value {
if is_exact {
Precision::Exact(value)
} else {
// If the input stats are inexact, so are the output stats.
Precision::Inexact(value)
}
} else {
// If the estimate is not available (e.g. due to an overflow), we can
// not produce a reliable estimate.
Precision::Absent
}
}
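
To make the branching above concrete, here is a minimal sketch of the expected behavior of `with_fetch` (schema and numbers invented for illustration):

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::stats::Precision;
    use datafusion_common::{Result, Statistics};

    fn demo() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        // Per-partition statistics: exactly 100 rows.
        let stats = Statistics {
            num_rows: Precision::Exact(100),
            total_byte_size: Precision::Absent,
            column_statistics: Statistics::unknown_column(&schema),
        };
        // With `fetch = 10` and `skip = 5`, the input has more rows than
        // `fetch + skip`, so each of the 4 partitions emits `fetch` rows and
        // the global estimate is 4 * 10 = 40 (the final branch above).
        let stats = stats.with_fetch(schema.clone(), Some(10), 5, 4)?;
        assert_eq!(stats.num_rows, Precision::Exact(40));

        // A `skip` that consumes the entire input yields exactly zero rows.
        let empty = Statistics {
            num_rows: Precision::Exact(7),
            total_byte_size: Precision::Absent,
            column_statistics: Statistics::unknown_column(&schema),
        }
        .with_fetch(schema, None, 7, 1)?;
        assert_eq!(empty.num_rows, Precision::Exact(0));
        Ok(())
    }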

@@ -336,14 +411,25 @@ impl ColumnStatistics {
}

/// Returns a [`ColumnStatistics`] instance having all [`Precision::Absent`] parameters.
-    pub fn new_unknown() -> ColumnStatistics {
-        ColumnStatistics {
+    pub fn new_unknown() -> Self {
+        Self {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
distinct_count: Precision::Absent,
}
}

/// If the exactness of a [`ColumnStatistics`] instance is lost, this
/// function relaxes the exactness of all information by converting them
/// to [`Precision::Inexact`].
pub fn to_inexact(mut self) -> Self {
self.null_count = self.null_count.to_inexact();
self.max_value = self.max_value.to_inexact();
self.min_value = self.min_value.to_inexact();
self.distinct_count = self.distinct_count.to_inexact();
self
}
}
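
A small usage sketch of the relaxation semantics (values invented for illustration): exact information degrades to inexact, while absent information stays absent since there is nothing to relax.

    use datafusion_common::stats::Precision;
    use datafusion_common::{ColumnStatistics, ScalarValue};

    // Sketch: exact per-column bounds become inexact after `to_inexact`.
    let cs = ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(ScalarValue::Int64(Some(100))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
        distinct_count: Precision::Absent,
    };
    let relaxed = cs.to_inexact();
    assert_eq!(relaxed.null_count, Precision::Inexact(0));
    assert_eq!(relaxed.distinct_count, Precision::Absent);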

#[cfg(test)]
12 changes: 6 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
@@ -2997,13 +2997,13 @@ mod tests {
.await?
.select_columns(&["c1", "c2", "c3"])?
.filter(col("c2").eq(lit(3)).and(col("c1").eq(lit("a"))))?
-            .limit(0, Some(1))?
.sort(vec![
// make the test deterministic
col("c1").sort(true, true),
col("c2").sort(true, true),
col("c3").sort(true, true),
])?
+            .limit(0, Some(1))?
.with_column("sum", col("c2") + col("c3"))?;

let df_sum_renamed = df
Expand All @@ -3019,11 +3019,11 @@ mod tests {

assert_batches_sorted_eq!(
[
"+-----+-----+----+-------+",
Copy link
Contributor Author

@mustafasrepo mustafasrepo Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Result of this test changes with this PR. I have analyzed the change, previously this tes was generating the following plan:

    "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as sum]",
    "  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "    SortExec: expr=[c1@0 ASC,c2@1 ASC,c3@2 ASC], preserve_partitioning=[false]",
    "      GlobalLimitExec: skip=0, fetch=1",
    "        CoalescePartitionsExec",
    "          CoalesceBatchesExec: target_batch_size=8192, fetch=1",
    "            FilterExec: c2@1 = 3 AND c1@0 = a",
    "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "                CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",

After the changes in this PR, following plan is generated

    "ProjectionExec: expr=[c1@0 as one, c2@1 as two, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as total]",
    "  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "    SortExec: TopK(fetch=1), expr=[c3@2 ASC], preserve_partitioning=[false]",
    "      CoalescePartitionsExec",
    "        CoalesceBatchesExec: target_batch_size=8192",
    "          FilterExec: c2@1 = 3 AND c1@0 = a",
    "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "              CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",

I think the second plan generates a deterministic result. However, the query (dataframe query) is not deterministic as is.
With this observation, I have updated the place of the limit to make sure the query is deterministic after execution. With the change of the place of the limit, the following plan is generated:

    "ProjectionExec: expr=[c1@0 as one, c2@1 as two, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as total]",
    "  GlobalLimitExec: skip=0, fetch=1",
    "    SortPreservingMergeExec: [c1@0 ASC,c2@1 ASC,c3@2 ASC], fetch=1",
    "      SortExec: TopK(fetch=1), expr=[c3@2 ASC], preserve_partitioning=[true]",
    "        CoalesceBatchesExec: target_batch_size=8192",
    "          FilterExec: c2@1 = 3 AND c1@0 = a",
    "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "              CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it also makes sense that the previous test did a sort right after a select + filter which will not produce a deterministic result. Doing the limit after the sort makes sense

"| one | two | c3 | total |",
"+-----+-----+----+-------+",
"| a | 3 | 13 | 16 |",
"+-----+-----+----+-------+"
"+-----+-----+-----+-------+",
"| one | two | c3 | total |",
"+-----+-----+-----+-------+",
"| a | 3 | -72 | -69 |",
"+-----+-----+-----+-------+",
],
&df_sum_renamed
);
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/statistics.rs
@@ -145,7 +145,7 @@ pub async fn get_statistics_with_limit(
// If we still have files in the stream, it means that the limit kicked
// in, and the statistic could have been different had we processed the
// files in a different order.
-        statistics = statistics.into_inexact()
+        statistics = statistics.to_inexact()
}

Ok((result_files, statistics))