
Bad performance on wide tables (1000+ columns) #7698

Open
karlovnv opened this issue Sep 29, 2023 · 30 comments
Labels
bug Something isn't working

Comments

@karlovnv

karlovnv commented Sep 29, 2023

Describe the bug

I'm testing DataFusion for use in a system which has several thousand columns and billions of rows.
I'm excited about the flexibility and possibilities this technology provides.

The problems we faced with:

  1. Optimization of the logical plan works slowly because it has to copy the whole schema in some rules.
    We worked around it with prepared queries (we cache the parametrized logical plan).
  2. Creation of the physical plan consumes up to 35% of CPU, which is more than its execution (we use several hundred aggregation functions and DF shows pretty good execution time).

Some investigation showed that there are a lot of string comparisons (take a look at the flamegraph):

  29 %      datafusion_physical_expr::planner::create_physical_expr 
  28.5 %    --> datafusion_common::dfschema::DFSchema::index_of_column
  28.5 %    -- --> datafusion_common::dfschema::DFSchema::index_of_column_by_name
   7.4 %    -- -- --> __memcmp_sse4_1
  14.6 %    -- -- --> datafusion_common::table_reference::TableReference::resolved_eq
   6.8 %    -- -- -- --> __memcmp_sse4_1

(flamegraph screenshot: photo_2023-09-29_14-29-16)

The current algorithm has O(N^2) complexity (one N from iterating over all the columns in
datafusion_common::dfschema::DFSchema::index_of_column_by_name
and another N in datafusion_common::table_reference::TableReference::resolved_eq).

https://github.com/apache/arrow-datafusion/blob/22d03c127e7c5e56cf97ae33eb4446d5b7022eaa/datafusion/common/src/dfschema.rs#L211

Some ideas to resolve:

  • Use a hashmap or btree in DFSchema instead of a list (to decrease the complexity of resolving a column index by its name)
  • Implement parametrization of the physical plan and prepared physical plans (to enable caching them the same way as prepared logical plans)

Thank you for developing such a great tool!

To Reproduce

It's hard to extract the relevant code from the project, but I will try to build a simple repro.

Expected behavior

Creation of the physical plan should take much less CPU time than its execution.

Additional context

No response

@alamb
Contributor

alamb commented Sep 29, 2023

Thank you for the report @karlovnv -- I agree with your analysis and we are indeed tracking various ways to make DataFusion's planning faster in #5637

Another performance issue, which I think is related to the ones you have already identified, is the representation of schemas and name resolution (often error strings are created and then ignored, for example).

If you (or anyone else) have any time to help with this project it would be most appreciated.

@karlovnv
Author

@alamb, thank you for the reply!
I will continue posting about bottlenecks in DF (for instance, I've noticed degradation of DF performance due to aggressive concurrency in the tokio scheduler and worked around it by using multiple tokio runtimes; I've also tested DF pinned to NUMA nodes, and so on)

@karlovnv
Author

various ways to make DataFusion's planning faster

It's also worth considering prepared physical plans (with parametrization); that would add the ability to cache them.

@maruschin
Contributor

maruschin commented Oct 18, 2023

Good issue, I'll take it. We can't use a HashMap because we need to preserve the insertion order.
Something like IndexMap would fit here: https://docs.rs/indexmap/latest/indexmap/
Or store the list index in a HashMap.
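
A minimal sketch of that idea (a toy stand-in, not the real DFSchema; assumes the indexmap crate):

use indexmap::IndexMap;

// Hypothetical, simplified stand-in for DFSchema's field list: an IndexMap
// preserves insertion order (like a Vec) but also resolves a field's
// position by name without scanning every column.
fn main() {
    let mut fields: IndexMap<String, &'static str> = IndexMap::new();
    fields.insert("col1".to_string(), "Int64");
    fields.insert("col2".to_string(), "Utf8");
    fields.insert("col3".to_string(), "Float64");

    // index_of_column_by_name becomes a hash lookup instead of a linear scan
    assert_eq!(fields.get_index_of("col2"), Some(1));

    // iteration still follows insertion (schema) order
    for (i, (name, ty)) in fields.iter().enumerate() {
        println!("{i}: {name} {ty}");
    }
}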

@alamb
Contributor

alamb commented Oct 18, 2023

Thanks @maruschin

I wonder if this is a good time to take a step back and see if we could make DFSchema easier to use in general -- I think it is already pretty complicated, and optimizing certain methods will likely make it more so.

For example, I wonder if adding the index map will make less complex queries slower, or if we need to take more care to reuse DFSchema.

Thus I suggest sketching out the type of change you have in mind in a draft PR that we can discuss, prior to spending all the time to get the PR polished up.

@maruschin
Contributor

@alamb I added some thoughts (#7895) that came up while working on #7878. I'll comment on the code in the PR later.

Main things: distinguish between qualified and unqualified columns, and don't allow a qualified name as the name in Column.

@alamb
Contributor

alamb commented Oct 23, 2023

@maruschin -- it would help in general to have some idea of the why (rationale) behind #7895 -- presumably it is because it makes something easier / less error prone, but I am sorry I don't immediately understand

@maruschin
Contributor

maruschin commented Oct 24, 2023

@alamb, my intent was to guarantee that the name field holds an unqualified name, like "col1" or "col.1", not "table.col1",
and to prevent the possibility of initializing a Column with both a qualifier and a qualified name.
After a few days I think #7895 is too verbose.

@karlovnv
Author

@alamb take a look at PR #7870 please, where @oleggator has implemented a BTree instead of a list. It improved physical plan construction by 2x.

@maruschin
Contributor

maruschin commented Oct 25, 2023

@karlovnv could you please check the performance of #7878.

I'm curious how different the performance is between my approach (searching for candidates by field name, then filtering) and searching in a BTree.

@alamb
Contributor

alamb commented Oct 26, 2023

I have reviewed #7870 and #7878. Thank you for your work @maruschin and @karlovnv

Here are my thoughts:

  1. I think we need some performance benchmark results to know how much this helps / hurts in other areas (like how much longer it takes to create a DFSchema). Can someone please create some benchmarks, similar to scalar.rs, for index_of_column_by_name and schema creation? (A rough sketch follows below this list.)
  2. I think it is likely too expensive to build a HashMap with each DFSchema (as it is creating / copying owned strings) if it is never read -- I think it should be built on demand, as suggested by @crepererum at https://github.com/apache/arrow-datafusion/pull/7870/files#r1372786446
  3. I have long been bothered by how expensive it is to create a DFSchema. I have some ideas on how to make it faster to construct -- which might not help this use case directly, but I think it might help planning in general. I will take a crack at working on this idea.
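
To make point 1 concrete, here is a rough sketch of what such a benchmark could look like (criterion-based, like the existing scalar.rs benchmarks; the 1000-column schema is illustrative and the exact DFSchema / Column constructors may differ between DataFusion versions):

use arrow::datatypes::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_common::{Column, DFSchema};

/// Build an Arrow schema with `n` nullable Int64 columns named c0..c{n-1}.
fn wide_schema(n: usize) -> Schema {
    let fields: Vec<Field> = (0..n)
        .map(|i| Field::new(format!("c{i}"), DataType::Int64, true))
        .collect();
    Schema::new(fields)
}

fn criterion_benchmark(c: &mut Criterion) {
    let arrow_schema = wide_schema(1000);

    // How expensive is building a qualified DFSchema for a wide table?
    c.bench_function("dfschema_create_1000_cols", |b| {
        b.iter(|| DFSchema::try_from_qualified_schema("t", &arrow_schema).unwrap())
    });

    // How expensive is resolving a column near the end of the schema by name?
    let df_schema = DFSchema::try_from_qualified_schema("t", &arrow_schema).unwrap();
    let col = Column::new(Some("t"), "c999");
    c.bench_function("dfschema_index_of_column_1000_cols", |b| {
        b.iter(|| df_schema.index_of_column(&col).unwrap())
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);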

@oleggator

Made the benchmark.
Here are the results.

@zeodtr

zeodtr commented Nov 13, 2023

Hi,
I think DFField::qualified_name() is another performance bottleneck. For a simple SELECT of many columns from a table with 3617 columns, the function took 73% of the total planning time. The problem is that the function calls format! (which is expensive) every time it is called when it has a qualifier.
When I changed DFField to hold a member variable with the precomputed qualified_name and to return its clone() when qualified_name() is called, planning became 2~3 times faster.
But still, clone()ing the precomputed qualified_name takes a significant percentage (~30%? I didn't measure it under the same conditions) of the total planning time.
(I've tested it with DataFusion 31.0.0)
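
A minimal sketch of that change (a toy struct, not the actual DFField, just to illustrate moving the format! call into the constructor):

/// Toy stand-in for DFField that precomputes its qualified name once.
struct CachedField {
    qualifier: Option<String>,
    name: String,
    /// "qualifier.name" (or just "name"), built once in new()
    qualified_name: String,
}

impl CachedField {
    fn new(qualifier: Option<&str>, name: &str) -> Self {
        let qualified_name = match qualifier {
            Some(q) => format!("{q}.{name}"), // format! runs once, at construction
            None => name.to_string(),
        };
        Self {
            qualifier: qualifier.map(|q| q.to_string()),
            name: name.to_string(),
            qualified_name,
        }
    }

    /// Previously: format!("{}.{}", qualifier, name) on every call.
    fn qualified_name(&self) -> String {
        self.qualified_name.clone()
    }
}

fn main() {
    let f = CachedField::new(Some("t1"), "col1");
    assert_eq!(f.qualified_name(), "t1.col1");
    assert_eq!(f.name, "col1");
    assert_eq!(f.qualifier.as_deref(), Some("t1"));
}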

@zeodtr

zeodtr commented Nov 13, 2023

I've applied the precomputed qualified_name mentioned above and the btree draft by @oleggator to DataFusion 31.0.0, then ran valgrind with a simple SELECT of many columns from a table with 3617 columns.

The following attachment is the resulting call graph in SVG format.
(xdb_main is my project's executable name which uses DataFusion as a library.)
Still, clone()ing the precomputed qualified_name takes about 47% of the total planning time (logical plan building + optimizing), while field_with_qualified_name now takes only a negligible percentage.

(call graph SVG attachment: out 675915)

@Dandandan
Contributor

You can try to put it behind an Arc to make the cloning faster?

@zeodtr
Copy link

zeodtr commented Nov 13, 2023

You can try to put it behind an Arc to make the cloning faster?

I've additionally changed the type of the precomputed qualified_name from String to Arc<String> after the above tests. Total planning time was reduced to 75% of the previous iteration. But I think it is still far from optimal.
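
For illustration, that follow-up change amounts to swapping the cached field's type so that clone() becomes a reference-count bump instead of a heap allocation (again a toy sketch; Arc<str> would avoid one more indirection, but this mirrors the Arc<String> described above):

use std::sync::Arc;

struct CachedField {
    // was: qualified_name: String
    qualified_name: Arc<String>,
}

impl CachedField {
    fn qualified_name(&self) -> Arc<String> {
        // cloning an Arc only increments a reference count; no allocation, no memcpy
        Arc::clone(&self.qualified_name)
    }
}

fn main() {
    let f = CachedField {
        qualified_name: Arc::new("t1.col1".to_string()),
    };
    assert_eq!(f.qualified_name().as_str(), "t1.col1");
}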

@karlovnv
Author

it is still far from optimal

I think it's a good idea to cache instances of DFSchema (and the Arrow Schema as well). The most flexible way would be to implement a user-defined SchemaCacheProvider (letting users of DataFusion decide how to cache schemas).
For instance, in our case the schema changes much more rarely than the data, so we can cache it for a long period of time.
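
A hedged sketch of what such a hook could look like (the SchemaCacheProvider trait here is made up for illustration, not an existing DataFusion API):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

/// Hypothetical extension point: the engine would consult this before
/// rebuilding a table's schema, and users decide the caching / invalidation policy.
trait SchemaCacheProvider: Send + Sync {
    fn get(&self, table: &str) -> Option<SchemaRef>;
    fn put(&self, table: &str, schema: SchemaRef);
}

/// Trivial in-memory implementation for illustration.
#[derive(Default)]
struct InMemorySchemaCache {
    inner: RwLock<HashMap<String, SchemaRef>>,
}

impl SchemaCacheProvider for InMemorySchemaCache {
    fn get(&self, table: &str) -> Option<SchemaRef> {
        self.inner.read().unwrap().get(table).cloned()
    }
    fn put(&self, table: &str, schema: SchemaRef) {
        self.inner.write().unwrap().insert(table.to_string(), schema);
    }
}

fn main() {
    let cache = InMemorySchemaCache::default();
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new("col1", DataType::Int64, true)]));
    cache.put("events", schema);
    assert!(cache.get("events").is_some());
}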

@karlovnv
Author

Another thought is to cache the physical plan (I tested caching the optimized physical plan serialized into protobuf, and it increases performance dramatically).
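
A rough sketch of that approach, assuming the datafusion-proto bytes helpers (physical_plan_to_bytes / physical_plan_from_bytes; availability and exact signatures depend on the DataFusion version, and cache invalidation is left out entirely):

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

/// Cache optimized physical plans as protobuf bytes, keyed by the SQL text.
struct PhysicalPlanCache {
    plans: HashMap<String, Vec<u8>>,
}

impl PhysicalPlanCache {
    async fn get_or_plan(
        &mut self,
        ctx: &SessionContext,
        sql: &str,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if let Some(bytes) = self.plans.get(sql) {
            // Decoding the cached plan skips logical + physical planning entirely.
            return physical_plan_from_bytes(bytes, ctx);
        }
        // Plan once, then cache the serialized optimized plan for next time.
        let plan = ctx.sql(sql).await?.create_physical_plan().await?;
        self.plans
            .insert(sql.to_string(), physical_plan_to_bytes(plan.clone())?.to_vec());
        Ok(plan)
    }
}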

@alamb
Contributor

alamb commented Nov 13, 2023

For instance, in our case the schema changes much more rarely than the data, so we can cache it for a long period of time.

We do something similar to this in IOx (cache schemas that we know don't change rather than recomputing them)

It is my opinion that in order to make DFSchema behave well and not be a bottleneck we will need to more fundamentally restructure how it works.

Right now the amount of copying required is substantial as has been pointed out several times on this thread. I think with sufficient diligence we could avoid almost all copies when manipulating DFSchema and then the extra complexity of adding a cache or other techniques would become unnecessary.

I've additionally changed the type of the precomputed qualified_name from String to Arc<String> after the above tests. Total planning time was reduced to 75% of the previous iteration. But I think it is still far from optimal.

I think this is a great idea. I think optimizing for the case of the same, reused qualifier, is a very good idea.

What do people think about the approach described on #7944? I (admittedly biasedly) think that approach would eliminate almost all allocations (instead it would be ref count updates). We can extend it to incorporate ideas like pre-caching qualified names and hash sets for column checks, and I think it could be pretty fast

@zeodtr

zeodtr commented Nov 17, 2023

I've tried to optimize the logical planning and optimization routines.
As a result, for a wide aggregation query, the logical planning + optimization time was reduced from 49 seconds to 0.8 seconds.
The details are as follows:

  • The version of DataFusion: 31.0.0 (with small modifications)
  • The query
    • a SELECT query
    • ~3000 aggregation functions in the SELECT clause
    • FROM clause has one table that has 3617 columns
    • 16 GROUP BY columns

In the following, the optimization steps are cumulative, and elapsed times decrease accordingly.
None of the steps require deep knowledge of plan building.
All time values are in milliseconds.
"Elapsed time after optimization" includes "elapsed time after creating a logical plan", so elapsed time after optimization is logical planning time + optimization time.

Note that the following optimization steps were not heavily tested.

No code change: original timing

  • elapsed time after creating a logical plan: 11468
  • elapsed time after optimization: 48734

Optimization 1: In DFField, precompute qualified_name in the new...() functions, set it as a member variable, and use it in qualified_name()

  • elapsed time after creating a logical plan: 10571
  • elapsed time after optimization: 29375

Optimization 2: Apply #7870 (Use btree to search fields in DFSchema)

  • elapsed time after creating a logical plan: 2429
  • elapsed time after optimization: 20307

Optimization 3: Change DFField's qualified_name() to return Arc<String> instead of String

And change other code accordingly, to avoid string clone()ing.

  • elapsed time after creating a logical plan: 2285
  • elapsed time after optimization: 15503

Optimization 4: precompute using_columns in logical_plan::builder::project()

Like this:

    // Compute using_columns once, before the loop, instead of once per expression.
    let using_columns = plan.using_columns()?;
    for e in expr {
        let e = e.into();
        match e {
            Expr::Wildcard => {
                projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
            }
            Expr::QualifiedWildcard { ref qualifier } => projected_expr
                .extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
            _ => projected_expr.push(columnize_expr(
                normalize_col_with_using_columns(e, &plan, &using_columns)?,
                input_schema,
            )),
        }
    }

And implement expr_rewriter::normalize_col_with_using_columns() and logical_plan::builder::normalize_with_using_columns(), which receive using_columns as an argument.

  • elapsed time after creating a logical plan: 1491
  • elapsed time after optimization: 14376

Optimization 5: In DFSchema::merge() check duplicated_field with bool-based functions instead of Error-based functions

Like this:

            let duplicated_field = match field.qualifier() {
                Some(q) => self.has_field_with_qualified_name(q, field.name()),
                // for unqualified columns, check as unqualified name
                None => self.has_field_with_unqualified_name(field.name()),
            };

And implement has_field_with_unqualified_name() and has_field_with_qualified_name(), which return bool without involving Error. Since it is not an error condition, receiving a bool is more appropriate anyway.
Additionally, implement get_index_of_column_by_name(), which returns Option<usize> instead of Result<Option<usize>>, for the above functions.
field_not_found() and unqualified_field_not_found() are heavy when used with a wide table since they return all valid field names in the Error, so they must be avoided when not necessary.
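
As a toy illustration of those bool/Option-returning lookups (simplified; the real DFSchema also has to normalize table references):

/// Simplified stand-in for DFSchema, just enough to show lookups
/// that avoid building Error values for the "not found" case.
struct MiniSchema {
    /// (optional qualifier, field name)
    fields: Vec<(Option<String>, String)>,
}

impl MiniSchema {
    /// Option instead of Result<Option<..>>: a missing field is not an error here.
    fn get_index_of_column_by_name(&self, qualifier: Option<&str>, name: &str) -> Option<usize> {
        self.fields
            .iter()
            .position(|(q, n)| q.as_deref() == qualifier && n == name)
    }

    fn has_field_with_qualified_name(&self, qualifier: &str, name: &str) -> bool {
        self.get_index_of_column_by_name(Some(qualifier), name).is_some()
    }

    fn has_field_with_unqualified_name(&self, name: &str) -> bool {
        // unqualified lookup ignores the qualifier entirely
        self.fields.iter().any(|(_, n)| n == name)
    }
}

fn main() {
    let schema = MiniSchema {
        fields: vec![(Some("t1".into()), "col1".into()), (None, "col2".into())],
    };
    assert!(schema.has_field_with_qualified_name("t1", "col1"));
    assert!(schema.has_field_with_unqualified_name("col2"));
    assert_eq!(schema.get_index_of_column_by_name(None, "col2"), Some(1));
}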

  • elapsed time after creating a logical plan: 899
  • elapsed time after optimization: 6538

Optimization 6: In expr::utils::columnize_expr() use bool-based functions instead of Error-based functions

Similar to optimization 5.
Like this:

        _ => match e.display_name() {
            Ok(name) => match input_schema.get_field_with_unqualified_name(&name) {
                Some(field) => Expr::Column(field.qualified_column()),
                // expression not provided as input, do not convert to a column reference
                None => e,
            },
            Err(_) => e,
        },

And implement get_field_with_unqualified_name() in DFSchema, which returns Option<&DFField> instead of Result<&DFField>. Since it is not an error condition, receiving an Option is more appropriate anyway.

  • elapsed time after creating a logical plan: 442
  • elapsed time after optimization: 6033

Optimization 7: Use IndexSet in expr::utils::find_exprs_in_exprs()

Like this:

use indexmap::IndexSet;

fn find_exprs_in_exprs<F>(exprs: &[Expr], test_fn: &F) -> Vec<Expr>
where
    F: Fn(&Expr) -> bool,
{
    exprs
        .iter()
        .flat_map(|expr| find_exprs_in_expr(expr, test_fn))
        .fold(IndexSet::new(), |mut acc, expr| {
            acc.insert(expr);
            acc
        })
        .into_iter()
        .collect()
}

IndexSet is in the indexmap crate: https://docs.rs/indexmap/latest/indexmap/

  • elapsed time after creating a logical plan: 391
  • elapsed time after optimization: 5889

Optimization 8: In logical_plan::plan::calc_func_dependencies_for_project() return early when there are no functional dependencies

Like this:

fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_schema = input.schema();
    if !input_schema.has_functional_dependencies() {
        return Ok(FunctionalDependencies::empty());
    }

And implement DFSchema::has_functional_dependencies() and FunctionalDependencies::is_empty() for it.
calc_func_dependencies_for_project() does a heavy operation to get proj_indices even before it calls project_functional_dependencies(), which is useless when there are no functional dependencies (a common case, since functional dependencies are rare in my opinion).
I think functional dependency-related functions in general require arguments that take heavy operations to compute, even before checking whether they are needed. So it would be great if these functions received an FnOnce closure instead of precomputed data, to skip the heavy operations when they are not required.
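
A small sketch of the FnOnce idea (toy types, not the actual DataFusion signatures): the caller passes a closure, and the expensive index computation only runs when there are dependencies to project.

/// Toy stand-in for DataFusion's FunctionalDependencies.
struct FunctionalDependencies(Vec<usize>);

impl FunctionalDependencies {
    fn empty() -> Self {
        Self(Vec::new())
    }
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

/// Take a closure that builds the projection indices instead of the indices
/// themselves, so the heavy work is skipped when there is nothing to project.
fn project_functional_dependencies<F>(
    deps: &FunctionalDependencies,
    proj_indices: F,
) -> FunctionalDependencies
where
    F: FnOnce() -> Vec<usize>,
{
    if deps.is_empty() {
        return FunctionalDependencies::empty();
    }
    FunctionalDependencies(proj_indices()) // only computed when actually needed
}

fn main() {
    let deps = FunctionalDependencies::empty();
    // The closure is never called here, so the "expensive" work never runs.
    let projected = project_functional_dependencies(&deps, || {
        println!("computing projection indices (expensive)");
        (0..1000).collect()
    });
    assert!(projected.is_empty());
}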

  • elapsed time after creating a logical plan: 219
  • elapsed time after optimization: 845

Now, the resulting 0.8 seconds is acceptable for me.

@alamb
Contributor

alamb commented Nov 17, 2023

Thank you for this very detailed report @zeodtr -- this sounds great. Several of the optimizations you describe above seem like they could be pulled out into small PRs for DataFusion.

How do you suggest we proceed to make progress here?

@zeodtr

zeodtr commented Nov 20, 2023

@alamb I may not be able to make PRs myself.
Maybe other contributors can make PRs based on my report.
And I'd like to hear others' opinions and test results on my optimizations.
Some optimizations may not be considered appropriate -- like the one that adds an external dependency.
And since I've investigated only the 'wide aggregation' (and 'simple wide selection', although not reported above) code paths, there may be other simple performance problems in other code paths like the ones I've found.
Anyway, I'm planning to apply all the optimizations I've shown to my version of DataFusion (as long as they are correct).

@zeodtr

zeodtr commented Nov 20, 2023

In my (humble, may be wrong) opinion, DataFusion planning code may have the following performance problems.

  1. LogicalPlan (and maybe other modules) does the same operation over and over again without any precomputing or caching in a single planning session. And LogicalPlan cannot cache anything and must match on its variant each time it is called, since it is an enum. In my opinion, it would be better to make it a trait and have each concrete plan node implement the trait.
  2. Uses format! as if it had negligible cost (which it does not when it's called tens of thousands of times).
  3. Uses iterators as if they had negligible cost (which they do not when the number of elements is not small and the operations in the iterator are not cheap).
  4. Executes operations that can be heavy before it is determined whether they are necessary (the functional dependency case in my report).
  5. Assumes the column list is short (which it sometimes is not)

It's just my humble opinion. (I don't have deep knowledge of plan building)

@alamb
Contributor

alamb commented Nov 20, 2023

LogicalPlan (and maybe other modules) does the same operation over and over again without any precomputing or caching in a single planning session. And LogicalPlan cannot cache anything and must match on its variant each time it is called, since it is an enum. In my opinion, it would be better to make it a trait and have each concrete plan node implement the trait.

I think there are tradeoffs of each approach for sure. We have had some discussions about the various pros and cons in the past that might interest you:

It's just my humble opinion. (I don't have deep knowledge of plan building)

In general, I think you have identified the core improvements necessary to support faster planning with complex schemas.

@zeodtr

zeodtr commented Nov 21, 2023

@alamb I've read the discussions you shared. Thank you.
Since I'm not quite proficient at Rust, I might not be able to add a useful comment there. However, I do have some general feelings:

  • I noticed that the discussions primarily focus on coding style and convenience rather than the performance of the code.
  • The mention of a deep comparison (between Exprs) raises concerns regarding potential performance issues.
  • I'm uncertain about the necessity of unifying the coding style between trait and enum.

Thank you.

@karlovnv
Author

@alamb Hi! Could you please let us know if any work is planned here? We noticed that the performance of DataFusion on wide tables slows down significantly from version to version, which forces us to stay on version 31.

@alamb
Contributor

alamb commented Feb 26, 2024

@alamb Hi! Could you please let us know if any work is planned here? We noticed that the performance of DataFusion on wide tables slows down significantly from version to version, which forces us to stay on version 31.

Hi @karlovnv -- I would say we are making some progress -- most of the work is tracked in the parent epic, #5637 and there have been some improvements recently there.

I think the best help you could give is to ensure that the planning benchmarks we have added (see https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/benches/sql_planner.rs) accurately reflect what you are doing.

If there are other types of queries / schemas you are using that are not reflected there a PR / examples would be most appreciated as we continue to work to improve things

@karlovnv
Author

@alamb we ran the same perf test on 37.1 and it seems that now 99% of request time is spent on planning and optimizing (creating and optimizing the logical plan, the physical planner, and optimizing the physical plan).
We have 1000 columns.

It seems that this is a huge degradation since 31 (for now we use 31, as it is still much more performant on planning).


@alamb
Contributor

alamb commented May 15, 2024

Thank you for the report @karlovnv. Any benchmarks you are able to share / contribute would be most helpful for improving the code.

With the benchmarks we have in place, I am happy to report we see 10x faster planning in 38.0.0 (just released) compared to 37.0.0 for 1000 columns. Not sure if you have tried that version yet.

Details are here #5637 (comment)

@karlovnv
Author

Thank you for your reply @alamb! We'll check it on 38 and share results.

This particular example is synthetic, as we implemented it using pure in-memory tables without any external dependencies.

In the real project (we are developing an in-memory columnar database for antifraud and scoring, using DF as the query engine) we ran into similar perf issues (31 vs 37.1).
