feat(stage): support stage list #4472

Merged · 15 commits · Mar 17, 2022
3 changes: 2 additions & 1 deletion .github/actions/test_stateful_standalone/action.yml
@@ -43,12 +43,13 @@ runs:
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export AWS_EC2_METADATA_DISABLED=true

aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket
aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data s3://testbucket/admin/data --recursive
aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data/ontime_200.csv s3://testbucket/admin/data/ontime_200_v1.csv
aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data/ontime_200.parquet s3://testbucket/admin/data/ontime_200_v1.parquet

- name: Run Stateless Tests with Standalone mode (ubuntu-latest only)
- name: Run Stateful Tests with Standalone mode (ubuntu-latest only)
shell: bash
run: |
bash ./scripts/ci/ci-run-stateful-tests-standalone-s3.sh
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion common/ast/Cargo.toml
@@ -19,7 +19,7 @@ common-functions = { path = "../functions" }
# TODO (andylokandy): Use the version from crates.io once
# https://github.com/brendanzab/codespan/pull/331 is released.
codespan-reporting = { git = "https://github.com/brendanzab/codespan", rev = "c84116f5" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "64795ad" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "0b1b0d7" }

# Crates.io dependencies
async-trait = "0.1.52"
2 changes: 1 addition & 1 deletion common/exception/Cargo.toml
@@ -26,4 +26,4 @@ tonic = "0.6.2"

# Github dependencies
bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "64795ad" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "0b1b0d7" }
2 changes: 0 additions & 2 deletions common/functions/Cargo.toml
@@ -18,8 +18,6 @@ common-datavalues = { path = "../datavalues" }
common-exception = { path = "../exception" }
common-io = { path = "../io" }

# Github dependencies

# Crates.io dependencies
base64 = "0.13.0"
blake3 = "1.3.1"
9 changes: 6 additions & 3 deletions common/io/src/files/file_s3.rs
@@ -28,6 +28,7 @@ impl S3File {
s3_bucket: &str,
aws_key_id: &str,
aws_secret_key: &str,
root: &str,
) -> Result<Operator> {
let mut builder = opendal::services::s3::Backend::build();

@@ -36,6 +37,7 @@

// Bucket.
builder.bucket(s3_bucket);
builder.root(root);

// Credentials.
if !aws_key_id.is_empty() {
@@ -50,10 +52,9 @@
Ok(opendal::Operator::new(accessor))
}

// Get the files in the path.
// Get the files in the path; if the path does not exist, return an empty list.
pub async fn list(operator: &Operator, path: &str) -> Result<Vec<String>> {
let mut list: Vec<String> = vec![];
// Check the path object mode is DIR or FILE.
let mode = operator.object(path).metadata().await?.mode();
match mode {
ObjectMode::FILE => {
Expand All @@ -64,7 +65,9 @@ impl S3File {
while let Some(object) = objects.next().await {
let mut object = object?;
let meta = object.metadata_cached().await?;
list.push(meta.path().to_string());
if meta.mode() == ObjectMode::FILE {
list.push(meta.path().to_string());
}
}
}
other => {
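For orientation, here is a minimal usage sketch of the listing helper above. It is an editor's addition, not part of the diff; the `S3File` import path, the `common_exception::Result` alias, and the way the `Operator` is built (with the new `root` parameter) are assumptions, since the constructor's name is not visible in this hunk.

```rust
use common_exception::Result; // assumed error alias, matching Result<Vec<String>> above
use opendal::Operator;

// Sketch only: `operator` is assumed to be built through the S3 backend shown
// above, with the new `root` parameter already applied.
async fn print_stage_files(operator: &Operator, path: &str) -> Result<()> {
    // Per this diff, `list` returns only FILE objects and yields an empty Vec
    // when the path does not exist.
    let files = S3File::list(operator, path).await?;
    for file in files {
        println!("stage file: {}", file);
    }
    Ok(())
}
```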
11 changes: 11 additions & 0 deletions common/io/src/utils.rs
@@ -105,3 +105,14 @@ pub fn get_abs_path(root: &str, path: &str) -> String {

PathBuf::from(root).join(path).to_string_lossy().to_string()
}

// todo(xuanwo): opendal support meta name (https://github.com/datafuselabs/opendal/issues/150)
#[inline]
pub fn get_file_name(path: &str) -> String {
let path = path
.split('/')
.filter(|v| !v.is_empty())
.collect::<Vec<&str>>();

path[path.len() - 1].to_string()
}
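As an editor's sketch (not part of the PR), the expected behavior of `get_file_name` can be read directly off the implementation above; the test name and placement here are hypothetical.

```rust
// Sketch only: illustrates the get_file_name helper added above.
#[test]
fn get_file_name_sketch() {
    assert_eq!(get_file_name("admin/data/ontime_200.csv"), "ontime_200.csv");
    // Empty segments are filtered out, so a trailing slash changes nothing.
    assert_eq!(get_file_name("admin/data/"), "data");
    // Note: an empty or all-slash input would panic, because the final segment
    // is indexed without a bounds check.
}
```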
7 changes: 7 additions & 0 deletions common/io/tests/it/utils.rs
@@ -24,3 +24,10 @@ fn convert_test() {
assert_eq!(convert_number_size(1022_f64), "1.02 thousand");
assert_eq!(convert_number_size(10222_f64), "10.22 thousand");
}

#[test]
fn path_test() {
assert_eq!(get_abs_path("ab/c", "d"), "ab/c/d".to_string());
assert_eq!(get_abs_path("/ab/c", "d"), "/ab/c/d".to_string());
assert_eq!(get_abs_path("/ab/c", "/d/e"), "/ab/c/d/e".to_string());
}
1 change: 0 additions & 1 deletion common/meta/types/src/user_stage.rs
@@ -140,7 +140,6 @@ impl Default for FileFormatOptions {
pub struct StageS3Storage {
// `example-bucket` in `s3://example-bucket/path/to/object`
pub bucket: String,
// `path/to/object` in `s3://example-bucket/path/to/object`
pub path: String,
pub credentials_aws_key_id: String,
pub credentials_aws_secret_key: String,
6 changes: 4 additions & 2 deletions common/planners/src/lib.rs
@@ -41,13 +41,14 @@ mod plan_insert_into;
mod plan_kill;
mod plan_limit;
mod plan_limit_by;
mod plan_list;
mod plan_node;
mod plan_node_builder;
mod plan_node_display;
mod plan_node_display_indent;
mod plan_node_extras;
mod plan_node_rewriter;
mod plan_node_s3_external_table;
mod plan_node_s3_stage_table;
mod plan_node_stage;
mod plan_node_statistics;
mod plan_node_visitor;
@@ -151,12 +152,13 @@ pub use plan_insert_into::InsertPlan;
pub use plan_kill::KillPlan;
pub use plan_limit::LimitPlan;
pub use plan_limit_by::LimitByPlan;
pub use plan_list::ListPlan;
pub use plan_node::PlanNode;
pub use plan_node_builder::PlanBuilder;
pub use plan_node_extras::Extras;
pub use plan_node_rewriter::PlanRewriter;
pub use plan_node_rewriter::RewriteHelper;
pub use plan_node_s3_external_table::S3ExternalTableInfo;
pub use plan_node_s3_stage_table::S3StageTableInfo;
pub use plan_node_stage::StageKind;
pub use plan_node_stage::StagePlan;
pub use plan_node_statistics::Statistics;
50 changes: 50 additions & 0 deletions common/planners/src/plan_list.rs
@@ -0,0 +1,50 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use common_datavalues::prelude::ToDataType;
use common_datavalues::prelude::Vu8;
use common_datavalues::DataField;
use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_types::UserStageInfo;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)]
pub struct ListPlan {
pub stage: UserStageInfo,
pub path: String,
pub pattern: String,
}

impl ListPlan {
pub fn schema(&self) -> DataSchemaRef {
// TODO add more fields
let field = DataField::new("file_name", Vu8::to_data_type());
Arc::new(DataSchema::new(vec![field]))
}
}

impl Debug for ListPlan {
// Ignore the schema.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "List {:?}", self.stage)?;
if !self.pattern.is_empty() {
write!(f, " ,pattern:{:?}", self.pattern)?;
}
Ok(())
}
}
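A hedged construction sketch (editor's addition, not in the PR) showing how `ListPlan` is meant to be used; it assumes `UserStageInfo` implements `Default`, which this diff does not show.

```rust
// Sketch only: build a ListPlan and inspect its one-column schema.
fn list_plan_sketch() {
    let plan = ListPlan {
        stage: UserStageInfo::default(), // assumption: Default is implemented
        path: "/".to_string(),
        pattern: "ontime.*parquet".to_string(),
    };
    // schema() currently exposes a single `file_name` column.
    assert_eq!(plan.schema().fields().len(), 1);
    // Debug output follows the impl above, roughly: List <stage> ,pattern:"ontime.*parquet"
    println!("{:?}", plan);
}
```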
10 changes: 10 additions & 0 deletions common/planners/src/plan_node.rs
@@ -49,6 +49,7 @@ use crate::InsertPlan;
use crate::KillPlan;
use crate::LimitByPlan;
use crate::LimitPlan;
use crate::ListPlan;
use crate::OptimizeTablePlan;
use crate::ProjectionPlan;
use crate::ReadDataSourcePlan;
@@ -104,6 +105,9 @@ pub enum PlanNode {
// Call.
Call(CallPlan),

// List
List(ListPlan),

// Show.
Show(ShowPlan),

@@ -237,6 +241,9 @@ impl PlanNode {
PlanNode::DropUserStage(v) => v.schema(),
PlanNode::DescribeUserStage(v) => v.schema(),

// List
PlanNode::List(v) => v.schema(),

// UDF.
PlanNode::CreateUserUDF(v) => v.schema(),
PlanNode::DropUserUDF(v) => v.schema(),
@@ -330,6 +337,9 @@ impl PlanNode {
PlanNode::DropUserStage(_) => "DropUserStagePlan",
PlanNode::DescribeUserStage(_) => "DescribeUserStagePlan",

// List
PlanNode::List(_) => "ListPlan",

// UDF.
PlanNode::CreateUserUDF(_) => "CreateUserUDFPlan",
PlanNode::DropUserUDF(_) => "DropUserUDFPlan",
6 changes: 6 additions & 0 deletions common/planners/src/plan_node_rewriter.rs
@@ -58,6 +58,7 @@ use crate::InsertPlan;
use crate::KillPlan;
use crate::LimitByPlan;
use crate::LimitPlan;
use crate::ListPlan;
use crate::OptimizeTablePlan;
use crate::PlanBuilder;
use crate::PlanNode;
@@ -171,6 +172,7 @@ pub trait PlanRewriter: Sized {
PlanNode::CreateUserStage(plan) => self.rewrite_create_user_stage(plan),
PlanNode::DropUserStage(plan) => self.rewrite_drop_user_stage(plan),
PlanNode::DescribeUserStage(plan) => self.rewrite_describe_user_stage(plan),
PlanNode::List(plan) => self.rewrite_list(plan),

// UDF.
PlanNode::CreateUserUDF(plan) => self.rewrite_create_user_udf(plan),
@@ -378,6 +380,10 @@ pub trait PlanRewriter: Sized {
Ok(PlanNode::DescribeUserStage(plan.clone()))
}

fn rewrite_list(&mut self, plan: &ListPlan) -> Result<PlanNode> {
Ok(PlanNode::List(plan.clone()))
}

fn rewrite_drop_table(&mut self, plan: &DropTablePlan) -> Result<PlanNode> {
Ok(PlanNode::DropTable(plan.clone()))
}
common/planners/src/{plan_node_s3_external_table.rs → plan_node_s3_stage_table.rs}
@@ -19,13 +19,14 @@ use common_datavalues::DataSchemaRef;
use common_meta_types::UserStageInfo;

#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq)]
pub struct S3ExternalTableInfo {
pub struct S3StageTableInfo {
pub schema: DataSchemaRef,
pub file_name: Option<String>,
pub stage_info: UserStageInfo,
pub path: String,
}

impl S3ExternalTableInfo {
impl S3StageTableInfo {
pub fn schema(&self) -> DataSchemaRef {
self.schema.clone()
}
@@ -35,7 +36,7 @@ impl S3ExternalTableInfo {
}
}

impl Debug for S3ExternalTableInfo {
impl Debug for S3StageTableInfo {
// Ignore the schema.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.stage_info)
6 changes: 6 additions & 0 deletions common/planners/src/plan_node_visitor.rs
@@ -49,6 +49,7 @@ use crate::InsertPlan;
use crate::KillPlan;
use crate::LimitByPlan;
use crate::LimitPlan;
use crate::ListPlan;
use crate::OptimizeTablePlan;
use crate::PlanNode;
use crate::ProjectionPlan;
@@ -183,6 +184,7 @@ pub trait PlanVisitor {
PlanNode::CreateUserStage(plan) => self.visit_create_user_stage(plan),
PlanNode::DropUserStage(plan) => self.visit_drop_user_stage(plan),
PlanNode::DescribeUserStage(plan) => self.visit_describe_user_stage(plan),
PlanNode::List(plan) => self.visit_list(plan),

// UDF.
PlanNode::CreateUserUDF(plan) => self.visit_create_user_udf(plan),
@@ -371,6 +373,10 @@ pub trait PlanVisitor {
Ok(())
}

fn visit_list(&mut self, _: &ListPlan) -> Result<()> {
Ok(())
}

fn visit_drop_table(&mut self, _: &DropTablePlan) -> Result<()> {
Ok(())
}
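To illustrate how the new hook composes, here is an editor's sketch of a custom visitor. It assumes the remaining `PlanVisitor` methods have default bodies like the ones shown in this diff, and that `ListPlan` and the `Result` alias are imported as above; the collector type itself is hypothetical.

```rust
// Sketch only: collect a debug string for every ListPlan the planner visits.
struct ListCollector {
    listed: Vec<String>,
}

impl PlanVisitor for ListCollector {
    fn visit_list(&mut self, plan: &ListPlan) -> Result<()> {
        self.listed.push(format!("{:?}", plan));
        Ok(())
    }
}
```

The dispatch arm added above routes `PlanNode::List(plan)` into this override, so no other method needs to change.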
10 changes: 5 additions & 5 deletions common/planners/src/plan_read_datasource.rs
@@ -23,30 +23,30 @@ use common_meta_types::TableInfo;
use crate::Expression;
use crate::Extras;
use crate::Partitions;
use crate::S3ExternalTableInfo;
use crate::S3StageTableInfo;
use crate::Statistics;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub enum SourceInfo {
// Normal table source, 'fuse/system'.
TableSource(TableInfo),

// S3 external source, 's3://'.
S3ExternalSource(S3ExternalTableInfo),
// S3 internal/external source, 's3://'.
S3StageSource(S3StageTableInfo),
}

impl SourceInfo {
pub fn schema(&self) -> Arc<DataSchema> {
match self {
SourceInfo::TableSource(table_info) => table_info.schema(),
SourceInfo::S3ExternalSource(table_info) => table_info.schema(),
SourceInfo::S3StageSource(table_info) => table_info.schema(),
}
}

pub fn desc(&self) -> String {
match self {
SourceInfo::TableSource(table_info) => table_info.desc.clone(),
SourceInfo::S3ExternalSource(table_info) => table_info.desc(),
SourceInfo::S3StageSource(table_info) => table_info.desc(),
}
}
}
4 changes: 4 additions & 0 deletions docs/doc/03-reference/03-sql/06-list/_category_.yml
@@ -0,0 +1,4 @@
label: 'List Commands'
link:
type: generated-index
title: 'List Commands'
23 changes: 23 additions & 0 deletions docs/doc/03-reference/03-sql/06-list/list-stage.md
@@ -0,0 +1,23 @@
---
title: List files in a stage
---


## Syntax

```
list @<stage_name> [pattern = '<regexp_pattern>']
```

## Examples

```sql
MySQL [(none)]> list @named_external_stage PATTERN = 'ontime.*parquet';
+-----------------------+
| file_name |
+-----------------------+
| ontime_200.parquet |
| ontime_200_v1.parquet |
+-----------------------+
2 rows in set (2.150 sec)
```
2 changes: 1 addition & 1 deletion query/Cargo.toml
@@ -55,7 +55,7 @@ bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff"
cargo-license = { git = "https://github.com/datafuse-extras/cargo-license", rev = "f1ce4a2" }
opensrv-clickhouse = { git = "https://github.com/datafuselabs/opensrv", rev = "9690be9", package = "opensrv-clickhouse" }
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "9690be9", package = "opensrv-mysql" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "64795ad" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "0b1b0d7" }

# Crates.io dependencies
ahash = "0.7.6"