Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: nexmark q15 #7350

Closed
Tracked by #7289
lmatz opened this issue Jan 12, 2023 · 1 comment
Closed
Tracked by #7289

perf: nexmark q15 #7350

lmatz opened this issue Jan 12, 2023 · 1 comment
Assignees
Milestone

Comments

@lmatz
Copy link
Contributor

lmatz commented Jan 12, 2023

query:

CREATE MATERIALIZED VIEW nexmark_q15 AS
SELECT
     to_char(date_time, 'YYYY-MM-DD') as "day",
     count(*) AS total_bids,
     count(*) filter (where price < 10000) AS rank1_bids,
     count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
     count(*) filter (where price >= 1000000) AS rank3_bids,
     count(distinct bidder) AS total_bidders,
     count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
     count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
     count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
     count(distinct auction) AS total_auctions,
     count(distinct auction) filter (where price < 10000) AS rank1_auctions,
     count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
     count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY to_char(date_time, 'YYYY-MM-DD');

plan:

 StreamMaterialize { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], pk_columns: [day] }
 └─StreamProject { exprs: [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), sum0(count) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 1000000:Int32))) filter((flag = 0:Int64)), count(Field(bid, 1:Int32)) filter((flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 0:Int32)) filter((flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
   └─StreamHashAgg { group_key: [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar)], aggs: [count, sum0(count) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 1000000:Int32))) filter((flag = 0:Int64)), count(Field(bid, 1:Int32)) filter((flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 0:Int32)) filter((flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
     └─StreamExchange { dist: HashShard(ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar)) }
       └─StreamProject { exprs: [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32), Field(bid, 1:Int32), Field(bid, 1:Int32), Field(bid, 1:Int32), Field(bid, 0:Int32), Field(bid, 0:Int32), Field(bid, 0:Int32), Field(bid, 0:Int32), flag, count, count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32))] }
         └─StreamAppendOnlyHashAgg { group_key: [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32), Field(bid, 0:Int32), flag], aggs: [count, count, count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32))] }
           └─StreamExchange { dist: HashShard(ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32), Field(bid, 0:Int32), flag) }
             └─StreamExpand { column_subsets: [[ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar)], [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32)], [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 0:Int32)]] }
               └─StreamProject { exprs: [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 2:Int32), Field(bid, 1:Int32), Field(bid, 0:Int32), _row_id] }
                 └─StreamFilter { predicate: (event_type = 2:Int32) }
                   └─StreamRowIdGen { row_id_index: 4 }
                     └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }

1/13:
The same as #7271 , requires changing to shuffle aggregation instead of two-phase aggregation.
Check again after it is done.

@github-actions github-actions bot added this to the release-0.1.16 milestone Jan 12, 2023
@stdrc stdrc modified the milestones: release-0.1.17, release-0.1.18 Feb 15, 2023
mergify bot pushed a commit that referenced this issue Feb 24, 2023
Previously in #7797, distinct agg support is added (without cache) but not enabled. This PR enables it by disable 2-phase rewrite rule for streaming distinct agg calls, and also adds an LRU cache in the deduplicater.

This will close #7682, and possibly resolve or at least mitigate the performance issue in #7350 and #7271.

Approved-By: st1page
@lmatz
Copy link
Contributor Author

lmatz commented Mar 2, 2023

Now 3/2:

RW:

 StreamMaterialize { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], pk_columns: [day], pk_conflict: "no check" }
 └─StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [count, count filter(($expr2 < 10000:Int32)), count filter(($expr2 >= 10000:Int32) AND ($expr2 < 1000000:Int32)), count filter(($expr2 >= 1000000:Int32)), count(distinct $expr3), count(distinct $expr3) filter(($expr2 < 10000:Int32)), count(distinct $expr3) filter(($expr2 >= 10000:Int32) AND ($expr2 < 1000000:Int32)), count(distinct $expr3) filter(($expr2 >= 1000000:Int32)), count(distinct $expr4), count(distinct $expr4) filter(($expr2 < 10000:Int32)), count(distinct $expr4) filter(($expr2 >= 10000:Int32) AND ($expr2 < 1000000:Int32)), count(distinct $expr4) filter(($expr2 >= 1000000:Int32))] }
   └─StreamExchange { dist: HashShard($expr1) }
     └─StreamProject { exprs: [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar) as $expr1, Field(bid, 2:Int32) as $expr2, Field(bid, 1:Int32) as $expr3, Field(bid, 0:Int32) as $expr4, _row_id] }
       └─StreamFilter { predicate: (event_type = 2:Int32) }
         └─StreamRowIdGen { row_id_index: 4 }
           └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
(7 rows)
 Fragment 0
   StreamMaterialize { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], pk_columns: [day], pk_conflict: "no check" }
       materialized table: 4294967294
     StreamAppendOnlyHashAgg { group_key: [$expr174], aggs: [count, count filter(($expr175 < 10000:Int32)), count filter(($expr175 >= 10000:Int32) AND ($expr175 < 1000000:Int32)), count filter(($expr175 >= 1000000:Int32)), count(distinct $expr176), count(distinct $expr176) filter(($expr175 < 10000:Int32)), count(distinct $expr176) filter(($expr175 >= 10000:Int32) AND ($expr175 < 1000000:Int32)), count(distinct $expr176) filter(($expr175 >= 1000000:Int32)), count(distinct $expr177), count(distinct $expr177) filter(($expr175 < 10000:Int32)), count(distinct $expr177) filter(($expr175 >= 10000:Int32) AND ($expr175 < 1000000:Int32)), count(distinct $expr177) filter(($expr175 >= 1000000:Int32))] }
         result table: 0, state tables: []
       StreamExchange Hash([0]) from 1
 
 Fragment 1
   StreamProject { exprs: [ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar) as $expr174, Field(bid, 2:Int32) as $expr175, Field(bid, 1:Int32) as $expr176, Field(bid, 0:Int32) as $expr177, _row_id] }
     StreamFilter { predicate: (event_type = 2:Int32) }
       StreamRowIdGen { row_id_index: 4 }
         StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
             source state table: 3
 
  Table 0 { columns: [$expr174, count, count filter(($expr175 < 10000:Int32)), count filter(($expr175 >= 10000:Int32) AND ($expr175 < 1000000:Int32)), count filter(($expr175 >= 1000000:Int32)), count(distinct $expr176), count(distinct $expr176) filter(($expr175 < 10000:Int32)), count(distinct $expr176) filter(($expr175 >= 10000:Int32) AND ($expr175 < 1000000:Int32)), count(distinct $expr176) filter(($expr175 >= 1000000:Int32)), count(distinct $expr177), count(distinct $expr177) filter(($expr175 < 10000:Int32)), count(distinct $expr177) filter(($expr175 >= 10000:Int32) AND ($expr175 < 1000000:Int32)), count(distinct $expr177) filter(($expr175 >= 1000000:Int32))], primary key: [$0 ASC], value indices: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0] }
  Table 3 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [] }
  Table 4294967294 { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0] }
(17 rows)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants