Skip to content

Commit

Permalink
Merge branch 'main' into kwannoel/batch
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 10, 2023
2 parents 1652253 + 03bef46 commit 9636785
Show file tree
Hide file tree
Showing 35 changed files with 1,108 additions and 246 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 27 additions & 37 deletions e2e_test/batch/tpch/q15.slt.part
Original file line number Diff line number Diff line change
@@ -1,44 +1,34 @@
query ITTTR
with revenue0 (supplier_no, total_revenue) as (
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount))
from
lineitem
where
l_shipdate >= date '1993-01-01'
and l_shipdate < date '1993-01-01' + interval '3' month
group by
l_suppkey
)
select
s_suppkey,
s_name,
s_address,
s_phone,
total_revenue
s_suppkey,
s_name,
s_address,
s_phone,
total_revenue
from
supplier,
(
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount))
from
lineitem
where
l_shipdate >= date '1993-01-01'
and l_shipdate < date '1993-01-01' + interval '3' month
group by
l_suppkey
) as revenue0 (supplier_no, total_revenue)
supplier,
revenue0
where
s_suppkey = supplier_no
and total_revenue = (
select
max(total_revenue)
from
(
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount))
from
lineitem
where
l_shipdate >= date '1993-01-01'
and l_shipdate < date '1993-01-01' + interval '3' month
group by
l_suppkey
) as revenue0 (supplier_no, total_revenue)
)
s_suppkey = supplier_no
and total_revenue = (
select
max(total_revenue)
from
revenue0
)
order by
s_suppkey;
s_suppkey;
----
4 Supplier#000000004 Bk7ah4CK8SYQTepEmvMkkgMwg 25-843-787-7479 901304.1506
42 changes: 16 additions & 26 deletions e2e_test/streaming/tpch/views/q15.slt.part
Original file line number Diff line number Diff line change
@@ -1,43 +1,33 @@
statement ok
create materialized view tpch_q15 as
select
s_suppkey,
s_name,
s_address,
s_phone,
total_revenue
from
supplier,
(
select
with revenue0 (supplier_no, total_revenue) as (
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount)) as total_revenue
sum(l_extendedprice * (1 - l_discount))
from
lineitem
where
l_shipdate >= date '1993-01-01'
and l_shipdate < date '1993-01-01' + interval '3' month
group by
l_suppkey
) as revenue0 (supplier_no, total_revenue)
)
select
s_suppkey,
s_name,
s_address,
s_phone,
total_revenue
from
supplier,
revenue0
where
s_suppkey = supplier_no
and total_revenue = (
select
max(total_revenue) as max_revenue
max(total_revenue)
from
(
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount)) as total_revenue
from
lineitem
where
l_shipdate >= date '1993-01-01'
and l_shipdate < date '1993-01-01' + interval '3' month
group by
l_suppkey
) as revenue0 (supplier_no, total_revenue)
revenue0
)
order by
s_suppkey;
s_suppkey;
16 changes: 15 additions & 1 deletion grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ def section_streaming_actors(outer_panels):
],
),
panels.timeseries_actor_ops(
"Aggregation Executor Cache",
"Aggregation Executor Cache Statistics For Each Key/State",
"",
[
panels.target(
Expand All @@ -1349,6 +1349,20 @@ def section_streaming_actors(outer_panels):
),
],
),
panels.timeseries_actor_ops(
"Aggregation Executor Cache Statistics For Each StreamChunk",
"",
[
panels.target(
f"rate({metric('stream_agg_chunk_lookup_miss_count')}[$__rate_interval])",
"chunk-level cache miss {{actor_id}}",
),
panels.target(
f"rate({metric('stream_agg_chunk_lookup_total_count')}[$__rate_interval])",
"chunk-level total lookups {{actor_id}}",
),
],
),
panels.timeseries_count(
"Aggregation Cached Keys",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ message SinkNode {

message ProjectNode {
repeated expr.ExprNode select_list = 1;
// this two field is expressing a list of usize pair, which means when project receives a
// watermark with `watermark_input_key[i]` column index, it should derive a new watermark
// with `watermark_output_key[i]`th expression
repeated uint32 watermark_input_key = 2;
repeated uint32 watermark_output_key = 3;
}

message FilterNode {
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
Box::new(LiteralExpression::new(outer_type.clone(), datum.clone())),
)?;

cast_expr.eval_row(OwnedRow::empty())?
cast_expr.eval_row(&OwnedRow::empty())?
};

scan_range.eq_conds.push(datum);
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
Box::new(LiteralExpression::new(outer_type.clone(), datum.clone())),
)?;

cast_expr.eval_row(OwnedRow::empty())?
cast_expr.eval_row(&OwnedRow::empty())?
};

scan_range.eq_conds.push(datum);
Expand Down
6 changes: 3 additions & 3 deletions src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ fn get_services(profile: &str) -> (Vec<RisingWaveService>, bool) {
],
"playground-3cn" => vec![
RisingWaveService::Meta(osstrs([])),
RisingWaveService::Compute(osstrs(["--host", "localhost:5687"])),
RisingWaveService::Compute(osstrs(["--host", "localhost:5688"])),
RisingWaveService::Compute(osstrs(["--host", "localhost:5689"])),
RisingWaveService::Compute(osstrs(["--host", "127.0.0.1:5687"])),
RisingWaveService::Compute(osstrs(["--host", "127.0.0.1:5688"])),
RisingWaveService::Compute(osstrs(["--host", "127.0.0.1:5689"])),
RisingWaveService::Frontend(osstrs([])),
],
"online-docker-playground" | "docker-playground" => {
Expand Down
22 changes: 11 additions & 11 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ impl Field {
}
}

impl From<&ColumnDesc> for Field {
fn from(desc: &ColumnDesc) -> Self {
Self {
data_type: desc.data_type.clone(),
name: desc.name.clone(),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}

impl From<ColumnDesc> for Field {
fn from(column_desc: ColumnDesc) -> Self {
Self {
Expand Down Expand Up @@ -208,17 +219,6 @@ impl From<&ProstField> for Field {
}
}

impl From<&ColumnDesc> for Field {
fn from(desc: &ColumnDesc) -> Self {
Self {
data_type: desc.data_type.clone(),
name: desc.name.clone(),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}

impl Index<usize> for Schema {
type Output = Field;

Expand Down
Loading

0 comments on commit 9636785

Please sign in to comment.