Skip to content

Commit

Permalink
feat(frontend): support SET TIME ZONE (risingwavelabs#8572)
Browse files Browse the repository at this point in the history
Signed-off-by: TennyZhuang <zty0826@gmail.com>
  • Loading branch information
TennyZhuang authored Mar 16, 2023
1 parent c19fc72 commit 65a641d
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use derivative::{self, Derivative};
use itertools::Itertools;
pub use query_mode::QueryMode;
pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
use tracing::info;

use crate::error::{ErrorCode, RwError};
use crate::session_config::transaction_isolation_level::IsolationLevel;
Expand Down Expand Up @@ -357,6 +358,7 @@ pub struct ConfigMap {

impl ConfigMap {
pub fn set(&mut self, key: &str, val: Vec<String>) -> Result<(), RwError> {
info!(%key, ?val, "set config");
let val = val.iter().map(AsRef::as_ref).collect_vec();
if key.eq_ignore_ascii_case(ImplicitFlush::entry_name()) {
self.implicit_flush = val.as_slice().try_into()?;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enum-as-inner = "0.5"
fixedbitset = "0.4.1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = "0.2"
iana-time-zone = "0.1"
itertools = "0.10"
lazy_static = "1"
maplit = "1"
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::error::{ErrorCode, Result};
use risingwave_sqlparser::ast::*;

use self::util::DataChunkToRowSetAdapter;
use self::variable::handle_set_time_zone;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
use crate::session::SessionImpl;
use crate::utils::WithOptions;
Expand Down Expand Up @@ -351,6 +352,7 @@ pub async fn handle(
variable,
value,
} => variable::handle_set(handler_args, variable, value),
Statement::SetTimeZone { local: _, value } => handle_set_time_zone(handler_args, value),
Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
Statement::CreateIndex {
name,
Expand Down
23 changes: 21 additions & 2 deletions src/frontend/src/handler/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Row;
use risingwave_common::error::Result;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::{Ident, SetVariableValue, Value};
use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value};

use super::RwPgResponse;
use crate::handler::HandlerArgs;
Expand Down Expand Up @@ -48,6 +48,25 @@ pub fn handle_set(
Ok(PgResponse::empty_result(StatementType::SET_VARIABLE))
}

pub(super) fn handle_set_time_zone(
handler_args: HandlerArgs,
value: SetTimeZoneValue,
) -> Result<RwPgResponse> {
let tz_info = match value {
SetTimeZoneValue::Local => iana_time_zone::get_timezone()
.map_err(|e| ErrorCode::InternalError(format!("Failed to get local time zone: {}", e))),
SetTimeZoneValue::Default => Ok("UTC".to_string()),
SetTimeZoneValue::Ident(ident) => Ok(ident.real_value()),
SetTimeZoneValue::Literal(Value::DoubleQuotedString(s))
| SetTimeZoneValue::Literal(Value::SingleQuotedString(s)) => Ok(s),
_ => Ok(value.to_string()),
}?;

handler_args.session.set_config("timezone", vec![tz_info])?;

Ok(PgResponse::empty_result(StatementType::SET_VARIABLE))
}

pub(super) async fn handle_show(
handler_args: HandlerArgs,
variable: Vec<Ident>,
Expand Down
33 changes: 33 additions & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,11 @@ pub enum Statement {
snapshot: Option<Value>,
session: bool,
},
/// `SET [ SESSION | LOCAL ] TIME ZONE { value | 'value' | LOCAL | DEFAULT }`
SetTimeZone {
local: bool,
value: SetTimeZoneValue,
},
/// `COMMENT ON ...`
///
/// Note: this is a PostgreSQL-specific statement.
Expand Down Expand Up @@ -1450,6 +1455,14 @@ impl fmt::Display for Statement {
}
Ok(())
}
Statement::SetTimeZone { local, value } => {
write!(f, "SET")?;
if *local {
write!(f, " LOCAL")?;
}
write!(f, " TIME ZONE {}", value)?;
Ok(())
}
Statement::Commit { chain } => {
write!(f, "COMMIT{}", if *chain { " AND CHAIN" } else { "" },)
}
Expand Down Expand Up @@ -1975,6 +1988,26 @@ impl fmt::Display for EmitMode {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SetTimeZoneValue {
Ident(Ident),
Literal(Value),
Local,
Default,
}

impl fmt::Display for SetTimeZoneValue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SetTimeZoneValue::Ident(ident) => write!(f, "{}", ident),
SetTimeZoneValue::Literal(value) => write!(f, "{}", value),
SetTimeZoneValue::Local => f.write_str("LOCAL"),
SetTimeZoneValue::Default => f.write_str("DEFAULT"),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TransactionMode {
Expand Down
17 changes: 17 additions & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3282,6 +3282,23 @@ impl Parser {

pub fn parse_set(&mut self) -> Result<Statement, ParserError> {
let modifier = self.parse_one_of_keywords(&[Keyword::SESSION, Keyword::LOCAL]);
if self.parse_keywords(&[Keyword::TIME, Keyword::ZONE]) {
let value = if self.parse_keyword(Keyword::DEFAULT) {
SetTimeZoneValue::Default
} else if self.parse_keyword(Keyword::LOCAL) {
SetTimeZoneValue::Local
} else if let Ok(ident) = self.parse_identifier() {
SetTimeZoneValue::Ident(ident)
} else {
let value = self.parse_value()?;
SetTimeZoneValue::Literal(value)
};

return Ok(Statement::SetTimeZone {
local: modifier == Some(Keyword::LOCAL),
value,
});
}
let variable = self.parse_identifier()?;
if self.consume_token(&Token::Eq) || self.parse_keyword(Keyword::TO) {
let mut values = vec![];
Expand Down
2 changes: 1 addition & 1 deletion src/sqlparser/tests/testdata/select.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@
Near "SELECT 1::int"
- input: select id1, a1, id2, a2 from stream as S join version FOR SYSTEM_TIME AS OF NOW() AS V on id1= id2
formatted_sql: SELECT id1, a1, id2, a2 FROM stream AS S JOIN version FOR SYSTEM_TIME AS OF NOW() AS V ON id1 = id2
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_now: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_now: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_now: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_now: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
17 changes: 17 additions & 0 deletions src/sqlparser/tests/testdata/set.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# This file is automatically generated. See `src/sqlparser/test_runner/src/bin/apply.rs` for more information.
- input: SET TIME ZONE LOCAL
formatted_sql: SET TIME ZONE LOCAL
- input: SET TIME ZONE DEFAULT
formatted_sql: SET TIME ZONE DEFAULT
- input: SET TIME ZONE "Asia/Shanghai"
formatted_sql: SET TIME ZONE "Asia/Shanghai"
- input: SET TIME ZONE 'Asia/Shanghai'
error_msg: |-
sql parser error: Expected a value, found: EOF
Near "SET TIME ZONE 'Asia/Shanghai'"
- input: SET TIME ZONE "UTC"
formatted_sql: SET TIME ZONE "UTC"
- input: SET TIME ZONE UTC
formatted_sql: SET TIME ZONE UTC
- input: set time = '1';
formatted_sql: SET time = '1'

0 comments on commit 65a641d

Please sign in to comment.