
Optimize queries with LIMIT/LIMIT BY/ORDER BY for distributed with GROUP BY sharding_key #10373

Merged
merged 9 commits into ClickHouse:master from dist-SELECT-optimization on Sep 3, 2020

Conversation

azat (Collaborator) commented Apr 20, 2020

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Optimize queries with LIMIT/LIMIT BY/ORDER BY for distributed with GROUP BY sharding_key (under optimize_skip_unused_shards and optimize_distributed_group_by_sharding_key)

Detailed description / Documentation draft:

The previous set of QueryProcessingStage values did not allow doing this.

So this patch set introduces a new WithMergeableStateAfterAggregation stage and uses
it to optimize queries with GROUP BY sharding_key and (see the sketch after the lists below):

  • LIMIT
  • LIMIT BY
  • ORDER BY

The following are still not supported:

  • WITH TOTALS (looks like it can be supported)
  • WITH ROLLUP (looks like it can be supported)
  • WITH CUBE
  • SETTINGS extremes=1 (looks like it can be supported)
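For illustration, here is a minimal sketch of the query shape this optimization targets (the table name and schema are hypothetical; the cluster name is the one used in the tests further down). Since the data is sharded by the GROUP BY key, each shard holds a disjoint set of keys, so aggregation plus ORDER BY/LIMIT can be finished on the shards and the initiator only merges already-final streams:

-- hypothetical local and Distributed tables, sharded by key
create table data_example (key UInt64, value UInt64) engine=MergeTree() order by key;
create table dist_example as data_example
    engine=Distributed(test_cluster_two_shards, currentDatabase(), data_example, key);

set optimize_skip_unused_shards=1;
set optimize_distributed_group_by_sharding_key=1;

-- GROUP BY the sharding key with ORDER BY/LIMIT: with the settings above the
-- initiator no longer has to re-aggregate the per-shard results
select key, sum(value) from dist_example group by key order by key limit 10;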

HEAD:

  • 6c4748a63e7acde2cc3283d96ffec590aae1e724 (initial)
  • a2467f205f4ab120443afbb426cec3d5a506478d (with ABI breakage)
  • 348ef1256ea8fb8f61109c33bbdd28daf46bdc8e (worked)

Continuation of:

@blinkov blinkov added the pr-improvement Pull request with some product improvements label Apr 20, 2020
@azat azat changed the title [WIP] Avoid some stages for distributed queries whenever it is possible More optimizations for distributed with GROUP BY sharding_key Apr 22, 2020
@azat azat marked this pull request as draft April 22, 2020 19:42
@azat azat changed the title More optimizations for distributed with GROUP BY sharding_key Optimize queries with LIMIT/LIMIT BY/ORDER BY for distributed with GROUP BY sharding_key Apr 22, 2020
@azat azat marked this pull request as ready for review April 22, 2020 21:47
@azat azat marked this pull request as draft April 25, 2020 19:38
@azat azat force-pushed the dist-SELECT-optimization branch 2 times, most recently from 95f7af3 to d417da3 on April 25, 2020 21:04
@azat azat marked this pull request as ready for review April 25, 2020 21:05
azat (Collaborator, Author) commented Apr 25, 2020

So, to summarize: this should solve some issues that were introduced by #10341 and adds support for LIMIT/LIMIT BY/ORDER BY.

@azat azat marked this pull request as draft April 26, 2020 00:19
@4ertus2 4ertus2 self-assigned this May 25, 2020
@4ertus2 4ertus2 removed their assignment Jul 9, 2020
@azat azat force-pushed the dist-SELECT-optimization branch 3 times, most recently from 6c4748a to a2467f2 on August 15, 2020 17:02
@azat azat marked this pull request as ready for review August 15, 2020 17:04
azat (Collaborator, Author) commented Aug 15, 2020

AST fuzzer — Assertion `(n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) && (n <= static_cast<ssize_t>(this->size()))' failed

#13790

azat (Collaborator, Author) commented Aug 16, 2020

Performance — Timeout :(

@akuzm can you please take a look whether it is a false positive or not? (or maybe I should just restart the build, e.g. by rebasing against upstream/master)

azat (Collaborator, Author) commented Aug 17, 2020

> @akuzm can you please take a look whether it is a false positive or not? (or maybe I should just restart the build, e.g. by rebasing against upstream/master)

Never mind, the problem is the protocol ABI breakage (queries like select * from system.numbers limit 1 via the python clickhouse client run forever, because of the wrong processing stage).

azat (Collaborator, Author) commented Aug 18, 2020

@4ertus2 this is ready, can you take a look?

azat (Collaborator, Author) commented Aug 18, 2020

Functional stateless tests (debug) — fail: 1, passed: 2168, skipped: 14

The 00956_sensitive_data_masking test is flaky (#13867).

azat (Collaborator, Author) commented Aug 27, 2020

@4ertus2 @alexey-milovidov friendly ping, can someone take a look?

azat (Collaborator, Author) commented Aug 27, 2020

Functional stateless tests (unbundled) — fail: 1, passed: 2156, skipped: 50

2020-08-27 23:43:34 01085_max_distributed_connections:                                      [ FAIL ] 2.00 sec. - return code 124
2020-08-27 23:43:34 , result:
2020-08-27 23:43:34 
2020-08-27 23:43:34 0
2020-08-27 23:43:34 0
2020-08-27 23:43:34 

So here we got the result already, but the timeout still triggered (timeout exits with 124 when the time limit is reached).
Fix this by increasing the delay (in line with the number of remote streams) - #14199

azat (Collaborator, Author) commented Aug 31, 2020

@alexey-milovidov any thoughts on this?

Process query until the stage where the aggregate functions were
calculated and finalized.

It will be used for optimize_distributed_group_by_sharding_key.

v2: fix aliases
v3: Fix protocol ABI breakage due to WithMergeableStateAfterAggregation
    Conditions >= for QueryProcessingStage::Enum have been verified, and they
    are OK (in InterpreterSelectQuery).
…OUP BY sharding_key

The previous set of QueryProcessingStage values did not allow doing this.
But now that WithMergeableStateAfterAggregation has been introduced, the
following queries can also be optimized under
optimize_distributed_group_by_sharding_key:
- GROUP BY sharding_key LIMIT
- GROUP BY sharding_key LIMIT BY
- GROUP BY sharding_key ORDER BY

The following are still not supported:
- WITH TOTALS (looks like it can be supported)
- WITH ROLLUP (looks like it can be supported)
- WITH CUBE
- SETTINGS extremes=1 (looks like it can be supported)
They will be implemented separately.

vX: fixes
v2: fix WITH *
v3: fix extremes
v4: fix LIMIT OFFSET (and make a little bit cleaner)
v5: fix HAVING
v6: fix ORDER BY
v7: rebase against 20.7
v8: move out WithMergeableStateAfterAggregation
v9: add optimize_distributed_group_by_sharding_key into test names
…merge (convert to UInt64)

Possible values:
- 1 - Do not merge aggregation states from different servers for distributed query processing, in case it is certain that there are different keys on different shards.
- 2 - Same as 1, but also apply the ORDER BY and LIMIT stages.
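As a hedged illustration of the difference between the two values (reusing the remote() addresses from the test touched by this PR; not taken from the PR itself), value 2 lets the initiator also skip re-applying ORDER BY and LIMIT, on the assumption that each shard already produced its final, ordered and limited result:

-- value 1: aggregation states from the shards are not merged on the initiator
SELECT dummy, count() FROM remote('127.0.0.{2,3}', system.one)
GROUP BY dummy
SETTINGS distributed_group_by_no_merge = 1;

-- value 2: additionally, ORDER BY and LIMIT are taken as already applied on the shards
SELECT dummy, count() FROM remote('127.0.0.{2,3}', system.one)
GROUP BY dummy ORDER BY dummy LIMIT 1
SETTINGS distributed_group_by_no_merge = 2;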
azat (Collaborator, Author) commented Sep 2, 2020

Yandex synchronization check (only for Yandex employees)

Rebased in an attempt to fix it.

@robot-clickhouse robot-clickhouse added pr-performance Pull request with some performance improvements and removed pr-improvement Pull request with some product improvements labels Sep 3, 2020
drop table if exists dist_01247;
drop table if exists dist_layer_01247;

select 'Distributed(rand)-over-Distributed(rand)';
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, rand());
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
select count(), * from dist_01247 group by number;
drop table dist_01247;
Member:

Don't understand this change, will return DROP queries...

azat (Collaborator, Author) commented Sep 3, 2020:

When a test DROPs the tables at the end, you cannot attach with the client and reproduce the failure; that's why I'm trying to keep them. That said, this is just a cosmetic thing.
@alexey-milovidov Is there some unspoken rule that says tables should be removed after the test?

Member:

It's just for convenience when you run .sql files manually and don't want leftovers.

@alexey-milovidov alexey-milovidov self-assigned this Sep 3, 2020
alexey-milovidov (Member) left a comment:

LGTM.

I understand the logic. It looks a bit fragile because StorageDistributed has to maintain its awareness of all possible pipeline steps after GROUP BY. It is OK, but we need to write randomized combinatorial tests for the correctness of Distributed tables on various types of clusters.
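As a hedged sketch of one combination such randomized tests would need to exercise (reusing the hypothetical dist_example table from the sketch in the description; not part of the PR), pipeline steps after GROUP BY such as LIMIT BY must still produce correct results when the optimization is enabled:

-- ORDER BY + LIMIT BY + LIMIT on top of GROUP BY over the sharding key
select key, count() from dist_example
group by key
order by key
limit 1 by key
limit 5
settings optimize_skip_unused_shards=1, optimize_distributed_group_by_sharding_key=1;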

@@ -1,2 +1,42 @@
SELECT count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) SETTINGS distributed_group_by_no_merge = 1;
Member:

It would be better to add a new test instead of modifying an old, simple test case.

azat (Collaborator, Author) commented Sep 3, 2020

Performance — 4 faster, 5 slower, 449 unstable

FWIW here is the previous test report (before the rebase against upstream/master) - https://clickhouse-test-reports.s3.yandex.net/10373/9e4aa5954e45865d7756e1a6a174d7c1cf0ccfa0/performance_comparison/report.html#fail1
And it does not have those slowdowns.

@alexey-milovidov alexey-milovidov merged commit 4f9df21 into ClickHouse:master Sep 3, 2020
@azat azat deleted the dist-SELECT-optimization branch September 3, 2020 22:45
Labels
pr-performance Pull request with some performance improvements
5 participants