Design how to respect output stream ordering #424
Comments
We may want to consider following Spark's approach here, since we are already implementing other methods such as
For sorting, Spark has:
/** Specifies how data is ordered in each partition. */
def outputOrdering: Seq[SortOrder]
/** Specifies sort order for each partition requirements on the input data for this operator. */
def requiredChildOrdering: Seq[Seq[SortOrder]]
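For comparison, here is a rough Rust analogue of those two Spark methods. This is only a sketch: `SortOrder`, `OrderedPlan` and `satisfies` are hypothetical names and not part of DataFusion. The prefix rule in `satisfies` is an assumption made for illustration.

```rust
/// Hypothetical sort key: a column name plus a direction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortOrder {
    pub column: String,
    pub descending: bool,
}

/// Hypothetical trait mirroring Spark's two methods; not a DataFusion API.
pub trait OrderedPlan {
    /// How rows are ordered within each output partition (empty = unknown).
    fn output_ordering(&self) -> Vec<SortOrder>;

    /// One required ordering per child (an empty inner Vec = no requirement).
    fn required_child_ordering(&self) -> Vec<Vec<SortOrder>>;
}

/// Assumed rule: a child's ordering satisfies a requirement when the
/// requirement is a prefix of the child's ordering.
pub fn satisfies(actual: &[SortOrder], required: &[SortOrder]) -> bool {
    actual.starts_with(required)
}
```

Under that assumption, data sorted by (a, b) satisfies a requirement of (a), but not the other way round.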
My thoughts: I think it will be simpler, as @tustvold has suggested, to do the majority / all of sort based optimizations (e.g. optimize away a Sort) on the
Encoding the requirements / assumptions of
In terms of physical plans, what about adding something like
I agree that doing this at the logical plan level is simpler. My concern is that later on, if/when we make Ballista viable, we may want to dynamically optimize the query while it is executing, based on statistics from completed stages. I was hoping to avoid Spark's approach of going back to the logical plan for this, and instead just optimize the physical plan. However, we can always revisit this later.
I do think dynamically changing the plan (or algorithms) based on actual execution experience is currently state of the art. I have often wondered if it is better done within the operators themselves (for example, a join deciding to switch to sort-merge join once its hash tables fill up, or sampling both inputs to decide which is smaller). I would have to think about the kinds of dynamic plan changes we might want to do.
Proposed fix: #1776
Related PR: #7671 (review)
As identified in #423 and #378 (and #412) there needs to be a mechanism for physical operators to express their behaviour with respect to sort order, so that optimisation passes can handle it correctly.
It is assumed that the initial physical plan created from the logical plan is valid, and that the requirement is for the optimisation phase to not alter the plan in a way that violates its implicit ordering requirements. I think it is therefore sufficient to encode some notion of sort sensitivity, as opposed to what the sort order necessarily is. I believe any optimisations related to the specific sort orders being utilised would take place at the LogicalPlan level, and avoiding this at the physical layer sidesteps issues around equality for PhysicalExpressions, etc...
The proposal would be to introduce a new member function to ExecutionPlan called partition_order() that returns a variant of a new enum PartitionOrder. This would have three variants:
- PartitionOrder::Preserving - operators that preserve the ordering of their input partition(s) - e.g. FilterExec, CoalesceBatchesExec
- PartitionOrder::Sensitive - operators which rely on the order of their input partition(s) - e.g. GlobalLimitExec, LocalLimitExec
- PartitionOrder::Agnostic - operators which do not rely on, nor preserve the order of their input partition(s) - e.g. HashAggregateExec, MergeExec, RepartitionExec
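A minimal sketch of what this could look like in Rust. The `ExecutionPlan` trait here is a stripped-down stand-in, not the real DataFusion trait, and the unit structs are placeholders for the actual operators. The trait method defaults to `Sensitive`, which is the conservative default the proposal goes on to describe.

```rust
/// How an operator treats the row order of its input partition(s).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionOrder {
    /// Output order follows input order (e.g. a filter).
    Preserving,
    /// Correctness depends on input order (e.g. a limit).
    Sensitive,
    /// Neither relies on nor preserves input order (e.g. a hash aggregate).
    Agnostic,
}

/// Simplified stand-in for the real `ExecutionPlan` trait (assumption).
pub trait ExecutionPlan {
    /// Conservative default: assume order sensitivity unless overridden.
    fn partition_order(&self) -> PartitionOrder {
        PartitionOrder::Sensitive
    }
}

// Placeholder operators; the real ones carry inputs, schemas, and so on.
pub struct FilterExec;
pub struct GlobalLimitExec;
pub struct HashAggregateExec;

impl ExecutionPlan for FilterExec {
    fn partition_order(&self) -> PartitionOrder {
        PartitionOrder::Preserving
    }
}

// Relies on the trait default, which is `Sensitive`.
impl ExecutionPlan for GlobalLimitExec {}

impl ExecutionPlan for HashAggregateExec {
    fn partition_order(&self) -> PartitionOrder {
        PartitionOrder::Agnostic
    }
}
```

With this shape, an operator that forgets to override `partition_order()` is treated as order sensitive, so the optimiser errs on the side of leaving its inputs alone.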
Note that the formulation does not distinguish between 1 or many partitions, as this is a detail already encapsulated by required_child_distribution (although I do wonder if this should be a property of the plan and not the operators themselves). There is no mechanism to express an ordering requirement across partitions; I'm not sure that this would be useful.
The default implementation of partition_order() would return PartitionOrder::Sensitive. Or to put it another way, unless explicitly told otherwise the optimiser cannot assume that an operator isn't sensitive to the ordering of its inputs.
The Repartition pass would then be modified to only insert a RepartitionExec on branches of the graph that have no PartitionOrder::Sensitive operators without an intervening PartitionOrder::Agnostic operator. This would fix #423.
AddMergeExec could additionally be modified to error if it finds itself needing to insert a MergeExec on an order sensitive branch.
Eventually, as certain forms of RepartitionExec are order preserving, e.g. splitting a single partition into multiple, this could be codified and combined with a modified version of AddMergeExec that inserts an order preserving merge. This would naturally fit into the proposed framework.
I'm not sure how ordering is typically handled in query engines, so if there is a standard solution I'd be happy to go with that instead, but thought I'd write up the simplest solution I can see to the issue in #423
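The rule for the Repartition pass could be sketched roughly as below. This is one possible reading of "no Sensitive operator without an intervening Agnostic operator": walk from the root down, tracking whether some ancestor still depends on order. `PlanNode`, `node` and `repartitionable` are hypothetical helpers for illustration, not DataFusion types.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionOrder {
    Preserving,
    Sensitive,
    Agnostic,
}

/// Hypothetical, simplified plan node; not DataFusion's `ExecutionPlan`.
pub struct PlanNode {
    pub name: &'static str,
    pub order: PartitionOrder,
    pub children: Vec<PlanNode>,
}

/// Convenience constructor for building small test plans.
pub fn node(name: &'static str, order: PartitionOrder, children: Vec<PlanNode>) -> PlanNode {
    PlanNode { name, order, children }
}

/// Names of the operators whose input could be repartitioned without
/// disturbing an order that some operator above still relies on.
pub fn repartitionable(root: &PlanNode) -> Vec<&'static str> {
    let mut out = Vec::new();
    visit(root, false, &mut out);
    out
}

fn visit(node: &PlanNode, order_required_above: bool, out: &mut Vec<&'static str>) {
    // Does the order of this node's *input* matter to it or to an ancestor?
    let order_required = match node.order {
        PartitionOrder::Sensitive => true,
        PartitionOrder::Agnostic => false,
        PartitionOrder::Preserving => order_required_above,
    };
    // Leaves have no input below which a repartition could be inserted.
    if !order_required && !node.children.is_empty() {
        out.push(node.name);
    }
    for child in &node.children {
        visit(child, order_required, out);
    }
}
```

For a plan of limit -> filter -> aggregate -> filter -> scan, only the aggregate and the filter beneath it qualify: the limit is Sensitive, the filter above the aggregate passes that sensitivity down, and the aggregate (Agnostic) resets it.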