diff --git a/Cargo.lock b/Cargo.lock index f5e8d842ffd67..02a54bc05e3c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6117,6 +6117,21 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "risingwave_e2e_extended_mode_test" +version = "0.2.0-alpha" +dependencies = [ + "anyhow", + "chrono", + "clap 4.1.11", + "pg_interval", + "rust_decimal", + "tokio", + "tokio-postgres", + "tracing", + "tracing-subscriber", +] + [[package]] name = "risingwave_expr" version = "0.2.0-alpha" @@ -8838,6 +8853,7 @@ dependencies = [ "regex-syntax", "reqwest", "ring", + "rust_decimal", "scopeguard", "serde", "serde_json", @@ -8849,6 +8865,7 @@ dependencies = [ "syn", "time 0.3.17", "tokio", + "tokio-postgres", "tokio-stream", "tokio-util", "tonic", diff --git a/Cargo.toml b/Cargo.toml index 1be6dbbbc2884..042c61a240113 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ members = [ "src/stream", "src/test_runner", "src/tests/compaction_test", + "src/tests/e2e_extended_mode", "src/tests/regress", "src/tests/simulation", "src/tests/sqlsmith", diff --git a/ci/scripts/build.sh b/ci/scripts/build.sh index d9c8a9fee0b9d..ca8e826cbf62b 100755 --- a/ci/scripts/build.sh +++ b/ci/scripts/build.sh @@ -43,10 +43,11 @@ cargo build \ -p risingwave_compaction_test \ -p risingwave_backup_cmd \ -p risingwave_java_binding \ + -p risingwave_e2e_extended_mode_test \ --features "static-link static-log-level" --profile "$profile" # the file name suffix of artifact for risingwave_java_binding is so only for linux. It is dylib for MacOS -artifacts=(risingwave sqlsmith compaction-test backup-restore risingwave_regress_test risedev-dev delete-range-test librisingwave_java_binding.so) +artifacts=(risingwave sqlsmith compaction-test backup-restore risingwave_regress_test risingwave_e2e_extended_mode_test risedev-dev delete-range-test librisingwave_java_binding.so) echo "--- Show link info" ldd target/"$target"/risingwave diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index cc29af13c2fdf..2140503ba1915 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -24,12 +24,15 @@ mkdir -p target/debug buildkite-agent artifact download risingwave-"$profile" target/debug/ buildkite-agent artifact download risedev-dev-"$profile" target/debug/ buildkite-agent artifact download "e2e_test/generated/*" ./ +buildkite-agent artifact download risingwave_e2e_extended_mode_test-"$profile" target/debug/ mv target/debug/risingwave-"$profile" target/debug/risingwave mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev +mv target/debug/risingwave_e2e_extended_mode_test-"$profile" target/debug/risingwave_e2e_extended_mode_test echo "--- Adjust permission" chmod +x ./target/debug/risingwave chmod +x ./target/debug/risedev-dev +chmod +x ./target/debug/risingwave_e2e_extended_mode_test echo "--- Generate RiseDev CI config" cp ci/risedev-components.ci.env risedev-components.user.env @@ -75,7 +78,10 @@ cargo make ci-kill echo "--- e2e, ci-3cn-1fe, extended query" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cargo make ci-start ci-3cn-1fe -sqllogictest -p 4566 -d dev -e postgres-extended './e2e_test/extended_query/**/*.slt' +sqllogictest -p 4566 -d dev -e postgres-extended './e2e_test/extended_mode/**/*.slt' +RUST_BACKTRACE=1 target/debug/risingwave_e2e_extended_mode_test --host 127.0.0.1 \ + -p 4566 \ + -u root echo "--- Kill cluster" cargo make ci-kill diff --git a/e2e_test/extended_query/basic.slt b/e2e_test/extended_mode/basic.slt similarity index 54% rename from e2e_test/extended_query/basic.slt rename to e2e_test/extended_mode/basic.slt index c2cf66d73295d..b54fa84d6b4b2 100644 --- a/e2e_test/extended_query/basic.slt +++ b/e2e_test/extended_mode/basic.slt @@ -1,14 +1,4 @@ -# The basic.slt is to cover the path of pgwire. -# -# There are two kinds of statement, they run different path of pgwire: -# 1. un-query statement: SET,CREATE,INSERT,FLUSH,EXPLAIN,DROP.. -# 2. query statement: SELECT,WITH,VALUES,SHOW,DESCRIBE.. -# -# We also need to test different type in extended query mode: -# smallint,int,bigint -# real,double precision,numeric -# time,date,timestamp - +# Test different statements(DDL,DQL,DML) in extended mode. statement ok SET RW_IMPLICIT_FLUSH TO true; @@ -78,18 +68,3 @@ with t as (select generate_series(1,3,1)) select * from t; 1 2 3 - -query III -select 42::smallint, 42::int, 42::bigint; ----- -42 42 42 - -query III -select 42::real,42::double precision,42::decimal; ----- -42 42 42 - -query TTT -select '20:55:12'::time,'2022-07-12'::date,'2022-07-12 20:55:12'::timestamp; ----- -20:55:12 2022-07-12 2022-07-12 20:55:12 diff --git a/e2e_test/extended_mode/type.slt b/e2e_test/extended_mode/type.slt new file mode 100644 index 0000000000000..2271ecb51c5c9 --- /dev/null +++ b/e2e_test/extended_mode/type.slt @@ -0,0 +1,28 @@ +# Test binary format of different type. (sqllogitest return binary format in extended mode) + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# RisingWave can't support list and struct now so we skip them. +# include ../batch/types/array.slt.part +# include ../batch/types/struct.slt.part +# include ../batch/types/list.slt.part + +# Sqllogitest can't support binary format bytea type so we skip it. +# include ../batch/types/bytea.slt.part + +# Can't support inf,-inf binary format now so we skip it. +# include ../batch/types/decimal.slt.part + +# Sqllogitest can't support binary format jsonb type so we skip it. +# include ../batch/types/jsonb_ord.slt.part +# include ../batch/types/jsonb.slt.part + +include ../batch/types/boolean.slt.part +include ../batch/types/cast.slt.part +include ../batch/types/date.slt +include ../batch/types/intercal.slt.part +include ../batch/types/number_arithmetic.slt.part +include ../batch/types/temporal_arithmetic.slt.part +include ../batch/types/time.slt.part +include ../batch/types/timestamptz_utc.slt.part diff --git a/src/tests/e2e_extended_mode/Cargo.toml b/src/tests/e2e_extended_mode/Cargo.toml new file mode 100644 index 0000000000000..831c7ba35a636 --- /dev/null +++ b/src/tests/e2e_extended_mode/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "risingwave_e2e_extended_mode_test" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +anyhow = { version = "1", features = ["backtrace"] } +chrono = { version = "0.4", features = ['serde'] } +clap = { version = "4", features = ["derive"] } +pg_interval = "0.4" +rust_decimal ={ version = "1.25", features = ["db-postgres","db-tokio-postgres"] } +tokio = { version = "1", features = ["rt", "macros","rt-multi-thread"] } +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +tracing = "0.1" +tracing-subscriber = "0.3.16" + +[[bin]] +name = "risingwave_e2e_extended_mode_test" +path = "src/main.rs" diff --git a/src/tests/e2e_extended_mode/README.md b/src/tests/e2e_extended_mode/README.md new file mode 100644 index 0000000000000..5c0a1fbc96871 --- /dev/null +++ b/src/tests/e2e_extended_mode/README.md @@ -0,0 +1,21 @@ +This is a program used for e2e test in extended mode. + +## What is difference between it and extended_mode/*.slt in e2e_test + +For e2e test in extended query mode, there are two thing we can't test in sqllogitest +1. bind parameter +2. max row number +See [detail](https://www.postgresql.org/docs/15/protocol-flow.html#PROTOCOL-FLOW-PIPELINING:~:text=Once%20a%20portal,count%20is%20ignored) + +So before sqllogictest supporting these, we test these function in this program. + +In the future, we may merge it to e2e_text/extended_query + +# How to run + +```shell +RUST_BACKTRACE=1 target/debug/risingwave_e2e_extended_mode_test --host 127.0.0.1 \ + -p 4566 \ + -u root \ + --database dev \ +``` \ No newline at end of file diff --git a/src/tests/e2e_extended_mode/src/main.rs b/src/tests/e2e_extended_mode/src/main.rs new file mode 100644 index 0000000000000..32f5419048492 --- /dev/null +++ b/src/tests/e2e_extended_mode/src/main.rs @@ -0,0 +1,48 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod opts; +mod test; + +use std::process::exit; + +use clap::Parser; +use tracing::{error, info}; + +use crate::opts::Opts; +use crate::test::TestSuite; + +#[tokio::main(flavor = "multi_thread", worker_threads = 5)] +async fn main() { + exit(run_test().await) +} + +async fn run_test() -> i32 { + let opts = Opts::parse(); + + tracing_subscriber::fmt::init(); + + let test_suite = TestSuite::new(opts); + + match test_suite.test().await { + Ok(_) => { + info!("Risingwave e2e extended mode test completed successfully!"); + 0 + } + Err(e) => { + error!("Risingwave e2e extended mode test failed: {:?}. Please ensure that your psql version is larger than 14.1", e); + 1 + } + } +} diff --git a/src/tests/e2e_extended_mode/src/opts.rs b/src/tests/e2e_extended_mode/src/opts.rs new file mode 100644 index 0000000000000..bd83c54e3e8ff --- /dev/null +++ b/src/tests/e2e_extended_mode/src/opts.rs @@ -0,0 +1,33 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use clap::{Parser, ValueHint}; + +#[derive(Parser, Debug, Clone)] +pub struct Opts { + /// Database name used to connect to pg. + #[clap(name = "DB", long = "database", default_value = "dev")] + pub pg_db_name: String, + /// Username used to connect to postgresql. + #[clap(name = "PG_USERNAME", short = 'u', long = "user", default_value="postgres", value_hint=ValueHint::Username)] + pub pg_user_name: String, + /// Postgresql server address to test against. + #[clap(name = "PG_SERVER_ADDRESS", long = "host", default_value = "localhost")] + pub pg_server_host: String, + /// Postgresql server port to test against. + #[clap(name = "PG_SERVER_PORT", short = 'p', long = "port")] + pub pg_server_port: u16, + #[clap(name = "PG_PASSWARD", long = "password", default_value = "")] + pub pg_password: String, +} diff --git a/src/tests/e2e_extended_mode/src/test.rs b/src/tests/e2e_extended_mode/src/test.rs new file mode 100644 index 0000000000000..abf701af763ce --- /dev/null +++ b/src/tests/e2e_extended_mode/src/test.rs @@ -0,0 +1,334 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::anyhow; +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use pg_interval::Interval; +use rust_decimal::prelude::FromPrimitive; +use rust_decimal::Decimal; +use tokio_postgres::types::Type; +use tokio_postgres::NoTls; + +use crate::opts::Opts; + +pub struct TestSuite { + config: String, +} + +macro_rules! test_eq { + ($left:expr, $right:expr $(,)?) => { + match (&$left, &$right) { + (left_val, right_val) => { + if !(*left_val == *right_val) { + return Err(anyhow!( + "assertion failed: `(left == right)` \ + (left: `{:?}`, right: `{:?}`)", + left_val, + right_val + )); + } + } + } + }; +} + +impl TestSuite { + pub fn new(opts: Opts) -> Self { + let config = if !opts.pg_password.is_empty() { + format!( + "dbname={} user={} host={} port={} password={}", + opts.pg_db_name, + opts.pg_user_name, + opts.pg_server_host, + opts.pg_server_port, + opts.pg_password + ) + } else { + format!( + "dbname={} user={} host={} port={}", + opts.pg_db_name, opts.pg_user_name, opts.pg_server_host, opts.pg_server_port + ) + }; + Self { config } + } + + pub async fn test(&self) -> anyhow::Result<()> { + self.binary_param_and_result().await?; + self.dql_dml_with_param().await?; + self.max_row().await?; + Ok(()) + } + + pub async fn binary_param_and_result(&self) -> anyhow::Result<()> { + // Connect to the database. + let (client, connection) = tokio_postgres::connect(&self.config, NoTls).await?; + + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + for row in client.query("select $1::SMALLINT;", &[&1024_i16]).await? { + let data: i16 = row.try_get(0)?; + test_eq!(data, 1024); + } + + for row in client.query("select $1::INT;", &[&144232_i32]).await? { + let data: i32 = row.try_get(0)?; + test_eq!(data, 144232); + } + + for row in client.query("select $1::BIGINT;", &[&99999999_i64]).await? { + let data: i64 = row.try_get(0)?; + test_eq!(data, 99999999); + } + + for row in client + .query("select $1::DECIMAL;", &[&Decimal::from_f32(2.33454_f32)]) + .await? + { + let data: Decimal = row.try_get(0)?; + test_eq!(data, Decimal::from_f32(2.33454_f32).unwrap()); + } + + for row in client.query("select $1::BOOL;", &[&true]).await? { + let data: bool = row.try_get(0)?; + assert!(data); + } + + for row in client.query("select $1::REAL;", &[&1.234234_f32]).await? { + let data: f32 = row.try_get(0)?; + test_eq!(data, 1.234234); + } + + // TODO(ZENOTME): After #8112, risingwave should support this case. (DOUBLE PRECISION TYPE) + // for row in client + // .query("select $1::DOUBLE PRECISION;", &[&234234.23490238483_f64]) + // .await? + // { + // let data: f64 = row.try_get(0)?; + // test_eq!(data, 234234.23490238483); + // } + for row in client + .query("select $1::FLOAT8;", &[&234234.23490238483_f64]) + .await? + { + let data: f64 = row.try_get(0)?; + test_eq!(data, 234234.23490238483); + } + + for row in client + .query( + "select $1::date;", + &[&NaiveDate::from_ymd_opt(2022, 1, 1).unwrap()], + ) + .await? + { + let data: NaiveDate = row.try_get(0)?; + test_eq!(data, NaiveDate::from_ymd_opt(2022, 1, 1).unwrap()); + } + + for row in client + .query( + "select $1::time", + &[&NaiveTime::from_hms_opt(10, 0, 0).unwrap()], + ) + .await? + { + let data: NaiveTime = row.try_get(0)?; + test_eq!(data, NaiveTime::from_hms_opt(10, 0, 0).unwrap()); + } + + for row in client + .query( + "select $1::timestamp", + &[&NaiveDate::from_ymd_opt(2022, 1, 1) + .unwrap() + .and_hms_opt(10, 0, 0) + .unwrap()], + ) + .await? + { + let data: NaiveDateTime = row.try_get(0)?; + test_eq!( + data, + NaiveDate::from_ymd_opt(2022, 1, 1) + .unwrap() + .and_hms_opt(10, 0, 0) + .unwrap() + ); + } + + let timestamptz = DateTime::::from_utc( + NaiveDate::from_ymd_opt(2022, 1, 1) + .unwrap() + .and_hms_opt(10, 0, 0) + .unwrap(), + Utc, + ); + for row in client + .query("select $1::timestamptz", &[×tamptz]) + .await? + { + let data: DateTime = row.try_get(0)?; + test_eq!(data, timestamptz); + } + + for row in client + .query("select $1::interval", &[&Interval::new(1, 1, 24000000)]) + .await? + { + let data: Interval = row.try_get(0)?; + test_eq!(data, Interval::new(1, 1, 24000000)); + } + + Ok(()) + } + + /// TODO(ZENOTME): After #8112, risingwave should support to change all `prepare_typed` to + /// `prepare`. We don't need to provide the type explicitly. + async fn dql_dml_with_param(&self) -> anyhow::Result<()> { + let (client, connection) = tokio_postgres::connect(&self.config, NoTls).await?; + + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + client.query("create table t(id int)", &[]).await?; + + let insert_statement = client + .prepare_typed("insert INTO t (id) VALUES ($1)", &[Type::INT4]) + .await?; + + for i in 0..20 { + client.execute(&insert_statement, &[&i]).await?; + } + client.execute("flush", &[]).await?; + + let update_statement = client + .prepare_typed( + "update t set id = $1 where id < $2", + &[Type::INT4, Type::INT4], + ) + .await?; + let query_statement = client + .prepare_typed( + "select * FROM t where id < $1 order by id ASC", + &[Type::INT4], + ) + .await?; + let delete_statement = client + .prepare_typed("delete FROM t where id < $1", &[Type::INT4]) + .await?; + + let mut i = 0; + for row in client.query(&query_statement, &[&10_i32]).await? { + let id: i32 = row.try_get(0)?; + test_eq!(id, i); + i += 1; + } + test_eq!(i, 10); + + client + .execute(&update_statement, &[&100_i32, &10_i32]) + .await?; + client.execute("flush", &[]).await?; + + let mut i = 0; + for _ in client.query(&query_statement, &[&10_i32]).await? { + i += 1; + } + test_eq!(i, 0); + + client.execute(&delete_statement, &[&20_i32]).await?; + client.execute("flush", &[]).await?; + + let mut i = 0; + for row in client.query(&query_statement, &[&101_i32]).await? { + let id: i32 = row.try_get(0)?; + test_eq!(id, 100); + i += 1; + } + test_eq!(i, 10); + + client.execute("drop table t", &[]).await?; + + Ok(()) + } + + async fn max_row(&self) -> anyhow::Result<()> { + let (mut client, connection) = tokio_postgres::connect(&self.config, NoTls).await?; + + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + client.query("create table t(id int)", &[]).await?; + + let insert_statement = client + .prepare_typed("insert INTO t (id) VALUES ($1)", &[Type::INT4]) + .await?; + + for i in 0..10 { + client.execute(&insert_statement, &[&i]).await?; + } + client.execute("flush", &[]).await?; + + let transaction = client.transaction().await?; + let statement = transaction + .prepare_typed("SELECT * FROM t order by id", &[]) + .await?; + let portal = transaction.bind(&statement, &[]).await?; + + for t in 0..5 { + let rows = transaction.query_portal(&portal, 1).await?; + test_eq!(rows.len(), 1); + let row = rows.get(0).unwrap(); + let id: i32 = row.get(0); + test_eq!(id, t); + } + + let mut i = 5; + for row in transaction.query_portal(&portal, 3).await? { + let id: i32 = row.get(0); + test_eq!(id, i); + i += 1; + } + test_eq!(i, 8); + + for row in transaction.query_portal(&portal, 5).await? { + let id: i32 = row.get(0); + test_eq!(id, i); + i += 1; + } + test_eq!(i, 10); + + transaction.rollback().await?; + + client.execute("drop table t", &[]).await?; + + Ok(()) + } +} diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index d38c1ada77980..4b425b9fbe8b8 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -83,6 +83,7 @@ regex = { version = "1" } regex-syntax = { version = "0.6" } reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } +rust_decimal = { version = "1", features = ["db-postgres", "db-tokio-postgres"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } serde_json = { version = "1", features = ["alloc"] } @@ -93,6 +94,7 @@ strum = { version = "0.24", features = ["derive"] } subtle = { version = "2" } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] } +tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc", features = ["with-chrono-0_4"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710", features = ["fs", "net"] } tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { version = "0.8", features = ["gzip", "tls-webpki-roots"] } @@ -176,6 +178,7 @@ regex = { version = "1" } regex-syntax = { version = "0.6" } reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } +rust_decimal = { version = "1", features = ["db-postgres", "db-tokio-postgres"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } serde_json = { version = "1", features = ["alloc"] } @@ -187,6 +190,7 @@ subtle = { version = "2" } syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] } +tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc", features = ["with-chrono-0_4"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710", features = ["fs", "net"] } tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { version = "0.8", features = ["gzip", "tls-webpki-roots"] }