Merge pull request #4999 from sundy-li/orderkey
feat(query): support cluster key in fuse engine
BohuTANG authored Apr 22, 2022
2 parents 1c3fcf2 + a216e81 commit a162385
Showing 28 changed files with 391 additions and 207 deletions.
1 change: 1 addition & 0 deletions common/datablocks/src/kernels/data_block_sort.rs
@@ -26,6 +26,7 @@ use common_exception::Result;

use crate::DataBlock;

#[derive(Clone)]
pub struct SortColumnDescription {
pub column_name: String,
pub asc: bool,
1 change: 1 addition & 0 deletions common/exception/src/exception_code.rs
@@ -183,6 +183,7 @@ build_exceptions! {
// Database error codes.
UnknownDatabaseEngine(2701),
UnknownTableEngine(2702),
UnsupportedEngineParams(2703),

// Share error codes.
ShareAlreadyExists(2705),
3 changes: 3 additions & 0 deletions common/meta/types/src/table.rs
@@ -108,6 +108,8 @@ pub struct TableMeta {
pub engine: String,
pub engine_options: BTreeMap<String, String>,
pub options: BTreeMap<String, String>,
// TODO(sundy): Save this as AST format
pub order_keys: Option<Vec<u8>>,
pub created_on: DateTime<Utc>,
}

@@ -165,6 +167,7 @@ impl Default for TableMeta {
engine_options: BTreeMap::new(),
options: BTreeMap::new(),
created_on: Utc::now(),
order_keys: None,
}
}
}
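The new `order_keys` field stores the cluster-key expressions as serialized bytes rather than a typed AST (hence the TODO above). A minimal, self-contained sketch of the round trip this commit relies on: serialize with serde_json when the table is created, parse back when the table is opened. `Expr` here is a simplified stand-in for `common_planners::Expression`, not the real type.

```rust
// Sketch only: `Expr` stands in for common_planners::Expression, which is what
// the commit actually serializes into TableMeta.order_keys.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum Expr {
    Column(String),
}

fn main() -> serde_json::Result<()> {
    let order_keys = vec![Expr::Column("id".into()), Expr::Column("ts".into())];

    // CREATE TABLE path: store the expressions as JSON bytes in TableMeta.order_keys.
    let stored: Vec<u8> = serde_json::to_vec(&order_keys)?;

    // Table-load path (FuseTable::try_create later in this diff): read them back.
    let loaded: Vec<Expr> = serde_json::from_slice(&stored)?;
    assert_eq!(loaded, order_keys);
    Ok(())
}
```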
2 changes: 2 additions & 0 deletions common/planners/src/plan_table_create.rs
@@ -18,6 +18,7 @@ use common_datavalues::DataSchemaRef;
use common_meta_types::CreateTableReq;
use common_meta_types::TableMeta;

use crate::Expression;
use crate::PlanNode;

pub type TableOptions = BTreeMap<String, String>;
@@ -32,6 +33,7 @@ pub struct CreateTablePlan {

pub table_meta: TableMeta,

pub order_keys: Vec<Expression>,
pub as_select: Option<Box<PlanNode>>,
}

1 change: 1 addition & 0 deletions common/planners/tests/it/plan_display.rs
@@ -41,6 +41,7 @@ fn test_plan_display_indent() -> Result<()> {
..Default::default()
},
as_select: None,
order_keys: vec![],
});

assert_eq!(
@@ -19,7 +19,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
<column_name> <data_type> [ NOT NULL | NULL] [ { DEFAULT <expr> }],
<column_name> <data_type> [ NOT NULL | NULL] [ { DEFAULT <expr> }],
...
)
) [CLUSTER BY(<expr> [, <expr>, ...] )]

<data_type>:
TINYINT
47 changes: 31 additions & 16 deletions query/src/interpreters/interpreter_table_create.rs
@@ -32,6 +32,7 @@ use crate::catalogs::Catalog;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::sessions::QueryContext;
use crate::storages::StorageDescription;

pub struct CreateTableInterpreter {
ctx: Arc<QueryContext>,
@@ -63,27 +64,41 @@ impl Interpreter for CreateTableInterpreter {
.await?;

let engine = self.plan.engine();

if self
let name_not_duplicate = self
.ctx
.get_catalog()
.list_tables(&*self.plan.tenant, &*self.plan.db)
.await?
.iter()
.all(|table| table.name() != self.plan.table.as_str())
&& self
.ctx
.get_catalog()
.get_table_engines()
.iter()
.all(|desc| {
desc.engine_name.to_string().to_lowercase() != engine.to_string().to_lowercase()
})
{
return Err(ErrorCode::UnknownTableEngine(format!(
"Unknown table engine {}",
engine
)));
.all(|table| table.name() != self.plan.table.as_str());

let engine_desc: Option<StorageDescription> = self
.ctx
.get_catalog()
.get_table_engines()
.iter()
.find(|desc| {
desc.engine_name.to_string().to_lowercase() == engine.to_string().to_lowercase()
})
.cloned();

match engine_desc {
Some(engine) => {
if !self.plan.order_keys.is_empty() && !engine.support_order_key {
return Err(ErrorCode::UnsupportedEngineParams(format!(
"Unsupported cluster key for engine: {}",
engine.engine_name
)));
}
}
None => {
if name_not_duplicate {
return Err(ErrorCode::UnknownTableEngine(format!(
"Unknown table engine {}",
engine
)));
}
}
}

match &self.plan.as_select {
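The engine check above, restated as a minimal self-contained sketch (simplified names, not Databend's actual API): an unknown engine is only an error when the table name is not already taken, and a known engine must advertise cluster-key support when order keys were given.

```rust
// Sketch of the validation logic; `EngineDesc` stands in for StorageDescription.
struct EngineDesc {
    name: &'static str,
    support_order_key: bool,
}

fn validate(
    engines: &[EngineDesc],
    engine: &str,
    has_order_keys: bool,
    name_not_duplicate: bool,
) -> Result<(), String> {
    match engines.iter().find(|d| d.name.eq_ignore_ascii_case(engine)) {
        // Known engine, but a cluster key was requested and it cannot handle one.
        Some(desc) if has_order_keys && !desc.support_order_key => {
            Err(format!("Unsupported cluster key for engine: {}", desc.name))
        }
        Some(_) => Ok(()),
        // Unknown engine is only reported when the table does not already exist.
        None if name_not_duplicate => Err(format!("Unknown table engine {}", engine)),
        None => Ok(()),
    }
}

fn main() {
    // Made-up engine list; in this commit only FUSE sets support_order_key = true.
    let engines = [
        EngineDesc { name: "FUSE", support_order_key: true },
        EngineDesc { name: "MEMORY", support_order_key: false },
    ];
    assert!(validate(&engines, "fuse", true, true).is_ok());
    assert!(validate(&engines, "memory", true, true).is_err());
    assert!(validate(&engines, "bogus", false, true).is_err());
}
```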
25 changes: 19 additions & 6 deletions query/src/interpreters/interpreter_table_show_create.rs
@@ -61,7 +61,7 @@ impl Interpreter for ShowCreateTableInterpreter {
let engine = table.engine();
let schema = table.schema();

let mut table_info = format!("CREATE TABLE `{}` (\n", name);
let mut table_create_sql = format!("CREATE TABLE `{}` (\n", name);

// Append columns.
{
@@ -88,13 +88,26 @@ impl Interpreter for ShowCreateTableInterpreter {
// y
// )
let columns_str = format!("{}\n", columns.join(",\n"));
table_info.push_str(&columns_str);
table_create_sql.push_str(&columns_str);
}

let table_engine = format!(") ENGINE={}", engine);
table_info.push_str(table_engine.as_str());
table_info.push_str({
let mut opts = table.options().iter().collect::<Vec<_>>();
table_create_sql.push_str(table_engine.as_str());

let table_info = table.get_table_info();
if let Some(order) = &table_info.meta.order_keys {
let expressions: Vec<Expression> = serde_json::from_slice(order.as_slice())?;
let order_keys_str = expressions
.iter()
.map(|expr| expr.column_name())
.collect::<Vec<_>>()
.join(", ");

table_create_sql.push_str(format!(" CLUSTER BY ({})", order_keys_str).as_str());
}

table_create_sql.push_str({
let mut opts = table_info.options().iter().collect::<Vec<_>>();
opts.sort_by_key(|(k, _)| *k);
opts.iter()
.filter(|(k, _)| !is_internal_opt_key(k))
@@ -112,7 +125,7 @@ impl Interpreter for ShowCreateTableInterpreter {

let block = DataBlock::create(show_schema.clone(), vec![
Series::from_data(vec![name.as_bytes()]),
Series::from_data(vec![table_info.into_bytes()]),
Series::from_data(vec![table_create_sql.into_bytes()]),
]);
tracing::debug!("Show create table executor result: {:?}", block);

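The CLUSTER BY suffix rendered by this interpreter is simply the deserialized expressions' column names joined with ", ". A small self-contained sketch of that string assembly; the table name, elided column list, and key names are made up for illustration.

```rust
fn main() {
    // Stand-ins for expr.column_name() over the deserialized order_keys.
    let key_names = vec!["id".to_string(), "ts".to_string()];

    let mut table_create_sql = String::from("CREATE TABLE `t` (...) ENGINE=FUSE");
    if !key_names.is_empty() {
        table_create_sql.push_str(&format!(" CLUSTER BY ({})", key_names.join(", ")));
    }
    assert_eq!(
        table_create_sql,
        "CREATE TABLE `t` (...) ENGINE=FUSE CLUSTER BY (id, ts)"
    );
}
```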
10 changes: 10 additions & 0 deletions query/src/sql/parsers/parser_table.rs
@@ -21,6 +21,7 @@ use sqlparser::ast::ColumnDef;
use sqlparser::ast::ColumnOptionDef;
use sqlparser::ast::TableConstraint;
use sqlparser::keywords::Keyword;
use sqlparser::parser::Parser;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::Token;
use sqlparser::tokenizer::Word;
@@ -60,6 +61,14 @@ impl<'a> DfParser<'a> {

let engine = self.parse_table_engine()?;

// parse cluster key
let mut order_keys = vec![];
if self.parser.parse_keywords(&[Keyword::CLUSTER, Keyword::BY]) {
self.parser.expect_token(&Token::LParen)?;
order_keys = self.parser.parse_comma_separated(Parser::parse_expr)?;
self.parser.expect_token(&Token::RParen)?;
}

// parse table options: https://dev.mysql.com/doc/refman/8.0/en/create-table.html
let options = self.parse_options()?;

@@ -81,6 +90,7 @@ impl<'a> DfParser<'a> {
name: table_name,
columns,
engine,
order_keys,
options,
like: table_like,
query,
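In SQL terms, the new branch accepts an optional CLUSTER BY clause between the engine clause and the table options. A hedged, test-style sketch of how it might be exercised; the table, columns, import paths, and the `DfParser::parse_sql` entry point are assumptions based on the existing parser tests, not part of this diff.

```rust
// Hypothetical test sketch; names and signatures below are assumptions.
use common_exception::Result;
use databend_query::sql::{DfParser, DfStatement};

#[test]
fn parse_cluster_by_sketch() -> Result<()> {
    let sql = "CREATE TABLE t(id INT, ts TIMESTAMP) ENGINE = Fuse CLUSTER BY (id, ts)";
    let (statements, _) = DfParser::parse_sql(sql)?;
    match &statements[0] {
        // Expectation: one sqlparser::ast::Expr per cluster-key expression.
        DfStatement::CreateTable(create) => assert_eq!(create.order_keys.len(), 2),
        _ => panic!("expected a CREATE TABLE statement"),
    }
    Ok(())
}
```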
17 changes: 17 additions & 0 deletions query/src/sql/statements/statement_create_table.rs
@@ -29,6 +29,7 @@ use common_planners::PlanNode;
use common_tracing::tracing;
use sqlparser::ast::ColumnDef;
use sqlparser::ast::ColumnOption;
use sqlparser::ast::Expr;
use sqlparser::ast::ObjectName;

use super::analyzer_expr::ExpressionAnalyzer;
@@ -50,6 +51,7 @@ pub struct DfCreateTable {
pub name: ObjectName,
pub columns: Vec<ColumnDef>,
pub engine: String,
pub order_keys: Vec<Expr>,
pub options: BTreeMap<String, String>,

// The table name after "create .. like" statement.
@@ -67,6 +69,8 @@ impl AnalyzableStatement for DfCreateTable {
let mut table_meta = self.table_meta(ctx.clone(), db.as_str()).await?;
let if_not_exists = self.if_not_exists;
let tenant = ctx.get_tenant();

let expression_analyzer = ExpressionAnalyzer::create(ctx.clone());
let as_select_plan_node = match &self.query {
// CTAS
Some(query_statement) => {
@@ -91,13 +95,26 @@ impl AnalyzableStatement for DfCreateTable {
None => None,
};

let mut order_keys = vec![];
for k in self.order_keys.iter() {
let expr = expression_analyzer.analyze(k).await?;
validate_expression(&expr, &table_meta.schema)?;
order_keys.push(expr);
}

if !order_keys.is_empty() {
let order_keys_v = serde_json::to_vec(&order_keys)?;
table_meta.order_keys = Some(order_keys_v);
}

Ok(AnalyzedResult::SimpleQuery(Box::new(
PlanNode::CreateTable(CreateTablePlan {
if_not_exists,
tenant,
db,
table,
table_meta,
order_keys,
as_select: as_select_plan_node,
}),
)))
10 changes: 10 additions & 0 deletions query/src/storages/fuse/fuse_table.rs
@@ -21,6 +21,7 @@ use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::TableInfo;
use common_planners::Expression;
use common_planners::Extras;
use common_planners::Partitions;
use common_planners::ReadDataSourcePlan;
@@ -49,13 +50,21 @@ use crate::storages::TableStatistics;
pub struct FuseTable {
pub(crate) table_info: TableInfo,
pub(crate) meta_location_generator: TableMetaLocationGenerator,

pub(crate) order_keys: Vec<Expression>,
}

impl FuseTable {
pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result<Box<dyn Table>> {
let storage_prefix = Self::parse_storage_prefix(&table_info)?;
let mut order_keys = Vec::new();
if let Some(order) = &table_info.meta.order_keys {
order_keys = serde_json::from_slice(order.as_slice())?;
}

Ok(Box::new(FuseTable {
table_info,
order_keys,
meta_location_generator: TableMetaLocationGenerator::with_prefix(storage_prefix),
}))
}
@@ -78,6 +87,7 @@ impl FuseTable {
StorageDescription {
engine_name: "FUSE".to_string(),
comment: "FUSE Storage Engine".to_string(),
support_order_key: true,
}
}
}
73 changes: 73 additions & 0 deletions query/src/storages/fuse/operations/append.rs
@@ -18,14 +18,20 @@ use std::sync::Arc;

use async_stream::stream;
use common_cache::Cache;
use common_datablocks::SortColumnDescription;
use common_datavalues::DataSchemaRefExt;
use common_exception::Result;
use common_infallible::RwLock;
use common_planners::Expression;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;

use crate::pipelines::new::processors::port::InputPort;
use crate::pipelines::new::processors::BlockCompactor;
use crate::pipelines::new::processors::ExpressionTransform;
use crate::pipelines::new::processors::ProjectionTransform;
use crate::pipelines::new::processors::TransformCompact;
use crate::pipelines::new::processors::TransformSortPartial;
use crate::pipelines::new::NewPipeline;
use crate::pipelines::new::SinkPipeBuilder;
use crate::sessions::QueryContext;
@@ -111,6 +117,73 @@ impl FuseTable {
)
})?;

if !self.order_keys.is_empty() {
let input_schema = self.table_info.schema();
let mut merged = input_schema.fields().clone();

for expr in &self.order_keys {
let cname = expr.column_name();
if !merged.iter().any(|x| x.name() == &cname) {
merged.push(expr.to_data_field(&input_schema)?);
}
}

let output_schema = DataSchemaRefExt::create(merged);

if output_schema != input_schema {
pipeline.add_transform(|transform_input_port, transform_output_port| {
ExpressionTransform::try_create(
transform_input_port,
transform_output_port,
input_schema.clone(),
output_schema.clone(),
self.order_keys.clone(),
ctx.clone(),
)
})?;
}

// sort
let sort_descs: Vec<SortColumnDescription> = self
.order_keys
.iter()
.map(|expr| SortColumnDescription {
column_name: expr.column_name(),
asc: true,
nulls_first: false,
})
.collect();

pipeline.add_transform(|transform_input_port, transform_output_port| {
TransformSortPartial::try_create(
transform_input_port,
transform_output_port,
None,
sort_descs.clone(),
)
})?;

// Remove unused columns before sink
if output_schema != input_schema {
let project_exprs: Vec<Expression> = input_schema
.fields()
.iter()
.map(|f| Expression::Column(f.name().to_owned()))
.collect();

pipeline.add_transform(|transform_input_port, transform_output_port| {
ProjectionTransform::try_create(
transform_input_port,
transform_output_port,
output_schema.clone(),
input_schema.clone(),
project_exprs.clone(),
ctx.clone(),
)
})?;
}
}

let mut sink_pipeline_builder = SinkPipeBuilder::create();
let acc = Arc::new(RwLock::new(None));
for _ in 0..pipeline.output_len() {
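Conceptually, the new append path does three things when a cluster key is defined: materialize any key expressions that are not already physical columns, sort each incoming block by those keys (a per-block, partial sort, not a global one), and project the helper columns away again before the data is written. A self-contained, simplified sketch of that flow, using plain tuples in place of DataBlocks and a made-up key expression:

```rust
fn main() {
    // Two physical columns per row: (id, value). Assume the cluster key is the
    // expression `id % 4`, which is not a physical column of the table.
    let rows: Vec<(i64, i64)> = vec![(9, 100), (2, 200), (7, 300), (4, 400)];

    // 1. ExpressionTransform stand-in: materialize the key expression as an extra column.
    let mut with_key: Vec<(i64, i64, i64)> =
        rows.into_iter().map(|(id, v)| (id, v, id % 4)).collect();

    // 2. TransformSortPartial stand-in: sort this one block by the key, ascending.
    with_key.sort_by_key(|&(_, _, key)| key);

    // 3. ProjectionTransform stand-in: drop the helper column before the sink writes the block.
    let projected: Vec<(i64, i64)> = with_key.into_iter().map(|(id, v, _)| (id, v)).collect();

    assert_eq!(projected, vec![(4, 400), (9, 100), (2, 200), (7, 300)]);
}
```

Sorting only within each appended block keeps writes cheap while still co-locating rows with similar cluster-key values inside every block that the FUSE engine produces.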