Add files list for the COPY command #4328

Merged: 8 commits, Mar 7, 2022
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions common/exception/Cargo.toml
@@ -16,6 +16,7 @@ common-arrow = { path = "../arrow" }
 anyhow = "1.0.55"
 backtrace = "0.3.64"
 octocrab = "0.15.4"
+opendal = "0.1.4"
 paste = "1.0.6"
 prost = "0.9.0"
 serde = { version = "1.0.136", features = ["derive"] }
2 changes: 1 addition & 1 deletion common/exception/src/exception_code.rs
@@ -192,7 +192,7 @@ build_exceptions! {
     DalTransportError(3003),
     DalPathNotFound(3004),
     SerdeError(3005),
-    DalS3Error(3006),
+    DalError(3006),
     DalStatError(3007),
 }
7 changes: 7 additions & 0 deletions common/exception/src/exception_into.rs
@@ -228,3 +228,10 @@ impl From<ErrorCode> for tonic::Status {
         }
     }
 }
+
+// OpenDAL error.
+impl From<opendal::error::Error> for ErrorCode {
+    fn from(error: opendal::error::Error) -> Self {
+        ErrorCode::DalError(format!("{:?}", error))
+    }
+}
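
For context, a minimal sketch of what this conversion enables: any fallible opendal call inside a function returning common_exception::Result can now be propagated with `?`. The helper below is hypothetical; the opendal calls mirror the ones in file_s3.rs further down.

```rust
use common_exception::Result;
use opendal::ObjectMode;
use opendal::Operator;

// Hypothetical helper: the `?` on metadata() converts the opendal error
// into ErrorCode::DalError through the From impl above.
async fn object_mode(op: &Operator, path: &str) -> Result<ObjectMode> {
    Ok(op.object(path).metadata().await?.mode())
}
```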
2 changes: 2 additions & 0 deletions common/io/Cargo.toml
@@ -21,6 +21,8 @@ bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff"
 
 # Crates.io dependencies
 bytes = "1.1.0"
+futures = "0.3.21"
+opendal = "0.1.4"
 serde = { version = "1.0.136", features = ["derive"] }
 
 [dev-dependencies]
105 changes: 105 additions & 0 deletions common/io/src/files/file_s3.rs
@@ -0,0 +1,105 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;

use common_exception::ErrorCode;
use common_exception::Result;
use futures::StreamExt;
use opendal::credential::Credential;
use opendal::ObjectMode;
use opendal::Operator;
use opendal::Reader;

pub struct S3File {}

impl S3File {
    // Open an S3 operator.
    pub async fn open(
        s3_endpoint: &str,
        s3_bucket: &str,
        aws_key_id: &str,
        aws_secret_key: &str,
    ) -> Result<Operator> {
        let mut builder = opendal::services::s3::Backend::build();

        // Endpoint URL.
        builder.endpoint(s3_endpoint);

        // Bucket.
        builder.bucket(s3_bucket);

        // Credentials.
        if !aws_key_id.is_empty() {
            let credential = Credential::hmac(aws_key_id, aws_secret_key);
            builder.credential(credential);
        }

        let accessor = builder
            .finish()
            .await
            .map_err(|e| ErrorCode::DalError(format!("s3 dal build error:{:?}", e)))?;
        Ok(opendal::Operator::new(accessor))
    }

    // Read a file and return the reader.
    // file_name is of the form Some("/path/to/file.csv").
    pub async fn read(
        file_name: Option<String>,
        s3_endpoint: &str,
        s3_bucket: &str,
        aws_key_id: &str,
        aws_secret_key: &str,
    ) -> Result<Reader> {
        let operator = Self::open(s3_endpoint, s3_bucket, aws_key_id, aws_secret_key).await?;
        let path = file_name.unwrap_or_else(|| "".to_string());
        Ok(operator.object(&path).reader())
    }

    // List the files under the path.
    pub async fn list(
        s3_endpoint: &str,
        s3_bucket: &str,
        path: &str,
        aws_key_id: &str,
        aws_secret_key: &str,
    ) -> Result<Vec<String>> {
        let mut list: Vec<String> = vec![];
        let operator = Self::open(s3_endpoint, s3_bucket, aws_key_id, aws_secret_key).await?;

        // Check whether the path object mode is DIR or FILE.
        let mode = operator.object(path).metadata().await?.mode();
        match mode {
            ObjectMode::FILE => {
                list.push(path.to_string());
            }
            ObjectMode::DIR => {
                let mut objects = operator.objects(path);
                while let Some(object) = objects.next().await {
                    let meta = object?.metadata().await?;
                    let new_path = Path::new(path).join(meta.path());
                    list.push(new_path.to_string_lossy().to_string());
                }
            }
            other => {
                return Err(ErrorCode::DalError(format!(
                    "S3 list() can not handle the object mode: {:?}",
                    other
                )))
            }
        }

        Ok(list)
    }
}
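
A hedged usage sketch of the new helper; the endpoint, bucket, and prefix values are placeholders, while the signatures match the code above. An empty key id skips the credential, as in S3File::open().

```rust
use common_exception::Result;
use common_io::prelude::S3File;

async fn copy_sources() -> Result<()> {
    let endpoint = "http://127.0.0.1:9000"; // placeholder endpoint
    let bucket = "mybucket"; // placeholder bucket

    // List every file under a prefix (or the single file the path points at).
    let files = S3File::list(endpoint, bucket, "data/", "", "").await?;

    // Open a reader for the first listed file, if any.
    if let Some(file) = files.first() {
        let _reader = S3File::read(Some(file.clone()), endpoint, bucket, "", "").await?;
    }
    Ok(())
}
```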
17 changes: 17 additions & 0 deletions common/io/src/files/mod.rs
@@ -0,0 +1,17 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod file_s3;

pub use file_s3::S3File;
1 change: 1 addition & 0 deletions common/io/src/lib.rs
@@ -20,6 +20,7 @@ pub mod prelude;
 mod binary_read;
 mod binary_write;
 mod buf_read;
+mod files;
 mod marshal;
 mod options_deserializer;
 mod stat_buffer;
1 change: 1 addition & 0 deletions common/io/src/prelude.rs
@@ -20,6 +20,7 @@ pub use crate::binary_write::put_uvarint;
 pub use crate::binary_write::BinaryWrite;
 pub use crate::binary_write::BinaryWriteBuf;
 pub use crate::buf_read::BufReadExt;
+pub use crate::files::S3File;
 pub use crate::marshal::Marshal;
 pub use crate::options_deserializer::OptionsDeserializer;
 pub use crate::options_deserializer::OptionsDeserializerError;
4 changes: 4 additions & 0 deletions common/planners/src/plan_copy.rs
@@ -61,6 +61,7 @@ pub struct CopyPlan {
     pub from: ReadDataSourcePlan,
     pub validation_mode: ValidationMode,
     pub files: Vec<String>,
+    pub pattern: String,
 }
 
 impl CopyPlan {
@@ -77,6 +78,9 @@ impl Debug for CopyPlan {
         if !self.files.is_empty() {
             write!(f, " ,files:{:?}", self.files)?;
         }
+        if !self.pattern.is_empty() {
+            write!(f, " ,pattern:{:?}", self.pattern)?;
+        }
         write!(f, " ,validation_mode:{:?}", self.validation_mode)
     }
 }
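
With both clauses set, the plan's Debug output gains fragments like ` ,files:["f1.csv"]` and ` ,pattern:".*[.]csv"` ahead of the trailing ` ,validation_mode:...` field (the file names and pattern here are illustrative).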
78 changes: 71 additions & 7 deletions query/src/interpreters/interpreter_copy.rs
@@ -12,9 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::path::Path;
 use std::sync::Arc;
 
+use common_exception::ErrorCode;
 use common_exception::Result;
+use common_io::prelude::S3File;
+use common_meta_types::StageStorage;
 use common_planners::CopyPlan;
 use common_planners::PlanNode;
 use common_planners::ReadDataSourcePlan;
@@ -24,6 +28,7 @@ use common_streams::ProgressStream;
 use common_streams::SendableDataBlockStream;
 use common_tracing::tracing;
 use futures::TryStreamExt;
+use regex::Regex;
 
 use crate::interpreters::stream::ProcessorExecutorStream;
 use crate::interpreters::Interpreter;
@@ -42,6 +47,49 @@ impl CopyInterpreter {
         Ok(Arc::new(CopyInterpreter { ctx, plan }))
     }
 
+    // List the files. There are two cases here:
+    // 1. If plan.files is not empty, the file set was already given on the COPY
+    //    command with the `files=(<file1>, <file2>)` syntax; we only need to
+    //    prepend the stage path to each file.
+    // 2. If plan.files is empty, there are again two sub-cases:
+    //    2.1 If the path is a file like /path/to/file, S3File::list() returns that same file path.
+    //    2.2 If the path is a folder, S3File::list() returns all the files in it.
+    async fn list_files(&self) -> Result<Vec<String>> {
+        let files = match &self.plan.from.source_info {
+            SourceInfo::S3ExternalSource(table_info) => {
+                let storage = &table_info.stage_info.stage_params.storage;
+                match &storage {
+                    StageStorage::S3(s3) => {
+                        let path = &s3.path;
+
+                        // Prepend the stage path to each file: /path/to/file1.
+                        if !self.plan.files.is_empty() {
+                            let mut files_with_path = vec![];
+                            for file in &self.plan.files {
+                                let new_path = Path::new(path).join(file);
+                                files_with_path.push(new_path.to_string_lossy().to_string());
+                            }
+                            Ok(files_with_path)
+                        } else {
+                            let endpoint = &self.ctx.get_config().storage.s3.endpoint_url;
+                            let bucket = &s3.bucket;
+
+                            let key_id = &s3.credentials_aws_key_id;
+                            let secret_key = &s3.credentials_aws_secret_key;
+
+                            S3File::list(endpoint, bucket, path, key_id, secret_key).await
+                        }
+                    }
+                }
+            }
+            other => Err(ErrorCode::LogicalError(format!(
+                "Cannot list files for the source info: {:?}",
+                other
+            ))),
+        };
+
+        files
+    }
+
     // Rewrite the ReadDataSourcePlan.S3ExternalSource.file_name to the new file name.
     fn rewrite_read_plan_file_name(
         mut plan: ReadDataSourcePlan,
@@ -116,14 +164,30 @@
         &self,
         mut _input_stream: Option<SendableDataBlockStream>,
     ) -> Result<SendableDataBlockStream> {
-        let files = self.plan.files.clone();
+        let mut files = self.list_files().await?;
+
+        // Pattern match check.
+        let pattern = &self.plan.pattern;
+        if !pattern.is_empty() {
+            let regex = Regex::new(pattern).map_err(|e| {
+                ErrorCode::SyntaxException(format!(
+                    "Pattern format invalid, got:{}, error:{:?}",
+                    pattern, e
+                ))
+            })?;
+
+            let matched_files = files
+                .iter()
+                .filter(|file| regex.is_match(file))
+                .cloned()
+                .collect();
+            files = matched_files;
+        }
 
         if files.is_empty() {
             self.copy_one_file_to_table(None).await?;
-        } else {
-            for file in files {
-                self.copy_one_file_to_table(Some(file)).await?;
-            }
         }
+
+        tracing::info!("copy file list:{:?}, pattern:{}", &files, pattern,);
+
+        for file in files {
+            self.copy_one_file_to_table(Some(file)).await?;
+        }
 
         Ok(Box::pin(DataBlockStream::create(
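
The pattern step is self-contained enough to sketch in isolation. Below is a minimal standalone version of the list-then-filter order the interpreter now follows, with invented file names:

```rust
use regex::Regex;

// Resolve the candidate list first, then apply PATTERN as a regex filter,
// mirroring the interpreter's order of operations.
fn filter_files(files: Vec<String>, pattern: &str) -> Result<Vec<String>, regex::Error> {
    if pattern.is_empty() {
        return Ok(files);
    }
    let regex = Regex::new(pattern)?;
    Ok(files.into_iter().filter(|f| regex.is_match(f)).collect())
}

fn main() {
    let files = vec![
        "data/ontime_2022_1.csv".to_string(),
        "data/ontime_2022_1.parquet".to_string(),
    ];
    // Only the .csv entry survives the filter.
    let matched = filter_files(files, r".*[.]csv$").unwrap();
    assert_eq!(matched, vec!["data/ontime_2022_1.csv".to_string()]);
}
```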
13 changes: 10 additions & 3 deletions query/src/sessions/query_ctx.rs
@@ -35,6 +35,7 @@ use common_planners::Part;
 use common_planners::Partitions;
 use common_planners::PlanNode;
 use common_planners::ReadDataSourcePlan;
+use common_planners::S3ExternalTableInfo;
 use common_planners::SourceInfo;
 use common_planners::Statistics;
 use common_streams::AbortStream;
@@ -97,7 +98,9 @@ impl QueryContext {
             SourceInfo::TableSource(table_info) => {
                 self.build_table_by_table_info(table_info, plan.tbl_args.clone())
             }
-            SourceInfo::S3ExternalSource(_s3_table_info) => self.build_s3_external_table(),
+            SourceInfo::S3ExternalSource(s3_table_info) => {
+                self.build_s3_external_by_table_info(s3_table_info, plan.tbl_args.clone())
+            }
         }
     }
 
@@ -121,8 +124,12 @@ impl QueryContext {
     // Build an S3 external table from the stage info. This is used in:
     //   COPY INTO t1 FROM 's3://'
     // where 's3://' is an S3 external stage that is built into an external table.
-    fn build_s3_external_table(&self) -> Result<Arc<dyn Table>> {
-        S3ExternalTable::try_create()
+    fn build_s3_external_by_table_info(
+        &self,
+        table_info: &S3ExternalTableInfo,
+        _table_args: Option<Vec<Expression>>,
+    ) -> Result<Arc<dyn Table>> {
+        S3ExternalTable::try_create(table_info.clone())
     }
 
     pub fn get_scan_progress(&self) -> Arc<Progress> {
8 changes: 8 additions & 0 deletions query/src/sql/parsers/parser_copy.rs
@@ -66,6 +66,13 @@ impl<'a> DfParser<'a> {
             self.expect_token(")")?;
         }
 
+        // PATTERN = '<regex_pattern>'
+        let mut pattern = "".to_string();
+        if self.consume_token("PATTERN") {
+            self.expect_token("=")?;
+            pattern = self.parse_value_or_ident()?;
+        }
+
         // file_format = (type = csv field_delimiter = '|' skip_header = 1)
         let mut file_format_options = HashMap::default();
        if self.consume_token("FILE_FORMAT") {
@@ -107,6 +114,7 @@
             encryption_options,
             file_format_options,
             files,
+            pattern,
             on_error,
             size_limit,
             validation_mode,
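
Taken together with the interpreter change above, this lets a statement such as COPY INTO t1 FROM 's3://mybucket/data/' PATTERN = '.*[.]csv' FILE_FORMAT = (type = csv) carry its regex through the plan's pattern field into the interpreter's filter step (the table, bucket, and pattern here are illustrative).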