Skip to content

Commit

Permalink
Merge branch 'idx0dev/sourceng' of https://github.com/risingwavelabs/…
Browse files Browse the repository at this point in the history
…risingwave into idx0dev/sourceng
  • Loading branch information
algosday committed Jun 15, 2023
2 parents b53a3fc + 9ca0139 commit 1b49c15
Show file tree
Hide file tree
Showing 32 changed files with 954 additions and 250 deletions.
43 changes: 7 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ if [ $# -gt 0 ]; then
ARGS=("$@")
echo "Applying clippy --fix for $@ (including dirty and staged files)"
cargo clippy ${ARGS[@]/#/--package risingwave_} ${RISINGWAVE_FEATURE_FLAGS} --fix --allow-dirty --allow-staged
cargo clippy ${ARGS[@]/#/--package risingwave_} --fix --allow-dirty --allow-staged
else
echo "Applying clippy --fix for all targets to all files (including dirty and staged files)"
echo "Tip: run $(tput setaf 4)./risedev cf {package_names}$(tput sgr0) to only check-fix those packages (e.g. frontend, meta)."
Expand Down
33 changes: 33 additions & 0 deletions e2e_test/batch/aggregate/ordered_set_agg.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
statement error
select p, percentile_cont(p) within group (order by x::float8)
from generate_series(1,5) x,
(values (0::float8),(0.1),(0.25),(0.4),(0.5),(0.6),(0.75),(0.9),(1)) v(p)
group by p order by p;

statement error
select percentile_cont(array[0,1,0.25,0.75,0.5,1,0.3,0.32,0.35,0.38,0.4]) within group (order by x)
from generate_series(1,6) x;

statement error
select percentile_disc(array[0.25,0.5,0.75]) within group (order by x)
from unnest('{fred,jim,fred,jack,jill,fred,jill,jim,jim,sheila,jim,sheila}'::text[]) u(x);

statement error
select pg_collation_for(percentile_disc(1) within group (order by x collate "POSIX"))
from (values ('fred'),('jim')) v(x);

query RR
select
percentile_cont(0.5) within group (order by a),
percentile_disc(0.5) within group (order by a)
from (values(1::float8),(3),(5),(7)) t(a);
----
4 3

query RR
select
percentile_cont(0.25) within group (order by a),
percentile_disc(0.5) within group (order by a)
from (values(1::float8),(3),(5),(7)) t(a);
----
2.5 3
14 changes: 14 additions & 0 deletions e2e_test/batch/functions/func_in_from.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
query I
select abs.abs from abs(-1);
----
1

query I
select alias.alias from abs(-1) alias;
----
1

query I
select alias.col from abs(-1) alias(col);
----
1
18 changes: 18 additions & 0 deletions e2e_test/streaming/values.slt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,21 @@ drop materialized view mv;

statement ok
drop table t;

statement ok
create materialized view mv as select * from abs(-1);

# TODO: support this
statement error not yet implemented: LogicalTableFunction::logical_rewrite_for_stream
create materialized view mv2 as select * from range(1,2);

statement ok
flush;

query IR
select * from mv;
----
1

statement ok
drop materialized view mv;
2 changes: 1 addition & 1 deletion src/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ hex = "0.4.3"
itertools = "0.10"
md5 = "0.7.0"
num-traits = "0.2"
ouroboros = "0.15"
parse-display = "0.6"
paste = "1"
regex = "1"
Expand All @@ -43,6 +42,7 @@ risingwave_expr_macro = { path = "macro" }
risingwave_pb = { path = "../prost" }
risingwave_udf = { path = "../udf" }
rust_decimal = { version = "1", features = ["db-postgres", "maths"] }
self_cell = "1.0.0"
serde_json = "1"
sha1 = "0.10.5"
sha2 = "0.10.6"
Expand Down
3 changes: 3 additions & 0 deletions src/expr/src/agg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ mod array_agg;
mod count_star;
mod general;
mod jsonb_agg;
mod mode;
mod percentile_cont;
mod percentile_disc;
mod string_agg;

// wrappers
Expand Down
126 changes: 126 additions & 0 deletions src/expr/src/agg/mode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::*;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::types::*;
use risingwave_expr_macro::build_aggregate;

use super::Aggregator;
use crate::agg::AggCall;
use crate::Result;

#[build_aggregate("mode(*) -> *")]
fn build(agg: AggCall) -> Result<Box<dyn Aggregator>> {
Ok(Box::new(Mode::new(agg.return_type)))
}

/// Computes the mode, the most frequent value of the aggregated argument (arbitrarily choosing the
/// first one if there are multiple equally-frequent values). The aggregated argument must be of a
/// sortable type.
///
/// ```slt
/// query I
/// select mode() within group (order by unnest) from unnest(array[1]);
/// ----
/// 1
///
/// query I
/// select mode() within group (order by unnest) from unnest(array[1,2,2,3,3,4,4,4]);
/// ----
/// 4
///
/// query R
/// select mode() within group (order by unnest) from unnest(array[0.1,0.2,0.2,0.4,0.4,0.3,0.3,0.4]);
/// ----
/// 0.4
///
/// query R
/// select mode() within group (order by unnest) from unnest(array[1,2,2,3,3,4,4,4,3]);
/// ----
/// 3
///
/// query T
/// select mode() within group (order by unnest) from unnest(array['1','2','2','3','3','4','4','4','3']);
/// ----
/// 3
///
/// query I
/// select mode() within group (order by unnest) from unnest(array[]::int[]);
/// ----
/// NULL
/// ```
#[derive(Clone, EstimateSize)]
pub struct Mode {
return_type: DataType,
cur_mode: Datum,
cur_mode_freq: usize,
cur_item: Datum,
cur_item_freq: usize,
}

impl Mode {
pub fn new(return_type: DataType) -> Self {
Self {
return_type,
cur_mode: None,
cur_mode_freq: 0,
cur_item: None,
cur_item_freq: 0,
}
}

fn add_datum(&mut self, datum_ref: DatumRef<'_>) {
let datum = datum_ref.to_owned_datum();
if datum.is_some() && self.cur_item == datum {
self.cur_item_freq += 1;
} else if datum.is_some() {
self.cur_item = datum;
self.cur_item_freq = 1;
}
if self.cur_item_freq > self.cur_mode_freq {
self.cur_mode = self.cur_item.clone();
self.cur_mode_freq = self.cur_item_freq;
}
}
}

#[async_trait::async_trait]
impl Aggregator for Mode {
fn return_type(&self) -> DataType {
self.return_type.clone()
}

async fn update_multi(
&mut self,
input: &DataChunk,
start_row_id: usize,
end_row_id: usize,
) -> Result<()> {
let array = input.column_at(0);
for row_id in start_row_id..end_row_id {
self.add_datum(array.value_at(row_id));
}
Ok(())
}

fn output(&mut self, builder: &mut ArrayBuilderImpl) -> Result<()> {
builder.append(self.cur_mode.clone());
Ok(())
}

fn estimated_size(&self) -> usize {
EstimateSize::estimated_size(self)
}
}
Loading

0 comments on commit 1b49c15

Please sign in to comment.