-
Notifications
You must be signed in to change notification settings - Fork 573
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement pg_catalog upon local execution mode (#3387)
- Loading branch information
1 parent
86e7742
commit 3ab970c
Showing
48 changed files
with
1,045 additions
and
132 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
query IIIT | ||
SELECT * FROM pg_catalog.pg_cast | ||
---- | ||
0 16 23 EXPLICIT | ||
1 16 1043 ASSIGN | ||
2 21 23 IMPLICIT | ||
3 21 20 IMPLICIT | ||
4 21 1700 IMPLICIT | ||
5 21 700 IMPLICIT | ||
6 21 701 IMPLICIT | ||
7 21 1043 ASSIGN | ||
8 23 16 EXPLICIT | ||
9 23 21 ASSIGN | ||
10 23 20 IMPLICIT | ||
11 23 1700 IMPLICIT | ||
12 23 700 IMPLICIT | ||
13 23 701 IMPLICIT | ||
14 23 1043 ASSIGN | ||
15 20 21 ASSIGN | ||
16 20 23 ASSIGN | ||
17 20 1700 IMPLICIT | ||
18 20 700 IMPLICIT | ||
19 20 701 IMPLICIT | ||
20 20 1043 ASSIGN | ||
21 1700 21 ASSIGN | ||
22 1700 23 ASSIGN | ||
23 1700 20 ASSIGN | ||
24 1700 700 IMPLICIT | ||
25 1700 701 IMPLICIT | ||
26 1700 1043 ASSIGN | ||
27 700 21 ASSIGN | ||
28 700 23 ASSIGN | ||
29 700 20 ASSIGN | ||
30 700 1700 ASSIGN | ||
31 700 701 IMPLICIT | ||
32 700 1043 ASSIGN | ||
33 701 21 ASSIGN | ||
34 701 23 ASSIGN | ||
35 701 20 ASSIGN | ||
36 701 1700 ASSIGN | ||
37 701 700 ASSIGN | ||
38 701 1043 ASSIGN | ||
39 1043 16 ASSIGN | ||
40 1043 21 ASSIGN | ||
41 1043 23 ASSIGN | ||
42 1043 20 ASSIGN | ||
43 1043 1700 ASSIGN | ||
44 1043 700 ASSIGN | ||
45 1043 701 ASSIGN | ||
46 1043 1082 ASSIGN | ||
47 1043 1114 ASSIGN | ||
48 1043 1184 ASSIGN | ||
49 1043 1083 ASSIGN | ||
50 1043 1043 ASSIGN | ||
51 1082 1043 ASSIGN | ||
52 1082 1114 IMPLICIT | ||
53 1082 1184 IMPLICIT | ||
54 1114 1043 ASSIGN | ||
55 1114 1082 ASSIGN | ||
56 1114 1184 IMPLICIT | ||
57 1114 1083 ASSIGN | ||
58 1184 1043 ASSIGN | ||
59 1184 1082 ASSIGN | ||
60 1184 1114 ASSIGN | ||
61 1184 1083 ASSIGN | ||
62 1083 1043 ASSIGN | ||
63 1083 1043 IMPLICIT | ||
64 1043 1043 ASSIGN | ||
65 1043 1083 ASSIGN |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
query IT rowsort | ||
SELECT nspname FROM pg_catalog.pg_namespace; | ||
---- | ||
pg_catalog | ||
public |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
query IT | ||
SELECT * FROM pg_catalog.pg_type; | ||
---- | ||
16 bool | ||
20 int8 | ||
21 int2 | ||
23 int4 | ||
700 float4 | ||
701 float8 | ||
1043 varchar | ||
1082 date | ||
1083 time | ||
1114 timestamp | ||
1184 timestamptz | ||
1186 interval | ||
1700 numeric |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
query T | ||
query T rowsort | ||
show schemas; | ||
---- | ||
public | ||
pg_catalog | ||
|
||
statement ok | ||
create table ddl_t (v1 int not null); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
// Copyright 2022 Singularity Data | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use futures_async_stream::try_stream; | ||
use itertools::Itertools; | ||
use risingwave_common::array::{DataChunk, Row}; | ||
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, SysCatalogReaderRef}; | ||
use risingwave_common::error::{Result, RwError}; | ||
use risingwave_pb::batch_plan::plan_node::NodeBody; | ||
|
||
use crate::executor::{ | ||
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, | ||
}; | ||
use crate::task::BatchTaskContext; | ||
|
||
pub struct SysRowSeqScanExecutor { | ||
table_name: String, | ||
schema: Schema, | ||
column_ids: Vec<ColumnId>, | ||
identity: String, | ||
|
||
sys_catalog_reader: SysCatalogReaderRef, | ||
} | ||
|
||
impl SysRowSeqScanExecutor { | ||
pub fn new( | ||
table_name: String, | ||
schema: Schema, | ||
column_id: Vec<ColumnId>, | ||
identity: String, | ||
sys_catalog_reader: SysCatalogReaderRef, | ||
) -> Self { | ||
Self { | ||
table_name, | ||
schema, | ||
column_ids: column_id, | ||
identity, | ||
sys_catalog_reader, | ||
} | ||
} | ||
} | ||
|
||
pub struct SysRowSeqScanExecutorBuilder {} | ||
|
||
#[async_trait::async_trait] | ||
impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder { | ||
async fn new_boxed_executor<C: BatchTaskContext>( | ||
source: &ExecutorBuilder<C>, | ||
inputs: Vec<BoxedExecutor>, | ||
) -> Result<BoxedExecutor> { | ||
ensure!( | ||
inputs.is_empty(), | ||
"Row sequential scan should not have input executor!" | ||
); | ||
let seq_scan_node = try_match_expand!( | ||
source.plan_node().get_node_body().unwrap(), | ||
NodeBody::SysRowSeqScan | ||
)?; | ||
let sys_catalog_reader = source.context.sys_catalog_reader_ref(); | ||
|
||
let table_name = seq_scan_node.table_name.clone(); | ||
let column_descs = seq_scan_node | ||
.column_descs | ||
.iter() | ||
.map(|column_desc| ColumnDesc::from(column_desc.clone())) | ||
.collect_vec(); | ||
|
||
let column_ids = column_descs.iter().map(|d| d.column_id).collect_vec(); | ||
let schema = Schema::new(column_descs.iter().map(Into::into).collect_vec()); | ||
Ok(Box::new(SysRowSeqScanExecutor::new( | ||
table_name, | ||
schema, | ||
column_ids, | ||
source.plan_node().get_identity().clone(), | ||
sys_catalog_reader.unwrap(), | ||
))) | ||
} | ||
} | ||
|
||
impl Executor for SysRowSeqScanExecutor { | ||
fn schema(&self) -> &Schema { | ||
&self.schema | ||
} | ||
|
||
fn identity(&self) -> &str { | ||
&self.identity | ||
} | ||
|
||
fn execute(self: Box<Self>) -> BoxedDataChunkStream { | ||
self.do_executor() | ||
} | ||
} | ||
|
||
impl SysRowSeqScanExecutor { | ||
#[try_stream(boxed, ok = DataChunk, error = RwError)] | ||
async fn do_executor(self: Box<Self>) { | ||
let rows = self.sys_catalog_reader.read_table(&self.table_name).await?; | ||
let filtered_rows = rows | ||
.iter() | ||
.map(|row| { | ||
let datums = self | ||
.column_ids | ||
.iter() | ||
.map(|column_id| row.0.get(column_id.get_id() as usize).cloned().unwrap()) | ||
.collect_vec(); | ||
Row::new(datums) | ||
}) | ||
.collect_vec(); | ||
|
||
let chunk = DataChunk::from_rows(&filtered_rows, &self.schema.data_types()) | ||
.map_err(RwError::from)?; | ||
yield chunk | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.