Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support query data without checkpoint #5850

Merged
merged 16 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions e2e_test/batch/visibility_all.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
statement ok
SET VISIBILITY_MODE TO all;

include ./local_mode.slt
include ./distribution_mode.slt

1 change: 0 additions & 1 deletion src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
value_indices,
prefix_hint_len,
);

Ok(Box::new(RowSeqScanExecutor::new(
table,
scan_ranges,
Expand Down
22 changes: 21 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod query_mode;
mod search_path;
mod transaction_isolation_level;
mod visibility_mode;

use std::ops::Deref;

Expand All @@ -24,11 +25,12 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};

use crate::error::{ErrorCode, RwError};
use crate::session_config::transaction_isolation_level::IsolationLevel;
use crate::session_config::visibility_mode::VisibilityMode;
use crate::util::epoch::Epoch;

// This is a hack, &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 12] = [
const CONFIG_KEYS: [&str; 13] = [
"RW_IMPLICIT_FLUSH",
"CREATE_COMPACTION_GROUP_FOR_MV",
"QUERY_MODE",
Expand All @@ -41,6 +43,7 @@ const CONFIG_KEYS: [&str; 12] = [
"TRANSACTION ISOLATION LEVEL",
"QUERY_EPOCH",
"RW_BATCH_ENABLE_SORT_AGG",
"VISIBILITY_MODE",
];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
Expand All @@ -57,6 +60,7 @@ const SEARCH_PATH: usize = 8;
const TRANSACTION_ISOLATION_LEVEL: usize = 9;
const QUERY_EPOCH: usize = 10;
const BATCH_ENABLE_SORT_AGG: usize = 11;
const VISIBILITY_MODE: usize = 12;

trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
fn entry_name() -> &'static str;
Expand Down Expand Up @@ -288,6 +292,9 @@ pub struct ConfigMap {
/// see <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
search_path: SearchPath,

/// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
visibility_mode: VisibilityMode,

/// see <https://www.postgresql.org/docs/current/transaction-iso.html>
transaction_isolation_level: IsolationLevel,

Expand Down Expand Up @@ -318,6 +325,8 @@ impl ConfigMap {
self.max_split_range_gap = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(SearchPath::entry_name()) {
self.search_path = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(VisibilityMode::entry_name()) {
self.visibility_mode = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
self.query_epoch = val.as_slice().try_into()?;
} else {
Expand Down Expand Up @@ -348,6 +357,8 @@ impl ConfigMap {
Ok(self.max_split_range_gap.to_string())
} else if key.eq_ignore_ascii_case(SearchPath::entry_name()) {
Ok(self.search_path.to_string())
} else if key.eq_ignore_ascii_case(VisibilityMode::entry_name()) {
Ok(self.visibility_mode.to_string())
} else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) {
Ok(self.transaction_isolation_level.to_string())
} else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
Expand Down Expand Up @@ -409,6 +420,11 @@ impl ConfigMap {
setting : self.search_path.to_string(),
description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified")
},
VariableInfo{
name : VisibilityMode::entry_name().to_lowercase(),
setting : self.visibility_mode.to_string(),
description : String::from("If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.")
},
VariableInfo {
name: QueryEpoch::entry_name().to_lowercase(),
setting : self.query_epoch.to_string(),
Expand Down Expand Up @@ -461,6 +477,10 @@ impl ConfigMap {
self.search_path.clone()
}

/// Reports whether the session's `VISIBILITY_MODE` is `checkpoint`.
///
/// Returns `false` when the mode is `all`.
pub fn only_checkpoint_visible(&self) -> bool {
    self.visibility_mode == VisibilityMode::Checkpoint
}

pub fn get_query_epoch(&self) -> Option<Epoch> {
if self.query_epoch.0 != 0 {
return Some((self.query_epoch.0).into());
Expand Down
97 changes: 97 additions & 0 deletions src/common/src/session_config/visibility_mode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2022 Singularity Data
zwang28 marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Contains configurations that could be accessed via "set" command.

use std::fmt::Formatter;

use super::{ConfigEntry, CONFIG_KEYS};
use crate::error::ErrorCode::{self, InvalidConfigValue};
use crate::error::RwError;
use crate::session_config::VISIBILITY_MODE;

#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
pub enum VisibilityMode {
#[default]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask the reason to read Checkpoint by default? Do we have the plan to change the default value to All?

Copy link
Contributor

@zwang28 zwang28 Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When VISIBILITY_MODE=all, a FLUSH command no longer enforces a checkpoint barrier, which may surprise users: their data can still be lost even after they have FLUSHed. Besides this, I can't think of other reasons not to use VISIBILITY_MODE=all by default. @hzxa21 @xxhZs
Or shall we even change FLUSH back to always enforce a checkpoint?

Copy link
Collaborator

@hzxa21 hzxa21 Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to enabling VISIBILITY_MODE=all by default, since it provides better freshness and users normally don't pay serious attention to data rewind.

shall we even change FLUSH back to always enforce a checkpoint?

This sounds reasonable to me since FLUSH normally indicates persistence.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or shall we even change FLUSH back to always enforce a checkpoint?

+1, agree with Patrick

Copy link
Contributor

@zwang28 zwang28 Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix in #7188

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some earlier discussions: #4966 (comment)

Should we adopt the CHECKPOINT command now?

Checkpoint,

All,
}

impl ConfigEntry for VisibilityMode {
    /// Returns the key under which this setting is listed in `CONFIG_KEYS`,
    /// i.e. the element at index `VISIBILITY_MODE`.
    fn entry_name() -> &'static str {
        // Resolve the lookup once, as a constant.
        const KEY: &str = CONFIG_KEYS[VISIBILITY_MODE];
        KEY
    }
}

impl TryFrom<&[&str]> for VisibilityMode {
    type Error = RwError;

    /// Parses the argument list given to `SET` for this entry.
    ///
    /// Exactly one argument is accepted; it is compared ASCII-case-insensitively
    /// against the keywords `all` and `checkpoint`.
    ///
    /// # Errors
    ///
    /// - `ErrorCode::InternalError` if `value` does not contain exactly one element.
    /// - `ErrorCode::InvalidConfigValue` if the single element is neither keyword.
    fn try_from(value: &[&str]) -> Result<Self, Self::Error> {
        // Destructure instead of checking `len()` and indexing separately.
        let raw = match value {
            &[single] => single,
            _ => {
                return Err(ErrorCode::InternalError(format!(
                    "SET {} takes only one argument",
                    Self::entry_name()
                ))
                .into());
            }
        };

        if raw.eq_ignore_ascii_case("checkpoint") {
            return Ok(Self::Checkpoint);
        }
        if raw.eq_ignore_ascii_case("all") {
            return Ok(Self::All);
        }

        Err(InvalidConfigValue {
            config_entry: Self::entry_name().to_string(),
            config_value: raw.to_string(),
        }
        .into())
    }
}

impl std::fmt::Display for VisibilityMode {
    /// Writes the lowercase keyword for the mode: `all` or `checkpoint`.
    ///
    /// These are the same spellings that `TryFrom<&[&str]>` accepts.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            Self::Checkpoint => "checkpoint",
            Self::All => "all",
        };
        f.write_str(keyword)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Both keywords parse regardless of ASCII case; unknown values are rejected.
    #[test]
    fn parse_visibility_mode() {
        for (raw, expected) in [
            ("all", VisibilityMode::All),
            ("All", VisibilityMode::All),
            ("checkpoint", VisibilityMode::Checkpoint),
            ("checkPoint", VisibilityMode::Checkpoint),
        ] {
            assert_eq!(
                VisibilityMode::try_from([raw].as_slice()).unwrap(),
                expected
            );
        }
        assert!(VisibilityMode::try_from(["ab"].as_slice()).is_err());
    }

    /// Anything other than exactly one argument is an error.
    #[test]
    fn rejects_wrong_argument_count() {
        let none: &[&str] = &[];
        assert!(VisibilityMode::try_from(none).is_err());
        assert!(VisibilityMode::try_from(["all", "checkpoint"].as_slice()).is_err());
    }

    /// The `Display` output of every variant parses back to the same variant.
    #[test]
    fn display_round_trips() {
        for mode in [VisibilityMode::All, VisibilityMode::Checkpoint] {
            let text = mode.to_string();
            assert_eq!(
                VisibilityMode::try_from([text.as_str()].as_slice()).unwrap(),
                mode
            );
        }
    }

    /// `Checkpoint` carries the `#[default]` attribute.
    #[test]
    fn default_is_checkpoint() {
        assert_eq!(VisibilityMode::default(), VisibilityMode::Checkpoint);
    }
}
18 changes: 11 additions & 7 deletions src/frontend/src/handler/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ use risingwave_common::error::Result;

use super::RwPgResponse;
use crate::handler::HandlerArgs;
use crate::session::SessionImpl;

pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result<RwPgResponse> {
let client = handler_args.session.env().meta_client();
// The returned epoch >= epoch for flush, but it is okay.
let snapshot = client.flush(true).await?;
// Update max epoch to ensure read-after-write correctness.
handler_args
.session
do_flush(&handler_args.session).await?;
Ok(PgResponse::empty_result(StatementType::FLUSH))
}

pub(crate) async fn do_flush(session: &SessionImpl) -> Result<()> {
let client = session.env().meta_client();
let checkpoint = session.config().only_checkpoint_visible();
let snapshot = client.flush(checkpoint).await?;
session
.env()
.hummock_snapshot_manager()
.update_epoch(snapshot);
Ok(PgResponse::empty_result(StatementType::FLUSH))
Ok(())
}
11 changes: 4 additions & 7 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_sqlparser::ast::Statement;

use super::{PgResponseStream, RwPgResponse};
use crate::binder::{Binder, BoundSetExpr, BoundStatement};
use crate::handler::flush::do_flush;
use crate::handler::privilege::{check_privileges, resolve_privileges};
use crate::handler::util::{to_pg_field, DataChunkToRowSetAdapter};
use crate::handler::HandlerArgs;
Expand Down Expand Up @@ -97,6 +98,7 @@ pub async fn handle_query(
let stmt_type = to_statement_type(&stmt)?;
let session = handler_args.session.clone();
let query_start_time = Instant::now();
let only_checkpoint_visible = handler_args.session.config().only_checkpoint_visible();

// Subblock to make sure PlanRef (an Rc) is dropped before `await` below.
let (query, query_mode, output_schema) = {
Expand Down Expand Up @@ -137,7 +139,7 @@ pub async fn handle_query(
let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
let query_id = query.query_id().clone();
let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;
PinnedHummockSnapshot::FrontendPinned(pinned_snapshot)
PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, only_checkpoint_visible)
};
match query_mode {
QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
Expand Down Expand Up @@ -262,12 +264,7 @@ pub async fn local_execute(
pub async fn flush_for_write(session: &SessionImpl, stmt_type: StatementType) -> Result<()> {
match stmt_type {
StatementType::INSERT | StatementType::DELETE | StatementType::UPDATE => {
let client = session.env().meta_client();
let snapshot = client.flush(true).await?;
session
.env()
.hummock_snapshot_manager()
.update_epoch(snapshot);
do_flush(session).await?;
}
_ => {}
}
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ impl QueryExecution {
.collect::<Vec<Arc<StageExecution>>>();

let stage_exec = Arc::new(StageExecution::new(
// TODO: Add support to use current epoch when needed
pinned_snapshot.get_batch_query_epoch(),
self.query.stage_graph.stages[&stage_id].clone(),
worker_node_manager.clone(),
Expand Down Expand Up @@ -400,7 +399,9 @@ pub(crate) mod tests {
use crate::scheduler::distributed::QueryExecution;
use crate::scheduler::plan_fragmenter::{BatchPlanFragmenter, Query};
use crate::scheduler::worker_node_manager::WorkerNodeManager;
use crate::scheduler::{ExecutionContext, HummockSnapshotManager, QueryExecutionInfo};
use crate::scheduler::{
ExecutionContext, HummockSnapshotManager, PinnedHummockSnapshot, QueryExecutionInfo,
};
use crate::session::SessionImpl;
use crate::test_utils::MockFrontendMetaClient;
use crate::utils::Condition;
Expand All @@ -426,7 +427,7 @@ pub(crate) mod tests {
.start(
ExecutionContext::new(SessionImpl::mock().into()).into(),
worker_node_manager,
pinned_snapshot.into(),
PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, true),
compute_client_pool,
catalog_reader,
query_execution_info,
Expand Down
35 changes: 15 additions & 20 deletions src/frontend/src/scheduler/hummock_snapshot_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ const UNPIN_INTERVAL_SECS: u64 = 10;

pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
pub enum PinnedHummockSnapshot {
FrontendPinned(HummockSnapshotGuard),
FrontendPinned(
HummockSnapshotGuard,
// `only_checkpoint_visible`.
// It's embedded here because we always use it together with snapshot.
bool,
Copy link
Contributor

@zwang28 zwang28 Dec 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor to embed only_checkpoint_visible here, instead of the previous approaches:

  • case 1 for local execution: passing another boolean variable around, which complicates interfaces.
  • case 2 for distributed execution: reading from the session config each time, which is unsafe because the config can be changed between multiple reads within the same query.

@xxhZs

),
/// Other arbitrary epoch, e.g. user specified.
/// Availability and consistency of underlying data should be guaranteed accordingly.
/// Currently it's only used for querying meta snapshot backup.
Expand All @@ -44,13 +49,8 @@ pub enum PinnedHummockSnapshot {
impl PinnedHummockSnapshot {
pub fn get_batch_query_epoch(&self) -> BatchQueryEpoch {
match self {
PinnedHummockSnapshot::FrontendPinned(s) => {
// extend Epoch::Current here
BatchQueryEpoch {
epoch: Some(batch_query_epoch::Epoch::Committed(
s.snapshot.committed_epoch,
)),
}
PinnedHummockSnapshot::FrontendPinned(s, checkpoint) => {
s.get_batch_query_epoch(*checkpoint)
}
PinnedHummockSnapshot::Other(e) => BatchQueryEpoch {
epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
Expand All @@ -59,12 +59,6 @@ impl PinnedHummockSnapshot {
}
}

impl From<HummockSnapshotGuard> for PinnedHummockSnapshot {
fn from(s: HummockSnapshotGuard) -> Self {
PinnedHummockSnapshot::FrontendPinned(s)
}
}

type SnapshotRef = Arc<ArcSwap<HummockSnapshot>>;

/// Cache of hummock snapshot in meta.
Expand Down Expand Up @@ -104,12 +98,13 @@ pub struct HummockSnapshotGuard {
}

impl HummockSnapshotGuard {
pub fn get_committed_epoch(&self) -> u64 {
self.snapshot.committed_epoch
}

pub fn get_current_epoch(&self) -> u64 {
self.snapshot.current_epoch
pub fn get_batch_query_epoch(&self, checkpoint: bool) -> BatchQueryEpoch {
let epoch = if checkpoint {
batch_query_epoch::Epoch::Committed(self.snapshot.committed_epoch)
} else {
batch_query_epoch::Epoch::Current(self.snapshot.current_epoch)
};
BatchQueryEpoch { epoch: Some(epoch) }
}
}

Expand Down
4 changes: 0 additions & 4 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl LocalQueryExecution {
&plan_node,
&task_id,
context,
// TODO: Add support to use current epoch when needed
self.snapshot.get_batch_query_epoch(),
);
let executor = executor.build().await?;
Expand Down Expand Up @@ -245,7 +244,6 @@ impl LocalQueryExecution {
};
let local_execute_plan = LocalExecutePlan {
plan: Some(second_stage_plan_fragment),
// TODO: Add support to use current epoch when needed
epoch: Some(self.snapshot.get_batch_query_epoch()),
};
let exchange_source = ExchangeSource {
Expand Down Expand Up @@ -278,7 +276,6 @@ impl LocalQueryExecution {
};
let local_execute_plan = LocalExecutePlan {
plan: Some(second_stage_plan_fragment),
// TODO: Add support to use current epoch when needed
epoch: Some(self.snapshot.get_batch_query_epoch()),
};
// NOTE: select a random work node here.
Expand Down Expand Up @@ -311,7 +308,6 @@ impl LocalQueryExecution {

let local_execute_plan = LocalExecutePlan {
plan: Some(second_stage_plan_fragment),
// TODO: Add support to use current epoch when needed
epoch: Some(self.snapshot.get_batch_query_epoch()),
};

Expand Down
Loading