Fix the schema mismatch between logical and physical for aggregate function, add AggregateUDFImpl::is_null
#11989
Conversation
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
let physical_input_schema_from_logical: Arc<Schema> =
    logical_input_schema.as_ref().clone().into();

debug_assert_eq!(physical_input_schema_from_logical, physical_input_schema, "Physical input schema should be the same as the one converted from logical input schema. Please file an issue or send the PR");
The main goal of the change is to ensure they are the same, and we pass physical_input_schema through the functions that require the input's schema.
Nice!
Did you consider making this function return an internal_error rather than debug_assert?
If we are concerned about breaking existing tests, we could add a config setting like datafusion.optimizer.skip_failed_rules to let users bypass the check.
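The trade-off being discussed can be sketched in plain Rust. This is a toy model (the `Schema` type and names below are invented, not DataFusion's actual API): a `debug_assert` aborts only in debug builds, while returning an internal error surfaces the mismatch in release builds too and lets callers handle it.

```rust
// Toy stand-in for a schema: just the field names.
#[derive(Debug, PartialEq, Clone)]
struct Schema {
    field_names: Vec<String>,
}

fn check_schemas(logical: &Schema, physical: &Schema) -> Result<(), String> {
    // Option 1 (what the PR does): only fires in debug builds, aborts the process.
    // debug_assert_eq!(logical, physical, "schema mismatch");

    // Option 2 (what the reviewer suggests): report an internal error instead,
    // which also works in release builds and is recoverable.
    if logical != physical {
        return Err(format!(
            "Internal error: physical input schema {:?} differs from the one \
             converted from the logical input schema {:?}",
            physical, logical
        ));
    }
    Ok(())
}

fn main() {
    let a = Schema { field_names: vec!["id".to_string()] };
    let b = Schema { field_names: vec![] };
    assert!(check_schemas(&a, &a.clone()).is_ok());
    assert!(check_schemas(&a, &b).is_err());
    println!("ok");
}
```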
The objective here is to ensure that the logical schema from ExprSchemable and the physical schema from ExecutionPlan.schema() are equivalent. If they are not, it indicates a potential schema mismatch issue. This is also why the code changes in this PR mostly fix schema-related things; they are all required, so I don't think we should let users bypass the check 🤔
If we encounter inconsistent schemas, it raises an important question: which schema should we use?
> Did you consider making this function return an internal_error rather than debug_assert?

It looks good to me.
@@ -1599,11 +1603,10 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
    let ordering_reqs: Vec<PhysicalSortExpr> =
        physical_sort_exprs.clone().unwrap_or(vec![]);

    let schema: Schema = logical_input_schema.clone().into();
workaround cleanup
datafusion/expr/src/expr_schema.rs
Outdated
WindowFunctionDefinition::AggregateUDF(func) => {
    // TODO: UDF should be able to customize nullability
    if func.name() == "count" {
        // TODO: there is an unsolved issue for count with window; should return false
Not so familiar with window functions yet, so I left it as a TODO.
Perhaps we can file a ticket to track this -- ideally it would eventually be part of the window function definition itself rather than relying on names
use datafusion_physical_expr::window::WindowExpr;
use std::sync::Arc;

pub(crate) fn create_schema(
Moved the common function to utils. The logic is the same.
This seems like a good change to me, but I don't fully understand how it is all connected. Thank you for taking this on @jayzhan211.
I am quite concerned about the use of unsafe, but otherwise I think all this PR needs is some TODOs with ticket references and it would be good to go from my perspective.
datafusion/expr/src/expr_schema.rs
Outdated
@@ -328,10 +328,45 @@ impl ExprSchemable for Expr {
            Ok(true)
        }
    }
    Expr::WindowFunction(WindowFunction { fun, .. }) => {
Is this change required for this PR or is it a "drive by" improvement?
Required
datafusion/expr/src/expr_schema.rs
Outdated
    }
}
Expr::ScalarFunction(ScalarFunction { func, args }) => {
    // If all the elements in coalesce are non-null, the result is non-null
We should probably add an API to ScalarUDFImpl to signal its null/non-nullness (as a follow-on PR) instead of hard-coding this function name, something like:

func.is_nullable(args)
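A minimal sketch of what such an API could look like, using invented names (`ScalarUdf`, `Arg`), not DataFusion's actual trait. The default assumes the result can be null; `coalesce` overrides it so the result is non-nullable as soon as any argument is non-nullable, since evaluation stops at the first non-null value.

```rust
// Toy stand-in for an argument's schema information.
struct Arg {
    nullable: bool,
}

trait ScalarUdf {
    fn name(&self) -> &str;
    /// Default: assume the function can return null.
    fn is_nullable(&self, _args: &[Arg]) -> bool {
        true
    }
}

struct Coalesce;

impl ScalarUdf for Coalesce {
    fn name(&self) -> &str {
        "coalesce"
    }
    // coalesce only returns null when every argument is null, so the
    // result is nullable only if all arguments are nullable.
    fn is_nullable(&self, args: &[Arg]) -> bool {
        args.iter().all(|a| a.nullable)
    }
}

fn main() {
    let f = Coalesce;
    assert!(f.is_nullable(&[Arg { nullable: true }]));
    assert!(!f.is_nullable(&[Arg { nullable: true }, Arg { nullable: false }]));
    println!("ok");
}
```

With an API like this, the planner asks the UDF itself rather than matching on the string `"coalesce"`.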
datafusion/expr/src/udaf.rs
Outdated
@@ -196,6 +196,10 @@ impl AggregateUDF {
        self.inner.state_fields(args)
    }

    pub fn fields(&self, args: StateFieldsArgs) -> Result<Field> {
Could we document this function and what it is for (also in AggregateUdfImpl)?
Also, the name is strange to me -- it is `fields` but it returns a single `Field`, and the corresponding function on `AggregateUDFImpl` is called `field` (no `s`) 🤔
@@ -171,6 +171,9 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
    fn get_minmax_desc(&self) -> Option<(Field, bool)> {
        None
    }

    /// Get function's name, for example `count(x)` returns `count`
    fn func_name(&self) -> &str;
Is there a reason this isn't `name()`? `func_name` is fine, it just seems inconsistent with the rest of the code.
This is to identify the function (i.e. `count`). There is `name()` already, but it includes the arguments (i.e. `count(x)`), which is not what I want.
An alternative is to introduce `nullable()` for AggregateUDF, so we don't need name checking. Maybe I should have done that before this PR.
*union_nullable = *union_nullable || plan_field.is_nullable();

// Safety: Length is checked
unsafe {
I think this unsafe block is unnecessary -- this isn't a performance-critical piece of code. I think izip, or just manually zipping three times, would be better.
@@ -80,6 +80,14 @@ impl WindowExpr for PlainAggregateWindowExpr {
    }

    fn field(&self) -> Result<Field> {
        // TODO: Fix window function to always return non-null for count
I don't understand this comment -- can we please file a ticket to track it (and add the ticket reference to the comments)?
@@ -97,6 +97,10 @@ impl BuiltInWindowExpr {
}

impl WindowExpr for BuiltInWindowExpr {
    fn func_name(&self) -> Result<&str> {
        not_impl_err!("function name not determined")
Why wouldn't we implement func_name for a built-in window function 🤔
The reason is that I don't need it for the name checking in nullable, I think.
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
@@ -435,6 +459,10 @@ impl AggregateExpr for AggregateFunctionExpr {
            .is_descending()
            .and_then(|flag| self.field().ok().map(|f| (f, flag)))
    }

    fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
Adding a new function requires touching 4 places; there might be room to improve 🤔
Maybe we don't need `AggregateExpr` at all, since there is only one implementation. I think a trait is useful when at least two implementations share similar functionality. Similar idea in #11810.
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Can you move them back to the top?
@@ -35,6 +30,9 @@ use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
use datafusion_expr::WindowFrame;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
Can you move them back to the top?
    /// while `count` returns 0 if input is Null
    fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
        ScalarValue::try_from(data_type)
    }
}
Maybe we can improve the documentation?
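To make the documented behavior concrete: `ScalarValue::try_from(data_type)` produces a typed null, which is the right default for most aggregates on empty input, while `count` instead defaults to 0. A toy sketch of that distinction (the `Scalar` enum below stands in for DataFusion's much richer `ScalarValue`):

```rust
#[derive(Debug, PartialEq)]
enum Scalar {
    Int64(Option<i64>),
}

// Mirrors `ScalarValue::try_from(data_type)`: a null value of the type.
fn default_aggregate_value() -> Scalar {
    Scalar::Int64(None)
}

// count of zero rows is 0, not null, so it overrides the default.
fn default_count_value() -> Scalar {
    Scalar::Int64(Some(0))
}

fn main() {
    assert_eq!(default_aggregate_value(), Scalar::Int64(None));
    assert_eq!(default_count_value(), Scalar::Int64(Some(0)));
    println!("ok");
}
```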
Thanks @alamb @berkaysynnada @ozankabak
@jayzhan211 I'm in the process of upgrading spiceai to use DataFusion 42 and I'm running into the schema mismatch error from this PR.

I have a custom TableProvider and I get the error when running a test with the following expected plan:

let expected_plan = [
    "+---------------+--------------------------------------------------------------------------------+",
    "| plan_type | plan |",
    "+---------------+--------------------------------------------------------------------------------+",
    "| logical_plan | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |",
    "| | BytesProcessedNode |",
    "| | TableScan: non_federated_abc projection=[] |",
    "| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] |",
    "| | CoalescePartitionsExec |",
    "| | AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] |",
    "| | BytesProcessedExec |",
    "| | SchemaCastScanExec |",
    "| | RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 |",
    "| | SqlExec sql=SELECT \"id\", \"created_at\" FROM non_federated_abc |",
    "| | |",
    "+---------------+--------------------------------------------------------------------------------+",
];

My assumption of what is going on here is that logically no columns are required for the logical plan to come up with the count of the number of rows, but the TableProvider has to return all of the columns because it needs the rows to perform the count aggregation. But it ends up throwing away the columns because they get erased in the aggregation. Thus the check that the physical schema and the logical schema are equal is not strictly needed for this plan. Does that sound right?
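That assumption can be modeled with a toy sketch (all names invented; this is not DataFusion's TableProvider API): the logical scan's empty projection `projection=[]` produces an empty schema, while a provider that ignores the projection returns every column, which is exactly the mismatch the new check reports.

```rust
// Apply an optional projection (list of column indices) to a field list.
fn project(fields: &[&str], projection: Option<&[usize]>) -> Vec<String> {
    match projection {
        Some(idx) => idx.iter().map(|i| fields[*i].to_string()).collect(),
        None => fields.iter().map(|f| f.to_string()).collect(),
    }
}

fn main() {
    let table = ["id", "created_at"];
    // Logical plan for count(*): no columns requested.
    let logical = project(&table, Some(&[]));
    // A provider that ignores the projection returns everything.
    let physical = project(&table, None);
    assert!(logical.is_empty());
    assert_eq!(physical.len(), 2);
    // The two schemas disagree, triggering the new consistency check.
    assert_ne!(logical, physical);
    println!("ok");
}
```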
@itsjunetime is also having some issues related to this ticket during our upgrade of DataFusion; I am not sure if they are related.
What are the logical schema and physical schema you have (the error)? I think they should be consistent even for a count(*) statement. They (logical & physical) should either have all the columns or no columns.
I added a dbg! of the two schemas and got:

[/Users/phillip/code/apache/datafusion/datafusion/core/src/physical_planner.rs:676:21] &physical_input_schema = Schema {
    fields: [
        Field {
            name: "id",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "created_at",
            data_type: Timestamp(
                Nanosecond,
                Some(
                    "+00:00",
                ),
            ),
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
    ],
    metadata: {},
}
[/Users/phillip/code/apache/datafusion/datafusion/core/src/physical_planner.rs:677:21] &physical_input_schema_from_logical = Schema {
    fields: [],
    metadata: {},
}
thread 'acceleration::query_push_down::acceleration_with_and_without_federation' panicked at crates/runtime/tests/acceleration/query_push_down.rs:218:10:
collect working: Internal("Physical input schema should be the same as the one converted from logical input schema.")
I think the ideal way is to have something like a
Could we keep all the fields for the logical plan to make them consistent?
That seems fine to me.
I'm getting issues where both schemas are equivalent except for the metadata on the fields. I think (correct me if I'm wrong) that field metadata doesn't actually need to be equivalent for the invariant that this error is trying to catch to be upheld. I think we could comfortably switch from directly comparing the schemas with == to a comparison that ignores field metadata.

I'm also seeing issues that don't propagate with this exact error (but rather complain that …
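A sketch of the metadata-insensitive comparison being proposed (toy types with invented names; DataFusion's Schema and Field are richer than this): strict equality fails when only the metadata differs, while a relaxed check comparing just names and types passes.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
    metadata: Vec<(String, String)>,
}

// Compare only names and types, ignoring per-field metadata.
fn same_names_and_types(a: &[Field], b: &[Field]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b.iter())
            .all(|(x, y)| x.name == y.name && x.data_type == y.data_type)
}

fn main() {
    let make = |m: Vec<(String, String)>| Field {
        name: "id".into(),
        data_type: "Utf8".into(),
        metadata: m,
    };
    let plain = vec![make(vec![])];
    let tagged = vec![make(vec![("origin".into(), "spiceai".into())])];
    assert_ne!(plain, tagged); // strict equality fails on metadata alone
    assert!(same_names_and_types(&plain, &tagged)); // relaxed check passes
    println!("ok");
}
```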
If the metadata is mismatched, it indicates we lost the metadata somewhere while passing the schema info through, so I think it makes sense to check the metadata too. Maybe we should figure out why the metadata is mismatched first.
…nction, add `AggregateUDFImpl::is_null` (apache#11989)

* schema assertion and fix the mismatch from logical and physical
* add more msg
* cleanup
* rm test1
* nullable for scalar func
* nullable
* rm field
* rm unsafe block and use internal error
* rm func_name
* rm nullable option
* add test
* add more msg
* fix test
* rm row number
* Update datafusion/expr/src/udaf.rs (Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>)
* Update datafusion/expr/src/udaf.rs (Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>)
* fix failed test from apache#12050
* cleanup
* add doc

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
thank you!
Which issue does this PR close?
Closes #.
Part of #11782; it would be nice to clean up the schema before fighting with the physical name.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?