Skip to content

Commit

Permalink
compile datafusion_ext
Browse files Browse the repository at this point in the history
  • Loading branch information
vrongmeal committed Jan 17, 2024
1 parent 934ea4d commit 42afbeb
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 2,440 deletions.
2,399 changes: 0 additions & 2,399 deletions crates/datafusion_ext/src/cast.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/datafusion_ext/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod cast;
pub mod errors;
pub mod metrics;
pub mod planner;
Expand Down
6 changes: 3 additions & 3 deletions crates/datafusion_ext/src/planner/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

pub(super) async fn sql_named_function_to_expr(
&self,
&mut self,
expr: SQLExpr,
fun: BuiltinScalarFunction,
schema: &DFSchema,
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

async fn sql_fn_arg_to_logical_expr(
&self,
&mut self,
sql: FunctionArg,
schema: &DFSchema,
planner_context: &mut PlannerContext,
Expand Down Expand Up @@ -268,7 +268,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

pub(super) async fn function_args_to_expr(
&self,
&mut self,
args: Vec<FunctionArg>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
Expand Down
2 changes: 1 addition & 1 deletion crates/datafusion_ext/src/planner/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

pub(super) async fn sql_case_identifier_to_expr(
&self,
&mut self,
operand: Option<Box<SQLExpr>>,
conditions: Vec<SQLExpr>,
results: Vec<SQLExpr>,
Expand Down
4 changes: 2 additions & 2 deletions crates/datafusion_ext/src/planner/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use datafusion::sql::sqlparser::parser::ParserError::ParserError;
impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
#[async_recursion]
pub(crate) async fn sql_expr_to_logical_expr(
&self,
&mut self,
sql: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
Expand Down Expand Up @@ -714,7 +714,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

async fn sql_agg_with_filter_to_expr(
&self,
&mut self,
expr: SQLExpr,
filter: SQLExpr,
schema: &DFSchema,
Expand Down
2 changes: 1 addition & 1 deletion crates/datafusion_ext/src/planner/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
/// into the SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`).
/// If false, interpret numeric literals as constant values.
pub(crate) async fn order_by_to_sort_expr(
&self,
&mut self,
exprs: &[OrderByExpr],
schema: &DFSchema,
planner_context: &mut PlannerContext,
Expand Down
2 changes: 1 addition & 1 deletion crates/datafusion_ext/src/planner/expr/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

pub(super) async fn sql_array_literal(
&self,
&mut self,
elements: Vec<SQLExpr>,
schema: &DFSchema,
) -> Result<Expr> {
Expand Down
2 changes: 1 addition & 1 deletion crates/datafusion_ext/src/planner/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
/// Generate a relational expression from a select SQL expression
#[async_recursion]
async fn sql_select_to_rex(
&self,
&mut self,
sql: SelectItem,
plan: &LogicalPlan,
empty_from: bool,
Expand Down
13 changes: 6 additions & 7 deletions crates/datafusion_ext/src/planner/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::planner::{AsyncContextProvider, SqlQueryPlanner};
use async_recursion::async_recursion;
use datafusion::common::{DataFusionError, Result};
use datafusion::common::{not_impl_err, DataFusionError, Result};
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion::sql::planner::PlannerContext;
use datafusion::sql::sqlparser::ast::{SetExpr, SetOperator, SetQuantifier};
Expand All @@ -42,14 +42,13 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
SetQuantifier::All => true,
SetQuantifier::Distinct | SetQuantifier::None => false,
SetQuantifier::ByName => {
return Err(DataFusionError::NotImplemented(
"UNION BY NAME not implemented".to_string(),
));
return not_impl_err!("UNION BY NAME not implemented");
}
SetQuantifier::AllByName => {
return Err(DataFusionError::NotImplemented(
"UNION ALL BY NAME not implemented".to_string(),
))
return not_impl_err!("UNION ALL BY NAME not implemented");
}
SetQuantifier::DistinctByName => {
return not_impl_err!("UNION DISTINCT BY NAME not implemented");
}
};

Expand Down
4 changes: 2 additions & 2 deletions crates/datafusion_ext/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub trait TreeNodeExt: TreeNode {
where
F: FnMut(Self) -> Result<Transformed<Self>>,
{
let after_op_children = self.map_children(|node| node.transform_up_mut(op))?;
let after_op_children = self.map_children(|node| TreeNode::transform_up_mut(node, op))?;

let new_node = op(after_op_children)?.into();
Ok(new_node)
Expand All @@ -23,7 +23,7 @@ pub trait TreeNodeExt: TreeNode {
F: FnMut(Self) -> Result<Transformed<Self>>,
{
let after_op = op(self)?.into();
after_op.map_children(|node| node.transform_down_mut(op))
after_op.map_children(|node| TreeNode::transform_down_mut(node, op))
}
}

Expand Down
37 changes: 20 additions & 17 deletions crates/datafusion_ext/src/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod inner;
mod utils;
mod value;
use constants::*;
use datafusion::arrow::array::{ListBuilder, StringBuilder};
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::config::{ConfigExtension, ExtensionOptions};
use datafusion::scalar::ScalarValue;
Expand Down Expand Up @@ -247,25 +248,27 @@ impl VarProvider for SessionVars {
"current_schema" => ScalarValue::Utf8(self.search_path().first().cloned()),
"connection_id" => ScalarValue::Utf8(Some(self.connection_id().to_string())),
"current_schemas" => {
let schemas = self
.search_path()
.into_iter()
.map(|path| ScalarValue::Utf8(Some(path)))
.collect::<Vec<_>>();
ScalarValue::List(
Some(schemas),
Field::new("item", DataType::Utf8, true).into(),
)
let search_path = self.search_path();
let mut list = ListBuilder::with_capacity(
StringBuilder::with_capacity(search_path.len(), search_path.len() * 10),
/* list capacity = */ 1,
);
list.append_value(search_path.into_iter().map(Some));
let list = list.finish();
ScalarValue::List(Arc::new(list))
}
"current_schemas_include_implicit" => {
let schemas = self
.implicit_search_path_iter()
.map(|path| ScalarValue::Utf8(Some(path)))
.collect::<Vec<_>>();
ScalarValue::List(
Some(schemas),
Field::new("item", DataType::Utf8, true).into(),
)
let implicit_search_path = self.implicit_search_path();
let mut list = ListBuilder::with_capacity(
StringBuilder::with_capacity(
implicit_search_path.len(),
implicit_search_path.len() * 10,
),
/* list capacity = */ 1,
);
list.append_value(implicit_search_path.into_iter().map(Some));
let list = list.finish();
ScalarValue::List(Arc::new(list))
}
s => Err(datafusion::error::DataFusionError::External(
VarError::UnknownVariable(s.to_string()).into(),
Expand Down
54 changes: 49 additions & 5 deletions crates/sqlbuiltins/src/functions/scalars/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,60 @@ fn get_nth_scalar_value(
match input.get(n) {
Some(input) => match input {
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(op(scalar.clone())?)),
ColumnarValue::Array(arr) => Ok(ColumnarValue::Array(scalar_iter_to_array(
(0..arr.len()).map(|idx| -> Result<ScalarValue, ExtensionError> {
Ok(op(ScalarValue::try_from_array(arr, idx)?)?)
}),
)?)),
ColumnarValue::Array(arr) => Ok(ColumnarValue::Array(apply_op_to_col_array(arr, op)?)),
},
None => Err(BuiltinError::MissingValueAtIndex(n)),
}
}

fn apply_op_to_col_array(
arr: &dyn Array,
op: &dyn Fn(ScalarValue) -> Result<ScalarValue, BuiltinError>,
) -> Result<Arc<dyn Array>> {
let mut check_err: Result<()> = Ok(());

let filter_fn = |res: Result<ScalarValue>| {
match (check_err, res) {
// If check_err is already an error, return None (so we
// don't iterate further).
(Err(_), _) => None,
(_, Err(e)) => {
// Set check_err to the corresponding error.
check_err = Err(e);
None
}
(_, Ok(scalar)) => Some(scalar),
}
};

let iter = (0..arr.len())
.filter_map(|idx| {
let res = ScalarValue::try_from_array(arr, idx);
filter_fn(res)
})
.filter_map(|s| {
let res = op(s);
filter_fn(res)
});

// NB: ScalarValue::iter_to_array accepts an iterator over
// Item = ScalarValue but we have an iterator over Item =
// Result<ScalarValue>. To convert, we filter-map the errors but
// that doesn't let us catch the error. To catch the error, we
// have `check_err` (a result) that is initially set to Ok and
// set to the error when the iterator is actually iterated over
// the values in iter_to_array. We filter out all values once
// the check_err is set to an error value so we don't iterate
// any further.
//
// A simpler solution would have been to collect all the values
// in another container (such as a Vec) but that would result in
// extra space being used.
let arr = ScalarValue::iter_to_array(iter)?;
check_err?;
Ok(arr)
}

fn try_from_u64_scalar(scalar: ScalarValue) -> Result<u64, BuiltinError> {
match scalar {
ScalarValue::Int8(Some(value)) => safe_up_cast_integer_scalar(value as i64),
Expand Down

0 comments on commit 42afbeb

Please sign in to comment.