Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement pg_catalog upon local execution mode #3387

Merged
merged 12 commits into from
Jun 22, 2022
69 changes: 69 additions & 0 deletions e2e_test/batch/catalog/pg_cast.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
query IIIT
SELECT * FROM pg_catalog.pg_cast
----
0 16 23 EXPLICIT
1 16 1043 ASSIGN
2 21 23 IMPLICIT
3 21 20 IMPLICIT
4 21 1700 IMPLICIT
5 21 700 IMPLICIT
6 21 701 IMPLICIT
7 21 1043 ASSIGN
8 23 16 EXPLICIT
9 23 21 ASSIGN
10 23 20 IMPLICIT
11 23 1700 IMPLICIT
12 23 700 IMPLICIT
13 23 701 IMPLICIT
14 23 1043 ASSIGN
15 20 21 ASSIGN
16 20 23 ASSIGN
17 20 1700 IMPLICIT
18 20 700 IMPLICIT
19 20 701 IMPLICIT
20 20 1043 ASSIGN
21 1700 21 ASSIGN
22 1700 23 ASSIGN
23 1700 20 ASSIGN
24 1700 700 IMPLICIT
25 1700 701 IMPLICIT
26 1700 1043 ASSIGN
27 700 21 ASSIGN
28 700 23 ASSIGN
29 700 20 ASSIGN
30 700 1700 ASSIGN
31 700 701 IMPLICIT
32 700 1043 ASSIGN
33 701 21 ASSIGN
34 701 23 ASSIGN
35 701 20 ASSIGN
36 701 1700 ASSIGN
37 701 700 ASSIGN
38 701 1043 ASSIGN
39 1043 16 ASSIGN
40 1043 21 ASSIGN
41 1043 23 ASSIGN
42 1043 20 ASSIGN
43 1043 1700 ASSIGN
44 1043 700 ASSIGN
45 1043 701 ASSIGN
46 1043 1082 ASSIGN
47 1043 1114 ASSIGN
48 1043 1184 ASSIGN
49 1043 1083 ASSIGN
50 1043 1043 ASSIGN
51 1082 1043 ASSIGN
52 1082 1114 IMPLICIT
53 1082 1184 IMPLICIT
54 1114 1043 ASSIGN
55 1114 1082 ASSIGN
56 1114 1184 IMPLICIT
57 1114 1083 ASSIGN
58 1184 1043 ASSIGN
59 1184 1082 ASSIGN
60 1184 1114 ASSIGN
61 1184 1083 ASSIGN
62 1083 1043 ASSIGN
63 1083 1043 IMPLICIT
64 1043 1043 ASSIGN
65 1043 1083 ASSIGN
5 changes: 5 additions & 0 deletions e2e_test/batch/catalog/pg_namespace.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
query T rowsort
SELECT nspname FROM pg_catalog.pg_namespace;
----
pg_catalog
public
16 changes: 16 additions & 0 deletions e2e_test/batch/catalog/pg_type.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
query IT
SELECT * FROM pg_catalog.pg_type;
----
16 bool
20 int8
21 int2
23 int4
700 float4
701 float8
1043 varchar
1082 date
1083 time
1114 timestamp
1184 timestamptz
1186 interval
1700 numeric
1 change: 1 addition & 0 deletions e2e_test/batch/local_mode.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SET QUERY_MODE TO local;
include ./basic/*.slt.part
include ./aggregate/*.slt.part
include ./types/*.slt.part
include ./catalog/*.slt.part

statement ok
SET QUERY_MODE TO distributed;
Expand Down
3 changes: 2 additions & 1 deletion e2e_test/database/test.slt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
query T
query T rowsort
show schemas;
----
public
pg_catalog

statement ok
create table ddl_t (v1 int not null);
Expand Down
3 changes: 2 additions & 1 deletion e2e_test/ddl/show.slt
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ show databases;
----
dev

query T
query T rowsort
show schemas;
----
public
pg_catalog

query T
show tables;
Expand Down
6 changes: 6 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ message RowSeqScanNode {
ScanRange scan_range = 3;
}

// Scan of a system catalog table (e.g. the pg_catalog.* tables) that is
// materialized locally rather than stored in the state store.
message SysRowSeqScanNode {
  // Name of the system table to read, e.g. "pg_type".
  string table_name = 1;
  // Columns to project from the system table, in output order.
  repeated plan_common.ColumnDesc column_descs = 2;
}

// The range to scan, which specifies a consecutive range of the PK
// and can represent: (Suppose there are N columns in the PK)
// - full table scan: `eq_conds` is empty, and `lower_bound` & `upper_bound` are `None`
Expand Down Expand Up @@ -204,6 +209,7 @@ message PlanNode {
SortMergeJoinNode sort_merge_join = 22;
HopWindowNode hop_window = 25;
TableFunctionNode table_function = 26;
SysRowSeqScanNode sys_row_seq_scan = 27;
}
string identity = 24;
}
Expand Down
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod order_by;
mod project;
mod row_seq_scan;
mod sort_agg;
mod sys_row_seq_scan;
mod table_function;
#[cfg(test)]
pub mod test_utils;
Expand Down Expand Up @@ -62,6 +63,7 @@ pub use trace::*;
pub use update::*;
pub use values::*;

use crate::executor::sys_row_seq_scan::SysRowSeqScanExecutorBuilder;
use crate::task::{BatchTaskContext, TaskId};

pub type BoxedExecutor = Box<dyn Executor>;
Expand Down Expand Up @@ -185,6 +187,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::MergeSortExchange => MergeSortExchangeExecutorBuilder,
NodeBody::TableFunction => TableFunctionExecutorBuilder,
NodeBody::HopWindow => HopWindowExecutor,
NodeBody::SysRowSeqScan => SysRowSeqScanExecutorBuilder,
}
.await?;
let input_desc = real_executor.identity().to_string();
Expand Down
125 changes: 125 additions & 0 deletions src/batch/src/executor/sys_row_seq_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Row};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, SysCatalogReaderRef};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// Batch executor that performs a full scan over a system catalog table
/// (e.g. `pg_catalog.pg_type`) via a [`SysCatalogReaderRef`].
pub struct SysRowSeqScanExecutor {
    // Name of the system table to read, e.g. "pg_cast".
    table_name: String,
    // Output schema of this executor (one field per projected column).
    schema: Schema,
    // Which columns of the catalog row to project, by column id.
    column_ids: Vec<ColumnId>,
    // Human-readable identity string used for tracing/EXPLAIN output.
    identity: String,

    // Reader handle into the frontend-local system catalog.
    sys_catalog_reader: SysCatalogReaderRef,
}

impl SysRowSeqScanExecutor {
    /// Creates a new system-table scan executor.
    ///
    /// * `table_name` - name of the system table to read.
    /// * `schema` - output schema matching `column_ids`.
    /// * `column_ids` - columns to project, in output order.
    /// * `identity` - identity string for tracing/EXPLAIN.
    /// * `sys_catalog_reader` - handle to the frontend-local catalog.
    pub fn new(
        table_name: String,
        schema: Schema,
        // Renamed from `column_id`: this is a collection, and it is stored
        // into the `column_ids` field; keep the names consistent.
        column_ids: Vec<ColumnId>,
        identity: String,
        sys_catalog_reader: SysCatalogReaderRef,
    ) -> Self {
        Self {
            table_name,
            schema,
            column_ids,
            identity,
            sys_catalog_reader,
        }
    }
}

/// Builder registered for `NodeBody::SysRowSeqScan` plan nodes; see
/// [`BoxedExecutorBuilder::new_boxed_executor`] for the construction logic.
pub struct SysRowSeqScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
ensure!(
inputs.is_empty(),
"Row sequential scan should not have input executor!"
);
let seq_scan_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::SysRowSeqScan
)?;
let sys_catalog_reader = source.context.sys_catalog_reader_ref();

let table_name = seq_scan_node.table_name.clone();
let column_descs = seq_scan_node
.column_descs
.iter()
.map(|column_desc| ColumnDesc::from(column_desc.clone()))
.collect_vec();

let column_ids = column_descs.iter().map(|d| d.column_id).collect_vec();
let schema = Schema::new(column_descs.iter().map(Into::into).collect_vec());
Ok(Box::new(SysRowSeqScanExecutor::new(
table_name,
schema,
column_ids,
source.plan_node().get_identity().clone(),
sys_catalog_reader.unwrap(),
)))
}
}

impl Executor for SysRowSeqScanExecutor {
    /// Returns the output schema (the projected catalog columns).
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the identity string used for tracing/EXPLAIN.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the data-chunk stream; the whole
    /// table is emitted as a single chunk by `do_executor`.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_executor()
    }
}

impl SysRowSeqScanExecutor {
    /// Reads the whole system table, projects the requested columns, and
    /// yields the result as one `DataChunk`.
    #[try_stream(boxed, ok = DataChunk, error = RwError)]
    async fn do_executor(self: Box<Self>) {
        let rows = self.sys_catalog_reader.read_table(&self.table_name).await?;

        // Project each catalog row down to `column_ids`, preserving order.
        let mut projected = Vec::with_capacity(rows.len());
        for row in &rows {
            let mut datums = Vec::with_capacity(self.column_ids.len());
            for column_id in &self.column_ids {
                // NOTE(review): assumes every column id is a valid index into
                // the catalog row — panics otherwise, as the original did.
                datums.push(row.0.get(column_id.get_id() as usize).cloned().unwrap());
            }
            projected.push(Row::new(datums));
        }

        let data_types = self.schema.data_types();
        let chunk = DataChunk::from_rows(&projected, &data_types).map_err(RwError::from)?;
        yield chunk
    }
}
8 changes: 8 additions & 0 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use risingwave_common::catalog::SysCatalogReaderRef;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;
use risingwave_common::util::addr::{is_local_address, HostAddr};
Expand All @@ -32,6 +33,9 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {
/// Returns error if the task of `task_output_id` doesn't run in same worker as current task.
fn get_task_output(&self, task_output_id: TaskOutputId) -> Result<TaskOutput>;

/// Get system catalog reader, used to read system table.
fn sys_catalog_reader_ref(&self) -> Option<SysCatalogReaderRef>;

/// Whether `peer_addr` is in same as current task.
fn is_local_addr(&self, peer_addr: &HostAddr) -> bool;

Expand Down Expand Up @@ -67,6 +71,10 @@ impl BatchTaskContext for ComputeNodeContext {
.take_output(&task_output_id.to_prost())
}

fn sys_catalog_reader_ref(&self) -> Option<SysCatalogReaderRef> {
    // Compute nodes have no local system catalog; only the frontend's
    // local execution context can supply a reader.
    None
}

fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
is_local_address(self.env.server_address(), peer_addr)
}
Expand Down
18 changes: 18 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,36 @@ mod column;
mod physical_table;
mod schema;
pub mod test_utils;

use core::fmt;
use std::sync::Arc;

use async_trait::async_trait;
pub use column::*;
pub use physical_table::*;
pub use schema::{test_utils as schema_test_utils, Field, Schema};

use crate::array::Row;
use crate::error::Result;

/// Name of the database created by default.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the schema created by default in every database.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the built-in system catalog schema, mirroring PostgreSQL.
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Schema-name prefix reserved for system use, mirroring PostgreSQL's `pg_`.
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
// TODO(review): "SUPPER" looks like a typo for "SUPER" in these two
// identifiers; renaming touches external callers, so only flagging it here.
pub const DEFAULT_SUPPER_USER: &str = "root";
// This is for compatibility with customized utils for PostgreSQL.
pub const DEFAULT_SUPPER_USER_FOR_PG: &str = "postgres";

/// Table id reserved for pg_catalog tables (see `RESERVED_PG_CATALOG_TABLE_ID` users).
pub const RESERVED_PG_CATALOG_TABLE_ID: i32 = 1000;

/// The local system catalog reader in the frontend node.
#[async_trait]
pub trait SysCatalogReader: Sync + Send + 'static {
    /// Reads all rows of the system table named `table_name` (e.g. "pg_type")
    /// and returns them as owned `Row`s.
    async fn read_table(&self, table_name: &str) -> Result<Vec<Row>>;
}

/// Shared, thread-safe handle to a [`SysCatalogReader`].
pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;

pub type CatalogVersion = u64;

pub enum CatalogId {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use expr::bind_data_type;
pub use insert::BoundInsert;
pub use query::BoundQuery;
pub use relation::{
BoundBaseTable, BoundJoin, BoundSource, BoundTableFunction, BoundTableSource,
BoundBaseTable, BoundJoin, BoundSource, BoundSystemTable, BoundTableFunction, BoundTableSource,
BoundWindowTableFunction, FunctionType, Relation, WindowTableFunctionKind,
};
pub use select::BoundSelect;
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod window_table_function;
pub use join::BoundJoin;
pub use subquery::BoundSubquery;
pub use table_function::BoundTableFunction;
pub use table_or_source::{BoundBaseTable, BoundSource, BoundTableSource};
pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable, BoundTableSource};
pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};

/// A validated item that refers to a table-like entity, including base table, subquery, join, etc.
Expand All @@ -40,6 +40,7 @@ pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKin
pub enum Relation {
Source(Box<BoundSource>),
BaseTable(Box<BoundBaseTable>),
SystemTable(Box<BoundSystemTable>),
Subquery(Box<BoundSubquery>),
Join(Box<BoundJoin>),
WindowTableFunction(Box<BoundWindowTableFunction>),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/relation/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Binder {
// generate_series ( start i32, stop i32, step i32 )
if args.len() != 3 {
return Err(ErrorCode::BindError(
"the length of args of generate series funciton should be 3".to_string(),
"the length of args of generate series function should be 3".to_string(),
)
.into());
}
Expand Down
Loading