perf: nexmark q14 #8503

Closed · Tracked by #7289
fuyufjh opened this issue Mar 13, 2023 · 10 comments
Labels: good first issue, help wanted, type/perf

fuyufjh commented Mar 13, 2023

Query:

CREATE MATERIALIZED VIEW nexmark_q14 AS
SELECT
  auction,
  bidder,
  0.908 * price as price,
  CASE
    WHEN
      extract(hour from date_time) >= 8 AND
      extract(hour from date_time) <= 18
    THEN 'dayTime'
    WHEN
      extract(hour from date_time) <= 6 OR
      extract(hour from date_time) >= 20
    THEN 'nightTime'
    ELSE 'otherTime'
  END AS bidTimeType,
  date_time
  -- extra
  -- TODO: count_char is an UDF, add it back when we support similar functionality.
  -- https://github.com/nexmark/nexmark/blob/master/nexmark-flink/src/main/java/com/github/nexmark/flink/udf/CountChar.java
  -- count_char(extra, 'c') AS c_counts
FROM bid
WHERE 0.908 * price > 1000000 AND 0.908 * price < 50000000;

RW:

 StreamMaterialize { columns: [auction, bidder, price, bidtimetype, date_time, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check" }
 └─StreamExchange { dist: HashShard(_row_id) }
   └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr1, Field(bid, 1:Int32) as $expr2, (0.908:Decimal * Field(bid, 2:Int32)) as $expr3, Case(((Extract('HOUR':Varchar, Field(bid, 5:Int32)) >= 8:Int32) AND (Extract('HOUR':Varchar, Field(bid, 5:Int32)) <= 18:Int32)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, Field(bid, 5:Int32)) <= 6:Int32) OR (Extract('HOUR':Varchar, Field(bid, 5:Int32)) >= 20:Int32)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr4, Field(bid, 5:Int32) as $expr5, _row_id] }
     └─StreamFilter { predicate: ((0.908:Decimal * Field(bid, 2:Int32)) > 1000000:Int32) AND ((0.908:Decimal * Field(bid, 2:Int32)) < 50000000:Int32) AND (event_type = 2:Int32) }
       └─StreamRowIdGen { row_id_index: 4 }
         └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }

Flink:

== Optimized Physical Plan ==
Calc(select=[bid.auction AS auction, bid.bidder AS bidder, *(0.908:DECIMAL(4, 3), bid.price) AS price, CASE(AND(>=(EXTRACT(FLAG(HOUR), CAST(dateTime AS TIMESTAMP(3))), 8), <=(EXTRACT(FLAG(HOUR), CAST(dateTime AS TIMESTAMP(3))), 18)), _UTF-16LE'dayTime':VARCHAR(9) CHARACTER SET "UTF-16LE", OR(<=(EXTRACT(FLAG(HOUR), CAST(dateTime AS TIMESTAMP(3))), 6), >=(EXTRACT(FLAG(HOUR), CAST(dateTime AS TIMESTAMP(3))), 20)), _UTF-16LE'nightTime':VARCHAR(9) CHARACTER SET "UTF-16LE", _UTF-16LE'otherTime':VARCHAR(9) CHARACTER SET "UTF-16LE") AS bidTimeType, dateTime, bid.extra AS extra], where=[AND(=(event_type, 2), >(*(0.908:DECIMAL(4, 3), bid.price), 1000000), <(*(0.908:DECIMAL(4, 3), bid.price), 50000000))])
+- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
   +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
      +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Calc(select=[bid.auction AS auction, bid.bidder AS bidder, (0.908 * bid.price) AS price, CASE(((EXTRACT(HOUR, CAST(dateTime AS TIMESTAMP(3))) >= 8) AND (EXTRACT(HOUR, CAST(dateTime AS TIMESTAMP(3))) <= 18)), 'dayTime', ((EXTRACT(HOUR, CAST(dateTime AS TIMESTAMP(3))) <= 6) OR (EXTRACT(HOUR, CAST(dateTime AS TIMESTAMP(3))) >= 20)), 'nightTime', 'otherTime') AS bidTimeType, dateTime, bid.extra AS extra], where=[((event_type = 2) AND ((0.908 * bid.price) > 1000000) AND ((0.908 * bid.price) < 50000000))])
+- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
   +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
      +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

fuyufjh commented Mar 13, 2023

We are using a blackhole sink for the performance test, right?

I am not sure whether it is the bottleneck, but I guess Flink may not have this Exchange:

└─StreamExchange { dist: HashShard(_row_id) }

which is tracked by #7377. cc @shanicky


lmatz commented Mar 13, 2023

RW Blackhole:

-----------------
 StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time] }
 └─StreamProject { exprs: [$expr1, $expr2, $expr3, $expr4, $expr5] }
   └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr1, Field(bid, 1:Int32) as $expr2, (0.908:Decimal * Field(bid, 2:Int32)) as $expr3, Case(((Extract('HOUR':Varchar, Field(bid, 5:Int32)) >= 8:Int32) AND (Extract('HOUR':Varchar, Field(bid, 5:Int32)) <= 18:Int32)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, Field(bid, 5:Int32)) <= 6:Int32) OR (Extract('HOUR':Varchar, Field(bid, 5:Int32)) >= 20:Int32)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr4, Field(bid, 5:Int32) as $expr5, _row_id] }
     └─StreamFilter { predicate: ((0.908:Decimal * Field(bid, 2:Int32)) > 1000000:Int32) AND ((0.908:Decimal * Field(bid, 2:Int32)) < 50000000:Int32) AND (event_type = 2:Int32) }
       └─StreamRowIdGen { row_id_index: 4 }
         └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
(6 rows)


BugenZhao commented Mar 13, 2023

I guess this might be caused by an issue similar to #5130: we need to evaluate a lot of 'HOUR' constants when calling EXTRACT.

fn build_extract_expr(
    ret: DataType,
    l: BoxedExpression,
    r: BoxedExpression,
) -> Result<BoxedExpression> {
    let expr: BoxedExpression = match r.return_type() {
        DataType::Date => Box::new(BinaryExpression::<
            Utf8Array,
            NaiveDateArray,
            DecimalArray,
            _,
        >::new(l, r, ret, extract_from_date)),
        DataType::Timestamp => Box::new(BinaryExpression::<
            Utf8Array,
            NaiveDateTimeArray,
            DecimalArray,
            _,
        >::new(l, r, ret, extract_from_timestamp)),
        DataType::Timestamptz => Box::new(BinaryExpression::<
            Utf8Array,
            I64Array,
            DecimalArray,
            _,
        >::new(l, r, ret, extract_from_timestamptz)),
        DataType::Time => Box::new(BinaryExpression::<
            Utf8Array,
            NaiveTimeArray,
            DecimalArray,
            _,
        >::new(l, r, ret, extract_from_time)),
        _ => {
            return Err(ExprError::UnsupportedFunction(format!(
                "Extract ( {:?} ) is not supported yet!",
                r.return_type()
            )))
        }
    };
    Ok(expr)
}
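
To illustrate the direction such an optimization could take, here is a minimal, hedged sketch (not RisingWave's actual code; the helper name build_time_extractor and the use of chrono are assumptions for illustration): when the unit argument is a constant literal such as 'HOUR', it could be resolved once at expression-build time, so the per-row hot path calls a specialized function instead of re-interpreting the unit string for every row.

use chrono::{NaiveDateTime, Timelike};

/// Hypothetical helper: returns a specialized per-row extractor for a constant
/// unit string, or None if the unit is not handled and a generic fallback is needed.
fn build_time_extractor(unit: &str) -> Option<fn(NaiveDateTime) -> i64> {
    match unit {
        "HOUR" => Some(|ts: NaiveDateTime| ts.hour() as i64),
        "MINUTE" => Some(|ts: NaiveDateTime| ts.minute() as i64),
        "SECOND" => Some(|ts: NaiveDateTime| ts.second() as i64),
        _ => None,
    }
}

fn main() {
    // Resolved once per expression build, not once per row.
    let extract_hour = build_time_extractor("HOUR").expect("supported unit");
    let ts = NaiveDateTime::parse_from_str("2023-03-13 09:30:00", "%Y-%m-%d %H:%M:%S").unwrap();
    // The per-row call is now a plain function call with no string matching.
    assert_eq!(extract_hour(ts), 9);
}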


st1page commented Mar 14, 2023

(Quoting the RW Blackhole plan from the comment above.)

I am not sure whether the behavior here is proper. We have source partitions and parallelism, but for a query this simple, which comprises only one fragment, the parallelism will equal the number of source partitions. If a user writes some CPU-heavy computation in the Project or Filter executor, it might be better to shuffle the data into more shards with higher parallelism.


BugenZhao commented Mar 14, 2023

According to the Grafana dashboard and @fuyufjh, we observed that the CPU is not fully utilized.

Occasionally I encounter segmentation faults or poor performance on macOS, only for Nexmark Q14. Considering #6205, I'm extremely suspicious that it's caused by the backtrace capture here: when we try to extract the HOUR, we first try extract_date, which returns an error because HOUR is handled in extract_time.

pub fn extract_from_timestamp(time_unit: &str, timestamp: NaiveDateTimeWrapper) -> Result<Decimal> {
    let time = timestamp.0;
    let mut res = extract_date(time, time_unit);
    if res.is_err() {
        res = extract_time(time, time_unit);
    }
    res
}
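
For illustration, a hedged sketch of how the fallback could avoid constructing an error (and thus capturing a backtrace) on the expected path: have the field lookups return Option and only build a real error when both miss. The names and signatures below are simplified stand-ins, not the actual RisingWave definitions.

use chrono::{Datelike, NaiveDateTime, Timelike};

// Hypothetical: a non-date field yields a cheap None; no error value
// (and no backtrace) is allocated on this path.
fn extract_date(ts: NaiveDateTime, unit: &str) -> Option<i64> {
    match unit {
        "YEAR" => Some(ts.year() as i64),
        "MONTH" => Some(ts.month() as i64),
        "DAY" => Some(ts.day() as i64),
        _ => None,
    }
}

fn extract_time(ts: NaiveDateTime, unit: &str) -> Option<i64> {
    match unit {
        "HOUR" => Some(ts.hour() as i64),
        "MINUTE" => Some(ts.minute() as i64),
        "SECOND" => Some(ts.second() as i64),
        _ => None,
    }
}

// Only construct an error when *both* lookups miss, so EXTRACT('HOUR', ...)
// never pays for error construction on the per-row hot path.
fn extract_from_timestamp(unit: &str, ts: NaiveDateTime) -> Result<i64, String> {
    extract_date(ts, unit)
        .or_else(|| extract_time(ts, unit))
        .ok_or_else(|| format!("unsupported field: {unit}"))
}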


fuyufjh commented Mar 15, 2023

The CPU utilization looks good after #8538. Let's wait for the next performance report.

(parallelism = 4)

(Screenshot: CPU utilization chart.)


lmatz commented Mar 15, 2023

Running Q14 right now, as last night's performance test failed:
https://buildkite.com/risingwave-test/nexmark-benchmark/builds/579#0186e367-4520-4a99-8f22-e625892844e6


lmatz commented Mar 15, 2023

(Screenshot: Nexmark Q14 throughput result.)

OK, now it's 893.8K 😆

But given that

    I guess this might be caused by an issue similar to #5130: we need to evaluate a lot of 'HOUR' constants when calling EXTRACT.

shall we still keep this open?


BugenZhao commented Mar 15, 2023

    shall we still keep this open?

LGTM. I guess there won't be a significant difference (compared to the "backtrace" one 😄), but it is indeed a possible optimization.

@lmatz lmatz added good first issue Good for newcomers help wanted Issues that need help from contributors labels Mar 15, 2023
@fuyufjh fuyufjh modified the milestones: release-0.18, release-0.19 Mar 21, 2023

lmatz commented May 12, 2023

Since #9052 tracks the only remaining issue, we are closing this one now.

@lmatz lmatz closed this as completed May 12, 2023