Skip to content

Commit

Permalink
Add insert/update/delete to LogicalPlan and add SQL planner support (#…
Browse files Browse the repository at this point in the history
…4902)

* Squash

* PR feedback

* PR feedback

* DmlStatement

* Cleanup

* PR feedback

* Fix insert
  • Loading branch information
Brent Gardner authored Jan 17, 2023
1 parent 279440b commit aa8f139
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 19 deletions.
6 changes: 6 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,12 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateView".to_string(),
))
}
LogicalPlan::Dml(_) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
Err(DataFusionError::Internal(
"Unsupported logical plan: Write".to_string(),
))
}
LogicalPlan::SetVariable(_) => {
Err(DataFusionError::Internal(
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
Expand Down
10 changes: 5 additions & 5 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ pub use logical_plan::{
build_join_schema, union, wrap_projection_for_join_if_necessary, UNNAMED_TABLE,
},
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, PlanVisitor, Projection,
Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DmlStatement, DropTable,
DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType,
Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, PlanVisitor,
Projection, Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias,
TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window, WriteOp,
};
pub use nullif::SUPPORTED_NULLIF_TYPES;
pub use operator::Operator;
Expand Down
10 changes: 5 additions & 5 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ mod plan;
pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Prepare, Projection, Repartition,
SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, Values, Window,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DmlStatement, DropTable,
DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType,
Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Prepare, Projection,
Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, Values, Window, WriteOp,
};

pub use display::display_schema;
Expand Down
42 changes: 42 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ pub enum LogicalPlan {
SetVariable(SetVariable),
/// Prepare a statement
Prepare(Prepare),
/// Insert / Update / Delete
Dml(DmlStatement),
}

impl LogicalPlan {
Expand Down Expand Up @@ -158,6 +160,7 @@ impl LogicalPlan {
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
LogicalPlan::DropView(DropView { schema, .. }) => schema,
LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema,
LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema,
}
}

Expand Down Expand Up @@ -218,6 +221,7 @@ impl LogicalPlan {
LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::SetVariable(_) => vec![],
LogicalPlan::Dml(DmlStatement { table_schema, .. }) => vec![table_schema],
}
}

Expand Down Expand Up @@ -316,6 +320,7 @@ impl LogicalPlan {
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Prepare(_) => Ok(()),
}
}
Expand All @@ -342,6 +347,7 @@ impl LogicalPlan {
LogicalPlan::Distinct(Distinct { input }) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::Dml(write) => vec![&write.input],
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Prepare(Prepare { input, .. }) => {
Expand Down Expand Up @@ -544,6 +550,7 @@ impl LogicalPlan {
}
LogicalPlan::Explain(explain) => explain.plan.accept(visitor)?,
LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
LogicalPlan::Dml(write) => write.input.accept(visitor)?,
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation(_)
Expand Down Expand Up @@ -929,6 +936,9 @@ impl LogicalPlan {
}
Ok(())
}
LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
write!(f, "Write: op=[{op}] table=[{table_name}]")
}
LogicalPlan::Filter(Filter {
predicate: ref expr,
..
Expand Down Expand Up @@ -1504,6 +1514,38 @@ pub struct CreateExternalTable {
pub options: HashMap<String, String>,
}

#[derive(Clone)]
pub enum WriteOp {
Insert,
Delete,
Update,
Ctas,
}

impl Display for WriteOp {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
WriteOp::Insert => write!(f, "Insert"),
WriteOp::Delete => write!(f, "Delete"),
WriteOp::Update => write!(f, "Update"),
WriteOp::Ctas => write!(f, "Ctas"),
}
}
}

/// The operator that modifies the content of a database (adapted from substrait WriteRel)
#[derive(Clone)]
pub struct DmlStatement {
/// The table name
pub table_name: OwnedTableReference,
/// The schema of the table (must align with Rel input)
pub table_schema: DFSchemaRef,
/// The type of operation to perform
pub op: WriteOp,
/// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
pub input: Arc<LogicalPlan>,
}

/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
/// `Expr::Placeholder` expressions that are filled in during execution
#[derive(Clone)]
Expand Down
15 changes: 13 additions & 2 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::logical_plan::{
SubqueryAlias, Union, Values, Window,
};
use crate::{
BinaryExpr, Cast, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, Operator,
TableScan, TryCast,
BinaryExpr, Cast, DmlStatement, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder,
Operator, TableScan, TryCast,
};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::{
Expand Down Expand Up @@ -489,6 +489,17 @@ pub fn from_plan(
schema.clone(),
)?))
}
LogicalPlan::Dml(DmlStatement {
table_name,
table_schema,
op,
..
}) => Ok(LogicalPlan::Dml(DmlStatement {
table_name: table_name.clone(),
table_schema: table_schema.clone(),
op: op.clone(),
input: Arc::new(inputs[0].clone()),
})),
LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
schema: schema.clone(),
values: expr
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::SetVariable(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, config)?
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ fn optimize_plan(
| LogicalPlan::DropView(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Extension { .. }
| LogicalPlan::Prepare(_) => {
let expr = plan.expressions();
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::SetVariable(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropView",
)),
LogicalPlan::Dml(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Write",
)),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

fn plan_from_tables(
pub(crate) fn plan_from_tables(
&self,
mut from: Vec<TableWithJoins>,
planner_context: &mut PlannerContext,
Expand Down
Loading

0 comments on commit aa8f139

Please sign in to comment.