Fuse grouped aggregate and filter operators for improved performance #5944

Closed
andygrove opened this issue Apr 10, 2023 · 8 comments


@andygrove
Member

Is your feature request related to a problem or challenge?

When we perform a grouped aggregate on a filtered input (such as with TPC-H q1), the filter operator performs two main tasks:

  • Evaluate the filter predicate (usually very fast)
  • Create new batches and copy over the filtered data (very slow if the filter is not very selective, as in q1)

I wonder if we would see a significant performance improvement if we could avoid creating the filtered batches in this case.

One idea would be to create the filtered batches by copying the arrays and mutating the validity bitmap to hide the rows that are filtered out. This would potentially change the semantics in some cases though so we can probably only do this under certain conditions.

Another idea is to update the aggregate logic to perform the predicate evaluation and then use the resulting bitmap to determine which rows to accumulate.
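
A minimal sketch of this second idea, using plain slices rather than Arrow arrays so that it stands alone. The function name `fused_grouped_sum` and the `bool` slice standing in for the predicate bitmap are assumptions made for illustration, not DataFusion APIs:

```rust
use std::collections::BTreeMap;

/// Grouped SUM that consumes the predicate result directly.
/// No filtered copy of `keys` or `values` is ever materialized.
/// `mask[i]` is true when row `i` passes the filter.
fn fused_grouped_sum(keys: &[u32], values: &[i64], mask: &[bool]) -> BTreeMap<u32, i64> {
    assert!(keys.len() == values.len() && values.len() == mask.len());
    let mut sums = BTreeMap::new();
    for i in 0..mask.len() {
        // Skip rejected rows instead of copying the accepted ones first.
        if mask[i] {
            *sums.entry(keys[i]).or_insert(0i64) += values[i];
        }
    }
    sums
}
```

The trade-off is that every accumulator has to understand the mask, whereas a separate filter operator keeps the aggregate code unaware of filtering.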

Describe the solution you'd like

I am working on a small prototype of this, outside of DataFusion, that I will share once the code is less embarrassing.

Describe alternatives you've considered

It would be worth seeing how other engines handle this.

Additional context

No response

@andygrove andygrove added enhancement New feature or request performance Make DataFusion faster labels Apr 10, 2023
@andygrove
Member Author

@Dandandan @alamb wdyt?

@Dandandan
Contributor

Dandandan commented Apr 10, 2023

One other source of inefficiency (which should be relatively easy to change) is that we currently output the entire RecordBatch, including the columns that are only needed to evaluate the filter, and then throw those columns away in the following Projection.

See #5436

@Dandandan
Contributor

In q1 this would remove l_shipdate from the list of 7 columns to be copied, so I would expect a smaller improvement here.
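
As a rough illustration of the projection idea, here is a sketch that copies only the columns the downstream operator needs. The toy batch layout (one `Vec<i64>` per column) and the name `filter_with_projection` are assumptions for this example and do not reflect the real FilterExec code:

```rust
/// Copy the rows selected by `mask`, but only for the column indices
/// listed in `projection`. Columns used solely by the predicate
/// (such as a date column) can be left out and are never copied.
fn filter_with_projection(
    batch: &[Vec<i64>],
    mask: &[bool],
    projection: &[usize],
) -> Vec<Vec<i64>> {
    projection
        .iter()
        .map(|&col| {
            batch[col]
                .iter()
                .zip(mask)
                .filter(|(_, keep)| **keep)
                .map(|(v, _)| *v)
                .collect()
        })
        .collect()
}
```

With seven input columns and one of them dropped, the copy work shrinks by roughly one seventh at best, which is consistent with expecting only a modest gain.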

@Dandandan
Contributor

I remember seeing some issues/papers about a similar approach before, maybe those were shared by @alamb?

@alamb
Contributor

alamb commented Apr 10, 2023

> One idea would be to create the filtered batches by copying the arrays and mutating the validity bitmap to hide the rows that are filtered out. This would potentially change the semantics in some cases though so we can probably only do this under certain conditions.

I think this basic idea is called a "selection vector" in the literature -- and as you hint at, it is not quite the same as the null mask as it has different semantics.
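
One way the two can differ, sketched with `Option<i64>` standing in for a nullable Arrow column. This is an illustrative assumption about where the semantics diverge, not a statement about any specific DataFusion operator, and all function names are hypothetical:

```rust
/// COUNT(*) counts rows, whether or not the value is null.
fn count_star(col: &[Option<i64>]) -> usize {
    col.len()
}

/// COUNT(col) counts only non-null values.
fn count_non_null(col: &[Option<i64>]) -> usize {
    col.iter().filter(|v| v.is_some()).count()
}

/// A real filter: rejected rows are removed from the output.
fn filter_rows(col: &[Option<i64>], mask: &[bool]) -> Vec<Option<i64>> {
    col.iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(v, _)| *v)
        .collect()
}

/// The "null mask" shortcut: rejected rows stay in place but become null.
fn hide_rows_as_null(col: &[Option<i64>], mask: &[bool]) -> Vec<Option<i64>> {
    col.iter()
        .zip(mask)
        .map(|(v, keep)| if *keep { *v } else { None })
        .collect()
}
```

In this sketch, aggregates that skip nulls (such as `count_non_null`) agree between the two versions, but anything that depends on the row count (such as `count_star`) does not, because a hidden row is indistinguishable from a selected row that was genuinely null.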

One approach might be to add another enum variant to ColumnarValue that carries an additional selection mask:

https://github.com/apache/arrow-datafusion/blob/bbc71692fcd8dd9f3a9686162e59d092b37031f2/datafusion/expr/src/columnar_value.rs#L33

After @tustvold's recent work in Arrow, I think this would just be a BooleanBuffer (https://docs.rs/arrow/latest/arrow/buffer/struct.BooleanBuffer.html) and should be straightforward to use.

To really take advantage of a selection vector, however, the underlying compute kernels need to be updated to know how to ignore the selection vectors (and likely only do so when they are sparse)

> Another idea is to update the aggregate logic to perform the predicate evaluation and then use the resulting bitmap to determine which rows to accumulate.

While not exactly the same, @yjshen has been working to add filtering to the aggregate input, which is similar: #5868

@tustvold
Contributor

apache/arrow-rs#3620 may be related

@tustvold
Contributor

> This would potentially change the semantics in some cases though so we can probably only do this under certain conditions

I'm curious about this: in what situation would the nullability (or not) of a non-selected value matter? Isn't it just going to be discarded regardless? See apache/arrow-rs#3620

@andygrove
Member Author

I did some research / prototyping on this idea.

I used this logic to create a selection vector from a predicate bit mask:

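    use arrow::array::{Array, Int32Builder};

    // `predicate` is assumed to be the BooleanArray produced by the filter expression.
    // Null slots are not checked here: `value(i)` reads the value bit regardless of validity.
    let num_true = predicate.true_count();
    let mut b = Int32Builder::with_capacity(num_true);
    for i in 0..predicate.len() {
        // Record the index of every row that passes the predicate.
        if predicate.value(i) {
            b.append_value(i as i32);
        }
    }
    let selection_vector = b.finish();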

The selection vector can then be used in a naive sum aggregate like this:

    // Accumulate only the rows whose indices appear in the selection vector.
    let mut sum = 0;
    for i in 0..selection_vector.len() {
        sum += array.value(selection_vector.value(i) as usize);
    }

If we ignore the cost of creating the selection vector, then this approach is faster than filtering the batch first. However, the cost of creating the selection vector is similar to filtering a batch in some cases, such as when we are aggregating a single column.
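
To make the cost comparison concrete, here is a standalone sketch of both paths over plain slices. The function names and the use of `Vec<u32>` for the selection vector are assumptions for illustration and are not the prototype's actual code:

```rust
/// Build a selection vector: the indices of rows where the predicate is true.
/// Scans every row of the mask once and writes one index per selected row.
fn build_selection_vector(mask: &[bool]) -> Vec<u32> {
    let num_true = mask.iter().filter(|&&b| b).count();
    let mut sel = Vec::with_capacity(num_true);
    for (i, &keep) in mask.iter().enumerate() {
        if keep {
            sel.push(i as u32);
        }
    }
    sel
}

/// Sum only the rows named by the selection vector.
fn sum_selected(values: &[i64], sel: &[u32]) -> i64 {
    sel.iter().map(|&i| values[i as usize]).sum()
}

/// Conventional filter of a single column.
/// Also scans every row of the mask once and writes one value per selected row.
fn filter_column(values: &[i64], mask: &[bool]) -> Vec<i64> {
    values
        .iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(v, _)| *v)
        .collect()
}
```

Both `build_selection_vector` and `filter_column` walk the full mask and write one element per selected row, which is one plausible reading of why the two cost about the same for a single column. Any advantage would presumably come from reusing one selection vector across several columns.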

I'm less excited about using selection vectors for aggregates at this point.
