diff --git a/Cargo.lock b/Cargo.lock
index 7183c990bfce9..941d455510758 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4772,9 +4772,9 @@ dependencies = [
[[package]]
name = "foyer"
-version = "0.10.1"
+version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b5f358e9b9492a9e9af905934ef4736a97a001fa19232f4a29f42974cf7a24c"
+checksum = "ca1a3a9bf97a9d592d69c2210c5524668788f72ef1b7ea3054cfc1f8a3fb1257"
dependencies = [
"ahash 0.8.11",
"anyhow",
@@ -4845,9 +4845,9 @@ dependencies = [
[[package]]
name = "foyer-storage"
-version = "0.9.1"
+version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79c20580e456ea337e4bdefa9fba7ff19b42cd87432bda3ef3579ef9e1057c7e"
+checksum = "19f533cf09e39963cc3886f9f165d235c7d4f937f4b09b48c09827ac158876f4"
dependencies = [
"ahash 0.8.11",
"allocator-api2",
diff --git a/Cargo.toml b/Cargo.toml
index 5bfab4feb27fb..9332a55f90415 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"
[workspace.dependencies]
-foyer = { version = "0.10.1", features = ["nightly", "mtrace"] }
+foyer = { version = "0.10.4", features = ["nightly", "mtrace"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
diff --git a/README.md b/README.md
index bf50ae208a972..07d12e99223ef 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,7 @@
RisingWave is a Postgres-compatible SQL engine engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data.
-![RisingWave](https://github.com/risingwavelabs/risingwave/assets/41638002/10c44404-f78b-43ce-bbd9-3646690acc59)
+![RisingWave](./docs/dev/src/images/architecture_20240814.png)
## When to use RisingWave?
RisingWave can ingest millions of events per second, continuously join live data streams with historical tables, and serve ad-hoc queries in real-time. Typical use cases include, but are not limited to:
diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh
index c0f8e9f387d61..29f6c8fe2133d 100755
--- a/ci/scripts/run-e2e-test.sh
+++ b/ci/scripts/run-e2e-test.sh
@@ -90,9 +90,7 @@ echo "--- e2e, $mode, batch"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cluster_start
sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}" --label "can-use-recover"
-if [[ "$mode" != "single-node" ]]; then
- sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}"
-fi
+sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}"
if [[ $mode != "single-node" ]]; then
sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}"
diff --git a/docs/dev/src/images/architecture_20240814.png b/docs/dev/src/images/architecture_20240814.png
new file mode 100644
index 0000000000000..9d90e7bd86555
Binary files /dev/null and b/docs/dev/src/images/architecture_20240814.png differ
diff --git a/e2e_test/sink/license.slt b/e2e_test/sink/license.slt
new file mode 100644
index 0000000000000..852d7c0fe7bfc
--- /dev/null
+++ b/e2e_test/sink/license.slt
@@ -0,0 +1,195 @@
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+ALTER SYSTEM SET license_key TO '';
+
+statement ok
+CREATE TABLE t (k INT);
+
+statement error
+CREATE SINK dynamodb_sink
+FROM
+ t
+WITH
+(
+ connector = 'dynamodb',
+ table = 'xx',
+ primary_key = 'k',
+ region = 'xx',
+ access_key = 'xx',
+ secret_key = 'xx'
+);
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+ 1: gRPC request to meta service failed: Internal error
+ 2: failed to validate sink
+ 3: Internal error
+ 4: feature DynamoDbSink is only available for tier Paid and above, while the current tier is Free
+
+Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.
+
+
+statement error
+CREATE SINK snowflake_sink
+FROM t
+WITH (
+ connector = 'snowflake',
+ type = 'append-only',
+ force_append_only = 'true',
+ s3.bucket_name = 'xx',
+ s3.credentials.access = 'xx',
+ s3.credentials.secret = 'xx',
+ s3.region_name = 'xx',
+ s3.path = 'xx',
+);
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+ 1: gRPC request to meta service failed: Internal error
+ 2: failed to validate sink
+ 3: Internal error
+ 4: feature SnowflakeSink is only available for tier Paid and above, while the current tier is Free
+
+Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.
+
+
+statement error
+CREATE SINK opensearch_sink
+FROM t
+WITH (
+ connector = 'opensearch',
+ url = 'xx',
+ username = 'xx',
+ password = 'xx',
+);
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+ 1: gRPC request to meta service failed: Internal error
+ 2: failed to validate sink
+ 3: feature OpenSearchSink is only available for tier Paid and above, while the current tier is Free
+
+Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.
+
+
+statement error
+CREATE SINK bigquery_sink
+FROM
+ t
+WITH
+(
+ connector = 'bigquery',
+ type = 'append-only',
+ force_append_only='true',
+ bigquery.local.path= 'xx',
+ bigquery.project= 'xx',
+ bigquery.dataset= 'xx',
+ bigquery.table= 'xx'
+);
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+ 1: gRPC request to meta service failed: Internal error
+ 2: failed to validate sink
+ 3: Internal error
+ 4: feature BigQuerySink is only available for tier Paid and above, while the current tier is Free
+
+Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.
+
+
+statement ok
+ALTER SYSTEM SET license_key TO DEFAULT;
+
+statement ok
+flush;
+
+statement error
+CREATE SINK dynamodb_sink
+FROM
+ t
+WITH
+(
+ connector = 'dynamodb',
+ table = 'xx',
+ primary_key = 'xx',
+ region = 'xx',
+ access_key = 'xx',
+ secret_key = 'xx'
+);
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+ 1: Sink error
+ 2: Sink primary key column not found: xx. Please use ',' as the delimiter for different primary key columns.
+
+
+statement ok
+CREATE SINK snowflake_sink
+FROM t
+WITH (
+ connector = 'snowflake',
+ type = 'append-only',
+ force_append_only = 'true',
+ s3.bucket_name = 'xx',
+ s3.credentials.access = 'xx',
+ s3.credentials.secret = 'xx',
+ s3.region_name = 'xx',
+ s3.path = 'xx',
+);
+
+
+statement error
+CREATE SINK opensearch_sink
+FROM t
+WITH (
+ connector = 'opensearch',
+ url = 'xx',
+ username = 'xx',
+ password = 'xx',
+ index = 'xx',
+);
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+ 1: gRPC request to meta service failed: Internal error
+ 2: failed to validate sink
+ 3: sink cannot pass validation: INTERNAL: Connection is closed
+
+
+statement error
+CREATE SINK bigquery_sink
+FROM
+ t
+WITH
+(
+ connector = 'bigquery',
+ type = 'append-only',
+ force_append_only='true',
+ bigquery.local.path= 'xx',
+ bigquery.project= 'xx',
+ bigquery.dataset= 'xx',
+ bigquery.table= 'xx'
+);
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+ 1: gRPC request to meta service failed: Internal error
+ 2: failed to validate sink
+ 3: BigQuery error
+ 4: No such file or directory (os error 2)
+
+
+statement ok
+DROP SINK snowflake_sink;
+
+statement ok
+DROP TABLE t;
\ No newline at end of file
diff --git a/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateful_agg.slt b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateful_agg.slt
new file mode 100644
index 0000000000000..012b1ffffb762
--- /dev/null
+++ b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateful_agg.slt
@@ -0,0 +1,80 @@
+# Single phase approx percentile
+statement ok
+create table t(p_col double, grp_col int);
+
+statement ok
+insert into t select a, 1 from generate_series(-1000, 1000) t(a);
+
+statement ok
+flush;
+
+query I
+select
+ percentile_cont(0.01) within group (order by p_col) as p01,
+ min(p_col),
+ percentile_cont(0.5) within group (order by p_col) as p50,
+ count(*),
+ percentile_cont(0.99) within group (order by p_col) as p99
+from t;
+----
+-980 -1000 0 2001 980
+
+statement ok
+create materialized view m1 as
+ select
+ approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
+ min(p_col),
+ approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
+ count(*),
+ approx_percentile(0.99, 0.01) within group (order by p_col) as p99
+ from t;
+
+query I
+select * from m1;
+----
+-982.5779489474152 -1000 0 2001 982.5779489474152
+
+# Test state encode / decode
+onlyif can-use-recover
+statement ok
+recover;
+
+onlyif can-use-recover
+sleep 10s
+
+query I
+select * from m1;
+----
+-982.5779489474152 -1000 0 2001 982.5779489474152
+
+# Test 0,
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
+ /// Recover mode.
+ ///
+ /// Options:
+ ///
+ /// - "None": Do not recover disk cache.
+ /// - "Quiet": Recover disk cache and skip errors.
+ /// - "Strict": Recover disk cache and panic on errors.
+ ///
+ /// For more details, see [`RecoverMode::None`], [`RecoverMode::Quiet`], and [`RecoverMode::Strict`].
+ #[serde(default = "default::file_cache::recover_mode")]
+ pub recover_mode: RecoverMode,
+
#[serde(default, flatten)]
#[config_doc(omitted)]
pub unrecognized: Unrecognized,
@@ -1673,6 +1685,8 @@ pub mod default {
}
pub mod file_cache {
+ use foyer::RecoverMode;
+
pub fn dir() -> String {
"".to_string()
}
@@ -1712,6 +1726,10 @@ pub mod default {
pub fn flush_buffer_threshold_mb() -> Option<usize> {
None
}
+
+ pub fn recover_mode() -> RecoverMode {
+ RecoverMode::None
+ }
}
pub mod cache_refill {
@@ -1842,11 +1860,11 @@ pub mod default {
}
pub fn memory_controller_threshold_graceful() -> f64 {
- 0.8
+ 0.81
}
pub fn memory_controller_threshold_stable() -> f64 {
- 0.7
+ 0.72
}
pub fn memory_controller_eviction_factor_aggressive() -> f64 {
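A minimal sketch of how the new `recover_mode` knob behaves when the config is deserialized, assuming only what the hunks above show: `foyer::RecoverMode` exposes `None`/`Quiet`/`Strict` and the field uses a serde default pointing at `default::file_cache::recover_mode`. The struct name, helper function, and use of the `toml` crate below are illustrative stand-ins, not code from the patch.

```rust
use foyer::RecoverMode;
use serde::Deserialize;

/// Stand-in for the real file-cache config section; only the new field is shown.
#[derive(Deserialize)]
struct FileCacheConfigSketch {
    /// "None" (default), "Quiet", or "Strict", as documented above.
    #[serde(default = "default_recover_mode")]
    recover_mode: RecoverMode,
}

fn default_recover_mode() -> RecoverMode {
    RecoverMode::None
}

fn main() {
    // An omitted key falls back to the default, matching `recover_mode = "None"`
    // in the updated `example.toml`.
    let cfg: FileCacheConfigSketch = toml::from_str("").unwrap();
    assert!(matches!(cfg.recover_mode, RecoverMode::None));

    // Assuming `RecoverMode` derives a plain serde `Deserialize`, the other
    // documented variants are selected by name.
    let cfg: FileCacheConfigSketch = toml::from_str(r#"recover_mode = "Quiet""#).unwrap();
    assert!(matches!(cfg.recover_mode, RecoverMode::Quiet));
}
```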
diff --git a/src/config/example.toml b/src/config/example.toml
index 866a56017982e..73b49440e7ec0 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -113,8 +113,8 @@ stream_exchange_concurrent_dispatchers = 0
stream_dml_channel_initial_permits = 32768
stream_hash_agg_max_dirty_groups_heap_size = 67108864
stream_memory_controller_threshold_aggressive = 0.9
-stream_memory_controller_threshold_graceful = 0.8
-stream_memory_controller_threshold_stable = 0.7
+stream_memory_controller_threshold_graceful = 0.81
+stream_memory_controller_threshold_stable = 0.72
stream_memory_controller_eviction_factor_aggressive = 2.0
stream_memory_controller_eviction_factor_graceful = 1.5
stream_memory_controller_eviction_factor_stable = 1.0
@@ -171,6 +171,7 @@ recover_concurrency = 8
insert_rate_limit_mb = 0
indexer_shards = 64
compression = "none"
+recover_mode = "None"
[storage.meta_file_cache]
dir = ""
@@ -182,6 +183,7 @@ recover_concurrency = 8
insert_rate_limit_mb = 0
indexer_shards = 64
compression = "none"
+recover_mode = "None"
[storage.cache_refill]
data_refill_levels = []
diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs
index ebd18fed2063b..22146e86d0d1d 100644
--- a/src/connector/src/sink/big_query.rs
+++ b/src/connector/src/sink/big_query.rs
@@ -358,6 +358,9 @@ impl Sink for BigQuerySink {
}
async fn validate(&self) -> Result<()> {
+ risingwave_common::license::Feature::BigQuerySink
+ .check_available()
+ .map_err(|e| anyhow::anyhow!(e))?;
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
"Primary key not defined for upsert bigquery sink (please define in `primary_key` field)")));
diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs
index 2df15f517ca0b..6d73bf2d478c8 100644
--- a/src/connector/src/sink/dynamodb.rs
+++ b/src/connector/src/sink/dynamodb.rs
@@ -88,6 +88,9 @@ impl Sink for DynamoDbSink {
const SINK_NAME: &'static str = DYNAMO_DB_SINK;
async fn validate(&self) -> Result<()> {
+ risingwave_common::license::Feature::DynamoDbSink
+ .check_available()
+ .map_err(|e| anyhow::anyhow!(e))?;
let client = (self.config.build_client().await)
.context("validate DynamoDB sink error")
.map_err(SinkError::DynamoDb)?;
diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs
index 31dde5c52509e..5e45c2b8c74aa 100644
--- a/src/connector/src/sink/elasticsearch.rs
+++ b/src/connector/src/sink/elasticsearch.rs
@@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText};
use serde_json::Value;
use super::encoder::{JsonEncoder, RowEncoder};
-use super::remote::{ElasticSearchSink, OpensearchSink};
+use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
@@ -172,5 +172,5 @@ impl EsStreamChunkConverter {
}
pub fn is_es_sink(sink_name: &str) -> bool {
- sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
+ sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpenSearchSink::SINK_NAME
}
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index cad35a278edb0..3391520ed0c23 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -94,7 +94,7 @@ macro_rules! for_all_sinks {
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
- { Opensearch, $crate::sink::remote::OpensearchSink },
+ { Opensearch, $crate::sink::remote::OpenSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index eb62b7fbc8cd8..606965a8424d7 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -73,7 +73,7 @@ macro_rules! def_remote_sink {
() => {
def_remote_sink! {
{ ElasticSearch, ElasticSearchSink, "elasticsearch" }
- { Opensearch, OpensearchSink, "opensearch"}
+ { Opensearch, OpenSearchSink, "opensearch"}
{ Cassandra, CassandraSink, "cassandra" }
{ Jdbc, JdbcSink, "jdbc", |desc| {
desc.sink_type.is_append_only()
@@ -165,6 +165,11 @@ impl Sink for RemoteSink {
}
async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
+ if sink_name == OpenSearchSink::SINK_NAME {
+ risingwave_common::license::Feature::OpenSearchSink
+ .check_available()
+ .map_err(|e| anyhow::anyhow!(e))?;
+ }
if is_es_sink(sink_name)
&& param.downstream_pk.len() > 1
&& !param.properties.contains_key(ES_OPTION_DELIMITER)
diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index 6c3cc291f58e2..d87072a2502e9 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -118,6 +118,9 @@ impl Sink for SnowflakeSink {
}
async fn validate(&self) -> Result<()> {
+ risingwave_common::license::Feature::SnowflakeSink
+ .check_available()
+ .map_err(|e| anyhow::anyhow!(e))?;
if !self.is_append_only {
return Err(SinkError::Config(
anyhow!("SnowflakeSink only supports append-only mode at present, please change the query to append-only, or use `force_append_only = 'true'`")
diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml
index 25f62054f1e66..f00c9f2b4065a 100644
--- a/src/frontend/planner_test/tests/testdata/input/agg.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml
@@ -1053,6 +1053,13 @@
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.2, 0.01) WITHIN GROUP (order by v1 desc) from t;
+ expected_outputs:
+ - logical_plan
+ - stream_plan
+- name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs
+ sql: |
+ CREATE TABLE t (v1 int, v2 int);
+ SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), max(v2) as m2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
\ No newline at end of file
diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml
index b6c692fd47364..f6d1af67b331e 100644
--- a/src/frontend/planner_test/tests/testdata/output/agg.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -2040,3 +2040,33 @@
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+- name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs
+ sql: |
+ CREATE TABLE t (v1 int, v2 int);
+ SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), max(v2) as m2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
+ logical_plan: |-
+ LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] }
+ └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] }
+ └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
+ └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
+ stream_plan: |-
+ StreamMaterialize { columns: [s1, x, count, m2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
+ └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] }
+ ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
+ │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+ │ │ └─StreamExchange { dist: Single }
+ │ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+ │ │ └─StreamShare { id: 2 }
+ │ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+ │ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+ │ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+ │ └─StreamExchange { dist: Single }
+ │ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
+ │ └─StreamShare { id: 2 }
+ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] }
+ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
+ └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), max(max(t.v2)), count] }
+ └─StreamExchange { dist: Single }
+ └─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] }
+ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] }
+ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs
index 17738f4dbd5be..898a98c57dea6 100644
--- a/src/frontend/src/binder/expr/function/mod.rs
+++ b/src/frontend/src/binder/expr/function/mod.rs
@@ -62,6 +62,10 @@ const SQL_UDF_MAX_CALLING_DEPTH: u32 = 16;
impl Binder {
pub(in crate::binder) fn bind_function(&mut self, f: Function) -> Result<ExprImpl> {
+ if f.arg_list.ignore_nulls {
+ bail_not_implemented!("IGNORE NULLS is not supported yet");
+ }
+
let function_name = match f.name.0.as_slice() {
[name] => name.real_value(),
[schema, name] => {
diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs
index 1b745b9a67c5f..8e96a5d177a12 100644
--- a/src/frontend/src/binder/mod.rs
+++ b/src/frontend/src/binder/mod.rs
@@ -814,6 +814,7 @@ mod tests {
],
variadic: false,
order_by: [],
+ ignore_nulls: false,
},
over: None,
filter: None,
diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs
index 987a0ae204869..e63b7d760a68f 100644
--- a/src/frontend/src/optimizer/plan_node/logical_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs
@@ -76,38 +76,17 @@ impl LogicalAgg {
let mut core = self.core.clone();
// ====== Handle approx percentile aggs
- let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg();
-
- let AggInfo {
- calls: non_approx_percentile_agg_calls,
- col_mapping: non_approx_percentile_col_mapping,
- } = normal;
- let AggInfo {
- calls: approx_percentile_agg_calls,
- col_mapping: approx_percentile_col_mapping,
- } = approx;
-
- let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
- && !approx_percentile_agg_calls.is_empty())
- || approx_percentile_agg_calls.len() >= 2;
- core.input = if needs_row_merge {
- // If there's row merge, we need to share the input.
- StreamShare::new_from_input(stream_input.clone()).into()
- } else {
- stream_input
- };
- core.agg_calls = non_approx_percentile_agg_calls;
+ let (non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) =
+ self.prepare_approx_percentile(&mut core, stream_input.clone())?;
- let approx_percentile =
- self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;
-
- // ====== Handle normal aggs
if core.agg_calls.is_empty() {
if let Some(approx_percentile) = approx_percentile {
return Ok(approx_percentile);
};
bail!("expected at least one agg call");
}
+
+ // ====== Handle normal aggs
let total_agg_calls = core
.agg_calls
.iter()
@@ -123,21 +102,12 @@ impl LogicalAgg {
new_stream_simple_agg(Agg::new(total_agg_calls, IndexSet::empty(), exchange));
// ====== Merge approx percentile and normal aggs
- if let Some(approx_percentile) = approx_percentile {
- if needs_row_merge {
- let row_merge = StreamRowMerge::new(
- approx_percentile,
- global_agg.into(),
- approx_percentile_col_mapping,
- non_approx_percentile_col_mapping,
- )?;
- Ok(row_merge.into())
- } else {
- Ok(approx_percentile)
- }
- } else {
- Ok(global_agg.into())
- }
+ Self::add_row_merge_if_needed(
+ approx_percentile,
+ global_agg.into(),
+ approx_percentile_col_mapping,
+ non_approx_percentile_col_mapping,
+ )
}
/// Generate plan for stateless/stateful 2-phase streaming agg.
@@ -148,10 +118,21 @@ impl LogicalAgg {
stream_input: PlanRef,
dist_key: &[usize],
) -> Result<PlanRef> {
- let input_col_num = stream_input.schema().len();
+ let mut core = self.core.clone();
+
+ let (non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) =
+ self.prepare_approx_percentile(&mut core, stream_input.clone())?;
+
+ if core.agg_calls.is_empty() {
+ if let Some(approx_percentile) = approx_percentile {
+ return Ok(approx_percentile);
+ };
+ bail!("expected at least one agg call");
+ }
// Generate vnode via project
// TODO(kwannoel): We should apply Project optimization rules here.
+ let input_col_num = stream_input.schema().len(); // get schema len before moving `stream_input`.
let project = StreamProject::new(generic::Project::with_vnode_col(stream_input, dist_key));
let vnode_col_idx = project.base.schema().len() - 1;
@@ -160,7 +141,7 @@ impl LogicalAgg {
local_group_key.insert(vnode_col_idx);
let n_local_group_key = local_group_key.len();
let local_agg = new_stream_hash_agg(
- Agg::new(self.agg_calls().to_vec(), local_group_key, project.into()),
+ Agg::new(core.agg_calls.to_vec(), local_group_key, project.into()),
Some(vnode_col_idx),
);
// Global group key excludes vnode.
@@ -173,11 +154,11 @@ impl LogicalAgg {
.expect("some input group key could not be mapped");
// Generate global agg step
- if self.group_key().is_empty() {
+ let global_agg = if self.group_key().is_empty() {
let exchange =
RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?;
let global_agg = new_stream_simple_agg(Agg::new(
- self.agg_calls()
+ core.agg_calls
.iter()
.enumerate()
.map(|(partial_output_idx, agg_call)| {
@@ -187,7 +168,7 @@ impl LogicalAgg {
global_group_key.into_iter().collect(),
exchange,
));
- Ok(global_agg.into())
+ global_agg.into()
} else {
let exchange = RequiredDist::shard_by_key(input_col_num, &global_group_key)
.enforce_if_not_satisfies(local_agg.into(), &Order::any())?;
@@ -195,7 +176,7 @@ impl LogicalAgg {
// we can just follow it.
let global_agg = new_stream_hash_agg(
Agg::new(
- self.agg_calls()
+ core.agg_calls
.iter()
.enumerate()
.map(|(partial_output_idx, agg_call)| {
@@ -208,8 +189,14 @@ impl LogicalAgg {
),
None,
);
- Ok(global_agg.into())
- }
+ global_agg.into()
+ };
+ Self::add_row_merge_if_needed(
+ approx_percentile,
+ global_agg,
+ approx_percentile_col_mapping,
+ non_approx_percentile_col_mapping,
+ )
}
fn gen_single_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
@@ -304,6 +291,71 @@ impl LogicalAgg {
}
}
+ /// Prepares metadata and the `approx_percentile` plan, if one is present.
+ /// It may modify `core.agg_calls` to separate the normal aggs from the approx percentile aggs,
+ /// and `core.input` to share the input via `StreamShare`
+ /// between the approx percentile aggs and the normal aggs.
+ fn prepare_approx_percentile(
+ &self,
+ core: &mut Agg<PlanRef>,
+ stream_input: PlanRef,
+ ) -> Result<(ColIndexMapping, ColIndexMapping, Option<PlanRef>)> {
+ let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg();
+
+ let AggInfo {
+ calls: non_approx_percentile_agg_calls,
+ col_mapping: non_approx_percentile_col_mapping,
+ } = normal;
+ let AggInfo {
+ calls: approx_percentile_agg_calls,
+ col_mapping: approx_percentile_col_mapping,
+ } = approx;
+ if !self.group_key().is_empty() && !approx_percentile_agg_calls.is_empty() {
+ bail_not_implemented!("two-phase approx percentile agg with group key, please use single phase agg for approx_percentile with group key");
+ }
+
+ // Either we have approx percentile aggs and non_approx percentile aggs,
+ // or we have at least 2 approx percentile aggs.
+ let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
+ && !approx_percentile_agg_calls.is_empty())
+ || approx_percentile_agg_calls.len() >= 2;
+ core.input = if needs_row_merge {
+ // If there's row merge, we need to share the input.
+ StreamShare::new_from_input(stream_input.clone()).into()
+ } else {
+ stream_input
+ };
+ core.agg_calls = non_approx_percentile_agg_calls;
+
+ let approx_percentile =
+ self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;
+ Ok((
+ non_approx_percentile_col_mapping,
+ approx_percentile_col_mapping,
+ approx_percentile,
+ ))
+ }
+
+ /// Adds a `StreamRowMerge` combining the approx percentile plan with the global agg, if needed.
+ fn add_row_merge_if_needed(
+ approx_percentile: Option<PlanRef>,
+ global_agg: PlanRef,
+ approx_percentile_col_mapping: ColIndexMapping,
+ non_approx_percentile_col_mapping: ColIndexMapping,
+ ) -> Result<PlanRef> {
+ if let Some(approx_percentile) = approx_percentile {
+ let row_merge = StreamRowMerge::new(
+ approx_percentile,
+ global_agg,
+ approx_percentile_col_mapping,
+ non_approx_percentile_col_mapping,
+ )?;
+ Ok(row_merge.into())
+ } else {
+ Ok(global_agg)
+ }
+ }
+
fn separate_normal_and_special_agg(&self) -> SeparatedAggInfo {
let estimated_len = self.agg_calls().len() - 1;
let mut approx_percentile_agg_calls = Vec::with_capacity(estimated_len);
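The refactor preserves the original row-merge rule; restated here as a small self-contained sketch (the function below is illustrative, not code from the patch):

```rust
/// Sketch of the rule used by `prepare_approx_percentile`: a `StreamRowMerge`
/// (and hence a shared `StreamShare` input) is only needed when approx-percentile
/// outputs must be recombined with something else.
fn needs_row_merge(normal_agg_calls: usize, approx_percentile_agg_calls: usize) -> bool {
    (normal_agg_calls > 0 && approx_percentile_agg_calls > 0)
        || approx_percentile_agg_calls >= 2
}

fn main() {
    // The new planner test: sum/count/max (three normal aggs) plus two
    // approx_percentile calls -> merges are required, matching the nested
    // `StreamRowMerge` in the expected plan above.
    assert!(needs_row_merge(3, 2));
    // A single approx_percentile with no other aggs keeps the plain
    // global-approx-percentile plan.
    assert!(!needs_row_merge(0, 1));
}
```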
diff --git a/src/license/src/feature.rs b/src/license/src/feature.rs
index 00c6d200aa0a3..e434e3709ac26 100644
--- a/src/license/src/feature.rs
+++ b/src/license/src/feature.rs
@@ -46,6 +46,10 @@ macro_rules! for_all_features {
{ TestPaid, Paid, "A dummy feature that's only available on paid tier for testing purposes." },
{ TimeTravel, Paid, "Query historical data within the retention period."},
{ GlueSchemaRegistry, Paid, "Use Schema Registry from AWS Glue rather than Confluent." },
+ { SnowflakeSink, Paid, "Delivering data to Snowflake." },
+ { DynamoDbSink, Paid, "Delivering data to DynamoDB." },
+ { OpenSearchSink, Paid, "Delivering data to OpenSearch." },
+ { BigQuerySink, Paid, "Delivering data to BigQuery." },
{ ClickHouseSharedEngine,Paid, "Delivering data to Shared tree on clickhouse cloud"},
{ SecretManagement, Paid, "Secret management." },
{ CdcTableSchemaMap, Paid, "Automatically map upstream schema to CDC Table."},
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 23c620e99eb72..de105ff5979a2 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -1236,7 +1236,7 @@ impl GlobalBarrierManagerContext {
MetadataManager::V2(mgr) => {
let mviews = mgr
.catalog_controller
- .list_background_creating_mviews()
+ .list_background_creating_mviews(true)
.await
.unwrap();
for mview in mviews {
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index bb45d5ba7f4ce..ccb8656412d3b 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -167,7 +167,7 @@ impl GlobalBarrierManagerContext {
let mgr = self.metadata_manager.as_v2_ref();
let mviews = mgr
.catalog_controller
- .list_background_creating_mviews()
+ .list_background_creating_mviews(false)
.await?;
let mut mview_map = HashMap::new();
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 6303262944b0e..387c5ea6f3c95 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -543,8 +543,16 @@ impl CatalogController {
Ok(())
}
- pub async fn list_background_creating_mviews(&self) -> MetaResult<Vec<table::Model>> {
+ pub async fn list_background_creating_mviews(
+ &self,
+ include_initial: bool,
+ ) -> MetaResult<Vec<table::Model>> {
let inner = self.inner.read().await;
+ let status_cond = if include_initial {
+ streaming_job::Column::JobStatus.is_in([JobStatus::Initial, JobStatus::Creating])
+ } else {
+ streaming_job::Column::JobStatus.eq(JobStatus::Creating)
+ };
let tables = Table::find()
.join(JoinType::LeftJoin, table::Relation::Object1.def())
.join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
@@ -554,7 +562,7 @@ impl CatalogController {
.and(
streaming_job::Column::CreateType
.eq(CreateType::Background)
- .and(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
+ .and(status_cond),
),
)
.all(&inner.db)
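The new `include_initial` flag decides whether background jobs that have not yet started building (status `Initial`) count as "creating": in this patch, `barrier/recovery.rs` and the metadata manager pass `false`, while `barrier/mod.rs` and the DDL controller pass `true`. A self-contained sketch of that filter follows; the enum is a stand-in for the catalog's `JobStatus`:

```rust
/// Stand-in for the catalog's job status; only the relevant states are modeled.
enum JobStatus {
    Initial,
    Creating,
    Created,
}

/// Sketch of the predicate that `status_cond` expresses in SQL above.
fn is_background_creating(status: &JobStatus, include_initial: bool) -> bool {
    match status {
        JobStatus::Creating => true,
        JobStatus::Initial => include_initial,
        JobStatus::Created => false,
    }
}

fn main() {
    // Recovery (`include_initial = false`) skips jobs that never started building.
    assert!(!is_background_creating(&JobStatus::Initial, false));
    // Barrier-info and DDL paths (`include_initial = true`) also count `Initial` jobs.
    assert!(is_background_creating(&JobStatus::Initial, true));
    assert!(is_background_creating(&JobStatus::Creating, false));
    assert!(!is_background_creating(&JobStatus::Created, true));
}
```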
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 413bbe1d247b4..bcf89b5eedb1c 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -368,7 +368,7 @@ impl MetadataManager {
MetadataManager::V2(mgr) => {
let tables = mgr
.catalog_controller
- .list_background_creating_mviews()
+ .list_background_creating_mviews(false)
.await?;
Ok(tables
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index c5a750ee7230a..ea14e95ba9530 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -2312,7 +2312,7 @@ impl DdlController {
MetadataManager::V2(mgr) => {
if mgr
.catalog_controller
- .list_background_creating_mviews()
+ .list_background_creating_mviews(true)
.await?
.is_empty()
{
diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs
index d5a75688a4c7b..b0cd783a637c7 100644
--- a/src/sqlparser/src/ast/mod.rs
+++ b/src/sqlparser/src/ast/mod.rs
@@ -2489,6 +2489,8 @@ pub struct FunctionArgList {
pub variadic: bool,
/// Aggregate function calls may have an `ORDER BY`, e.g. `array_agg(x ORDER BY y)`.
pub order_by: Vec<OrderByExpr>,
+ /// Window function calls may have an `IGNORE NULLS`, e.g. `first_value(x IGNORE NULLS)`.
+ pub ignore_nulls: bool,
}
impl fmt::Display for FunctionArgList {
@@ -2508,6 +2510,9 @@ impl fmt::Display for FunctionArgList {
if !self.order_by.is_empty() {
write!(f, " ORDER BY {}", display_comma_separated(&self.order_by))?;
}
+ if self.ignore_nulls {
+ write!(f, " IGNORE NULLS")?;
+ }
write!(f, ")")?;
Ok(())
}
@@ -2520,6 +2525,7 @@ impl FunctionArgList {
args: vec![],
variadic: false,
order_by: vec![],
+ ignore_nulls: false,
}
}
@@ -2529,6 +2535,7 @@ impl FunctionArgList {
args,
variadic: false,
order_by: vec![],
+ ignore_nulls: false,
}
}
@@ -2538,6 +2545,7 @@ impl FunctionArgList {
args,
variadic: false,
order_by,
+ ignore_nulls: false,
}
}
}
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 3a0e1508f287e..b0473f280bf25 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -4670,6 +4670,9 @@ impl Parser<'_> {
if !arg_list.order_by.is_empty() {
parser_err!("ORDER BY is not supported in table-valued function calls");
}
+ if arg_list.ignore_nulls {
+ parser_err!("IGNORE NULLS is not supported in table-valued function calls");
+ }
let args = arg_list.args;
let with_ordinality = self.parse_keywords(&[Keyword::WITH, Keyword::ORDINALITY]);
@@ -4980,11 +4983,14 @@ impl Parser<'_> {
vec![]
};
+ let ignore_nulls = self.parse_keywords(&[Keyword::IGNORE, Keyword::NULLS]);
+
let arg_list = FunctionArgList {
distinct,
args,
variadic,
order_by,
+ ignore_nulls,
};
self.expect_token(&Token::RParen)?;
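A hedged round-trip sketch of the new syntax handling: the parser records `IGNORE NULLS` in the argument list (and rejects it for table-valued functions), while the binder change earlier in this diff rejects it at bind time with "IGNORE NULLS is not supported yet". The `Parser::parse_sql` entry point is assumed to be this crate's usual one; treat the queries as illustrative.

```rust
use risingwave_sqlparser::parser::Parser;

fn main() {
    // Parses successfully; the function's `FunctionArgList` now carries
    // `ignore_nulls: true` (rejection happens later, in the frontend binder).
    let stmts = Parser::parse_sql("SELECT first_value(x IGNORE NULLS) OVER (ORDER BY y) FROM t")
        .expect("window-function call with IGNORE NULLS should parse");
    println!("{stmts:?}");

    // Table-valued function calls reject the clause directly in the parser.
    let err = Parser::parse_sql("SELECT * FROM generate_series(1, 10 IGNORE NULLS)")
        .expect_err("IGNORE NULLS is not allowed in table-valued function calls");
    println!("{err}");
}
```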
diff --git a/src/sqlparser/tests/testdata/lambda.yaml b/src/sqlparser/tests/testdata/lambda.yaml
index 04d94baf5060c..eceb53af280bd 100644
--- a/src/sqlparser/tests/testdata/lambda.yaml
+++ b/src/sqlparser/tests/testdata/lambda.yaml
@@ -1,10 +1,10 @@
# This file is automatically generated by `src/sqlparser/tests/parser_test.rs`.
- input: select array_transform(array[1,2,3], |x| x * 2)
formatted_sql: SELECT array_transform(ARRAY[1, 2, 3], |x| x * 2)
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "array_transform", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("3"))], named: true }))), Unnamed(Expr(LambdaFunction { args: [Ident { value: "x", quote_style: None }], body: BinaryOp { left: Identifier(Ident { value: "x", quote_style: None }), op: Multiply, right: Value(Number("2")) } }))], variadic: false, order_by: [] }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "array_transform", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("3"))], named: true }))), Unnamed(Expr(LambdaFunction { args: [Ident { value: "x", quote_style: None }], body: BinaryOp { left: Identifier(Ident { value: "x", quote_style: None }), op: Multiply, right: Value(Number("2")) } }))], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: select array_transform(array[], |s| case when s ilike 'apple%' then 'apple' when s ilike 'google%' then 'google' else 'unknown' end)
formatted_sql: SELECT array_transform(ARRAY[], |s| CASE WHEN s ILIKE 'apple%' THEN 'apple' WHEN s ILIKE 'google%' THEN 'google' ELSE 'unknown' END)
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "array_transform", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Array(Array { elem: [], named: true }))), Unnamed(Expr(LambdaFunction { args: [Ident { value: "s", quote_style: None }], body: Case { operand: None, conditions: [ILike { negated: false, expr: Identifier(Ident { value: "s", quote_style: None }), pattern: Value(SingleQuotedString("apple%")), escape_char: None }, ILike { negated: false, expr: Identifier(Ident { value: "s", quote_style: None }), pattern: Value(SingleQuotedString("google%")), escape_char: None }], results: [Value(SingleQuotedString("apple")), Value(SingleQuotedString("google"))], else_result: Some(Value(SingleQuotedString("unknown"))) } }))], variadic: false, order_by: [] }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "array_transform", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Array(Array { elem: [], named: true }))), Unnamed(Expr(LambdaFunction { args: [Ident { value: "s", quote_style: None }], body: Case { operand: None, conditions: [ILike { negated: false, expr: Identifier(Ident { value: "s", quote_style: None }), pattern: Value(SingleQuotedString("apple%")), escape_char: None }, ILike { negated: false, expr: Identifier(Ident { value: "s", quote_style: None }), pattern: Value(SingleQuotedString("google%")), escape_char: None }], results: [Value(SingleQuotedString("apple")), Value(SingleQuotedString("google"))], else_result: Some(Value(SingleQuotedString("unknown"))) } }))], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: select array_transform(array[], |x, y| x + y * 2)
formatted_sql: SELECT array_transform(ARRAY[], |x, y| x + y * 2)
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "array_transform", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Array(Array { elem: [], named: true }))), Unnamed(Expr(LambdaFunction { args: [Ident { value: "x", quote_style: None }, Ident { value: "y", quote_style: None }], body: BinaryOp { left: Identifier(Ident { value: "x", quote_style: None }), op: Plus, right: BinaryOp { left: Identifier(Ident { value: "y", quote_style: None }), op: Multiply, right: Value(Number("2")) } } }))], variadic: false, order_by: [] }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "array_transform", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Array(Array { elem: [], named: true }))), Unnamed(Expr(LambdaFunction { args: [Ident { value: "x", quote_style: None }, Ident { value: "y", quote_style: None }], body: BinaryOp { left: Identifier(Ident { value: "x", quote_style: None }), op: Plus, right: BinaryOp { left: Identifier(Ident { value: "y", quote_style: None }), op: Multiply, right: Value(Number("2")) } } }))], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
diff --git a/src/sqlparser/tests/testdata/qualified_operator.yaml b/src/sqlparser/tests/testdata/qualified_operator.yaml
index 6d856d181250c..ddab7deeb214f 100644
--- a/src/sqlparser/tests/testdata/qualified_operator.yaml
+++ b/src/sqlparser/tests/testdata/qualified_operator.yaml
@@ -19,10 +19,10 @@
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "operator", quote_style: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: select "operator"(foo.bar);
formatted_sql: SELECT "operator"(foo.bar)
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "operator", quote_style: Some(''"'') }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(CompoundIdentifier([Ident { value: "foo", quote_style: None }, Ident { value: "bar", quote_style: None }])))], variadic: false, order_by: [] }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "operator", quote_style: Some(''"'') }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(CompoundIdentifier([Ident { value: "foo", quote_style: None }, Ident { value: "bar", quote_style: None }])))], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: select operator operator(+) operator(+) "operator"(9) operator from operator;
formatted_sql: SELECT operator OPERATOR(+) OPERATOR(+) "operator"(9) AS operator FROM operator
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [ExprWithAlias { expr: BinaryOp { left: Identifier(Ident { value: "operator", quote_style: None }), op: PGQualified(QualifiedOperator { schema: None, name: "+" }), right: UnaryOp { op: PGQualified(QualifiedOperator { schema: None, name: "+" }), expr: Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "operator", quote_style: Some(''"'') }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Value(Number("9"))))], variadic: false, order_by: [] }, over: None, filter: None, within_group: None }) } }, alias: Ident { value: "operator", quote_style: None } }], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "operator", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [ExprWithAlias { expr: BinaryOp { left: Identifier(Ident { value: "operator", quote_style: None }), op: PGQualified(QualifiedOperator { schema: None, name: "+" }), right: UnaryOp { op: PGQualified(QualifiedOperator { schema: None, name: "+" }), expr: Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "operator", quote_style: Some(''"'') }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Value(Number("9"))))], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: None }) } }, alias: Ident { value: "operator", quote_style: None } }], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "operator", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: select 3 operator(-) 2 - 1;
formatted_sql: SELECT 3 OPERATOR(-) 2 - 1
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(BinaryOp { left: Value(Number("3")), op: PGQualified(QualifiedOperator { schema: None, name: "-" }), right: BinaryOp { left: Value(Number("2")), op: Minus, right: Value(Number("1")) } })], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml
index e333ba3caf8ec..83c624c64309b 100644
--- a/src/sqlparser/tests/testdata/select.yaml
+++ b/src/sqlparser/tests/testdata/select.yaml
@@ -1,7 +1,7 @@
# This file is automatically generated by `src/sqlparser/tests/parser_test.rs`.
- input: SELECT sqrt(id) FROM foo
formatted_sql: SELECT sqrt(id) FROM foo
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "sqrt", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Identifier(Ident { value: "id", quote_style: None })))], variadic: false, order_by: [] }, over: None, filter: None, within_group: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "sqrt", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Identifier(Ident { value: "id", quote_style: None })))], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: SELECT INT '1'
formatted_sql: SELECT INT '1'
- input: SELECT (foo).v1.v2 FROM foo
@@ -99,7 +99,7 @@
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(AtTimeZone { timestamp: TypedString { data_type: Timestamp(true), value: "2022-10-01 12:00:00Z" }, time_zone: Identifier(Ident { value: "zone", quote_style: None }) })], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: SELECT now() + INTERVAL '14 days' AT TIME ZONE 'UTC'; -- https://www.postgresql.org/message-id/CADT4RqBPdbsZW7HS1jJP319TMRHs1hzUiP=iRJYR6UqgHCrgNQ@mail.gmail.com
formatted_sql: SELECT now() + INTERVAL '14 days' AT TIME ZONE 'UTC'
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(BinaryOp { left: Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "now", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [], variadic: false, order_by: [] }, over: None, filter: None, within_group: None }), op: Plus, right: AtTimeZone { timestamp: Value(Interval { value: "14 days", leading_field: None, leading_precision: None, last_field: None, fractional_seconds_precision: None }), time_zone: Value(SingleQuotedString("UTC")) } })], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(BinaryOp { left: Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "now", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: None }), op: Plus, right: AtTimeZone { timestamp: Value(Interval { value: "14 days", leading_field: None, leading_precision: None, last_field: None, fractional_seconds_precision: None }), time_zone: Value(SingleQuotedString("UTC")) } })], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: SELECT c FROM t WHERE c >= '2019-03-27T22:00:00.000Z'::timestamp AT TIME ZONE 'Europe/Brussels'; -- https://github.com/sqlparser-rs/sqlparser-rs/issues/1266
formatted_sql: SELECT c FROM t WHERE c >= CAST('2019-03-27T22:00:00.000Z' AS TIMESTAMP) AT TIME ZONE 'Europe/Brussels'
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "c", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "t", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: Some(BinaryOp { left: Identifier(Ident { value: "c", quote_style: None }), op: GtEq, right: AtTimeZone { timestamp: Cast { expr: Value(SingleQuotedString("2019-03-27T22:00:00.000Z")), data_type: Timestamp(false) }, time_zone: Value(SingleQuotedString("Europe/Brussels")) } }), group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
@@ -173,7 +173,7 @@
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), as_of: None }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), as_of: Some(ProcessTime) }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: select percentile_cont(0.3) within group (order by x desc) from unnest(array[1,2,4,5,10]) as x
formatted_sql: SELECT percentile_cont(0.3) FROM unnest(ARRAY[1, 2, 4, 5, 10]) AS x
- formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "percentile_cont", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Value(Number("0.3"))))], variadic: false, order_by: [] }, over: None, filter: None, within_group: Some(OrderByExpr { expr: Identifier(Ident { value: "x", quote_style: None }), asc: Some(false), nulls_first: None }) }))], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "x", quote_style: None }, columns: [] }), args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("4")), Value(Number("5")), Value(Number("10"))], named: true })))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { scalar_as_agg: false, name: ObjectName([Ident { value: "percentile_cont", quote_style: None }]), arg_list: FunctionArgList { distinct: false, args: [Unnamed(Expr(Value(Number("0.3"))))], variadic: false, order_by: [], ignore_nulls: false }, over: None, filter: None, within_group: Some(OrderByExpr { expr: Identifier(Ident { value: "x", quote_style: None }), asc: Some(false), nulls_first: None }) }))], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "x", quote_style: None }, columns: [] }), args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("4")), Value(Number("5")), Value(Number("10"))], named: true })))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: select percentile_cont(0.3) within group (order by x, y desc) from t
error_msg: |-
sql parser error: expected ), found: ,
diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs
index 4c8d8ae8e2bb9..f6d6f31fb3a4f 100644
--- a/src/storage/src/opts.rs
+++ b/src/storage/src/opts.rs
@@ -86,6 +86,7 @@ pub struct StorageOpts {
pub data_file_cache_file_capacity_mb: usize,
pub data_file_cache_flushers: usize,
pub data_file_cache_reclaimers: usize,
+ pub data_file_cache_recover_mode: foyer::RecoverMode,
pub data_file_cache_recover_concurrency: usize,
pub data_file_cache_insert_rate_limit_mb: usize,
pub data_file_cache_indexer_shards: usize,
@@ -105,6 +106,7 @@ pub struct StorageOpts {
pub meta_file_cache_file_capacity_mb: usize,
pub meta_file_cache_flushers: usize,
pub meta_file_cache_reclaimers: usize,
+ pub meta_file_cache_recover_mode: foyer::RecoverMode,
pub meta_file_cache_recover_concurrency: usize,
pub meta_file_cache_insert_rate_limit_mb: usize,
pub meta_file_cache_indexer_shards: usize,
@@ -181,6 +183,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
data_file_cache_file_capacity_mb: c.storage.data_file_cache.file_capacity_mb,
data_file_cache_flushers: c.storage.data_file_cache.flushers,
data_file_cache_reclaimers: c.storage.data_file_cache.reclaimers,
+ data_file_cache_recover_mode: c.storage.data_file_cache.recover_mode,
data_file_cache_recover_concurrency: c.storage.data_file_cache.recover_concurrency,
data_file_cache_insert_rate_limit_mb: c.storage.data_file_cache.insert_rate_limit_mb,
data_file_cache_indexer_shards: c.storage.data_file_cache.indexer_shards,
@@ -191,6 +194,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
meta_file_cache_file_capacity_mb: c.storage.meta_file_cache.file_capacity_mb,
meta_file_cache_flushers: c.storage.meta_file_cache.flushers,
meta_file_cache_reclaimers: c.storage.meta_file_cache.reclaimers,
+ meta_file_cache_recover_mode: c.storage.meta_file_cache.recover_mode,
meta_file_cache_recover_concurrency: c.storage.meta_file_cache.recover_concurrency,
meta_file_cache_insert_rate_limit_mb: c.storage.meta_file_cache.insert_rate_limit_mb,
meta_file_cache_indexer_shards: c.storage.meta_file_cache.indexer_shards,
diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs
index 2115db1aa56d1..22d30a5540af4 100644
--- a/src/storage/src/store_impl.rs
+++ b/src/storage/src/store_impl.rs
@@ -669,6 +669,7 @@ impl StateStoreImpl {
.with_clean_region_threshold(
opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
)
+ .with_recover_mode(opts.meta_file_cache_recover_mode)
.with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
.with_compression(
opts.meta_file_cache_compression
@@ -722,6 +723,7 @@ impl StateStoreImpl {
.with_clean_region_threshold(
opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
)
+ .with_recover_mode(opts.data_file_cache_recover_mode)
.with_recover_concurrency(opts.data_file_cache_recover_concurrency)
.with_compression(
opts.data_file_cache_compression