[SPARK-47609][SQL] Making CacheLookup more optimal to minimize cache miss #49150

Open · wants to merge 1 commit into master
Conversation

ahshahid

What changes were proposed in this pull request?

Currently, cache lookup relies on a fragment (sub-plan) of the incoming query matching exactly the analyzed LogicalPlan in CacheManager for which an InMemoryRelation (IMR) exists. This limits the efficiency of the lookup. For example, consider a simple case where an InMemoryRelation is available for the plan

Project(a, b, c, d) -> Cached IMR
|
Project(x as a, b, b + a as c, b - c as d)

and the incoming plan is

Project(a, b, c)
|
Project(x as a, b, b + a as c, b - c as d)

Currently the lookup cannot use the cached IMR, because the top-level Projects do not match. But it is possible to use the IMR by placing a projection of (a, b, c) on top of it, i.e.

Project(a, b, c)
|
Cached IMR
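
As a hedged illustration using the public DataFrame API (the source `base` and the exact expressions are made up for this example, and the inner expressions are rewritten so each refers only to columns of `base`):

```scala
// Hypothetical reproduction of the scenario above; `spark` is a SparkSession.
val base = spark.range(10).selectExpr("id AS x", "id * 2 AS b")

// df1's plan: Project(a, b, c, d) on top of the inner Project.
val inner = base.selectExpr("x AS a", "b", "b + x AS c", "b - (b + x) AS d")
val df1 = inner.select("a", "b", "c", "d")
df1.cache()
df1.count() // materializes the InMemoryRelation

// df2's plan: Project(a, b, c) on top of the same inner Project. Today this
// misses the cache because the top-level Projects differ; with this change
// it can be answered as Project(a, b, c) over the cached IMR.
val df2 = inner.select("a", "b", "c")
df2.explain() // inspect whether InMemoryRelation appears in the plan
```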

Below we describe how more complex plans, with Filters present between the Projects, can use the cached IMR.

**The main idea is: for each fragment of the incoming sub-plan, check whether there is an exactly matching plan in the cached data (this is the current code). If it matches, use it; otherwise look for a partial match.**

**A partial match, among other requirements, also mandates that incoming_plan.child is canonically the same as cachedPlan.child.**

It can be argued that requiring equality at only one level of depth is arbitrary: the cached plan may be usable even if the children do not match but the grandchildren do, i.e. incoming_plan.child.child == cachedPlan.child.child, and the argument extends all the way down to the leaves.

We nevertheless limit the partial-match check to one child level, for the following reasons (a sketch of the resulting two-phase lookup follows this list):

  1. It reduces the complexity.
  2. It keeps the lookup time within reasonable limits.
  3. The next PR in line, PR-SPARK-45959, aggressively collapses two adjacent Projects, or two Projects interspersed with Filters, into a single Project in the analyzer phase where possible; this means that in most cases a one-level child match is good enough for the partial-match requirement.
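
A minimal, self-contained sketch of the two-phase lookup. `Plan`, `CachedEntry`, `canonical`, and `rewriteOverCache` are toy stand-ins introduced for illustration, not the actual CacheManager types:

```scala
// Toy plan model: `canonical` stands in for Spark's canonicalized-plan equality.
trait Plan { def children: Seq[Plan]; def canonical: String }
final case class CachedEntry(plan: Plan, imr: Plan)

def lookup(incoming: Plan,
           cache: Seq[CachedEntry],
           rewriteOverCache: (Plan, CachedEntry) => Option[Plan]): Option[Plan] = {
  // Phase 1 (existing behavior): exact canonical match of the whole fragment.
  cache.find(_.plan.canonical == incoming.canonical).map(_.imr).orElse {
    // Phase 2 (this PR): partial match, limited to one child level. The
    // incoming plan's child must be canonically equal to the cached plan's
    // child; the top node is then re-expressed over the IMR's output.
    cache.collectFirst {
      case e if incoming.children.nonEmpty && e.plan.children.nonEmpty &&
                incoming.children.head.canonical == e.plan.children.head.canonical =>
        rewriteOverCache(incoming, e)
    }.flatten
  }
}
```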

Case 1: using an InMemoryRelation in a plan with two consecutive Projects.

We start with a DataFrame df1 whose plan is Project2 -> X, and we cache df1, so the CachedRepresentation holds the IMR together with the logical plan Project2 -> X.

Now we create a new DataFrame df2 = Project -> X.
Project may no longer be the same as Project2, so a direct check with CacheManager will not find a matching IMR. But clearly the X nodes are the same.
So the criterion is: the IMR can be used iff the following conditions are met:

1. X is the same for both (i.e. the incoming Project's child and the cached plan's Project2's child are canonically equal).
2. All the NamedExpressions of the incoming Project are expressible in terms of the output of Project2 (which is what the IMR's output is).

To check point 2, we use the following logic. Given that X is the same for both plans, their outputs are equivalent, so we remap the cached plan's Project2 in terms of the output attributes (ExprIds) of X in the incoming plan. This lets us classify the NamedExpressions of the incoming Project into four types:

1. those that are directly the same as NamedExpressions of Project2;
2. those that are functions of the output of Project2;
3. those that are literal constants, independent of the output of Project2;
4. those that are functions of attributes which are unavailable in the output of Project2.

As long as the set of type-4 NamedExpressions is empty, the InMemoryRelation of the cached plan is usable. This logic is coded in CacheManager; it involves rewriting the NamedExpressions of the incoming Project in terms of the Seq[Attribute] that will be forced on the IMR. A sketch of the classification follows.
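
The check reduces to verifying that every attribute referenced by the incoming project list is available in the remapped output of Project2. A self-contained sketch, using a hypothetical miniature expression tree rather than the real Catalyst NamedExpressions:

```scala
// Miniature expression tree standing in for Catalyst expressions.
sealed trait Expr
final case class Attr(name: String) extends Expr       // attribute reference
final case class Lit(value: Any) extends Expr          // literal constant
final case class Func(inputs: Seq[Expr]) extends Expr  // any function of inputs

// All attributes an expression refers to.
def attrsOf(e: Expr): Set[String] = e match {
  case Attr(n)   => Set(n)
  case Lit(_)    => Set.empty
  case Func(ins) => ins.flatMap(attrsOf).toSet
}

// `cachedOutput`: attributes produced by Project2 (the IMR's output), already
// remapped to the incoming plan's ExprIds as described above. Types 1-3 all
// reference only attributes in `cachedOutput` (a literal references none);
// a type-4 expression references something outside it and fails the check.
def imrIsUsable(incomingProjectList: Seq[Expr], cachedOutput: Set[String]): Boolean =
  incomingProjectList.forall(e => attrsOf(e).subsetOf(cachedOutput))
```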

Case 2: using an InMemoryRelation in a plan resulting from the collapse of Projects interspersed with Filters.

We start with a DataFrame df1 whose plan is Filter3 -> Filter4 -> Project2 -> X, and we cache df1, so the CachedRepresentation holds the IMR together with the logical plan
Filter3 -> Filter4 -> Project2 -> X

Now we create a new DataFrame df2 = Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X

Clearly the cached plan chain
Filter3 -> Filter4 -> Project2 -> X
is no longer directly similar to
Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
But it is still possible to use the IMR, because the cached plan's logical plan can serve as a subtree of the incoming plan.

The logic for this check is partly the same as for two consecutive Projects above, with some extra handling for the Filters. The algorithm is as follows:

1. Identify the "child" X, used for the similarity check, from both the incoming plan and the cached plan's logical plan. For the incoming plan, we descend to X and record the chain of consecutive Filters along the way. For the cached plan, we identify the first Project encountered, which is Project2, and its child, which is X.
2. With X identified in both the incoming and the cached plan, we also have the incoming "Project" and the cached plan's "Project2". We can now apply the rule of Case 1 (two consecutive Projects) and correctly rewrite the NamedExpressions of the incoming Project in terms of the Seq[Attribute] that will be enforced on the IMR.

But we also need to ensure that the Filter chain present in the cached plan, i.e. Filter3 -> Filter4, is a subset of the Filter chain in the incoming plan, i.e. Filter1 -> Filter2 -> Filter3 -> Filter4.
Note the difference in how the two chains are expressed:
the incoming plan is
Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
and in this chain the Filters are expressed in terms of the output of X,
but in the cached plan the Filters are expressed in terms of the output of Project2:
Filter3 -> Filter4 -> Project2 -> X

So for the comparison we need to express the Filter chain of the cached plan in terms of X, by pulling Project2 up above the Filters, so that the cached plan becomes
Project2 -> Filter3' -> Filter4' -> X

Now if we compare against
Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X

and find that Filter3' -> Filter4' is a subset of Filter1 -> Filter2 -> Filter3 -> Filter4,
then, since Project and Project2 are already compatible (by point 1 of Case 1),
we can use the cached IMR with a modified Project and the remaining partial Filter chain.

That is, we should be able to get a plan like

Project -> Filter1 -> Filter2 -> IMR

A sketch of the subset check follows.
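
A self-contained sketch of the filter-subset check. Predicates are modeled as canonicalized strings already re-expressed over the output of X (i.e. after the Project2 pull-up described above); all names here are hypothetical:

```scala
// Filter chain modeled as an ordered list of canonicalized predicates.
final case class FilterChain(predicates: Seq[String])

// Returns the filters the IMR has NOT already applied, provided the cached
// chain is a subset of the incoming chain; None means the IMR is unusable,
// since it would have filtered out rows the incoming plan still needs.
def partialFilterMatch(incoming: FilterChain,
                       cached: FilterChain): Option[FilterChain] = {
  val cachedSet = cached.predicates.toSet
  if (cachedSet.subsetOf(incoming.predicates.toSet))
    Some(FilterChain(incoming.predicates.filterNot(cachedSet)))
  else None
}

// Example from the text: incoming F1 -> F2 -> F3 -> F4, cached F3' -> F4'.
// partialFilterMatch(FilterChain(Seq("F1", "F2", "F3", "F4")),
//                    FilterChain(Seq("F3", "F4")))
// == Some(FilterChain(Seq("F1", "F2")))   // i.e. Project -> F1 -> F2 -> IMR
```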

Why are the changes needed?

This PR attempts to make cache lookup more robust, so that an existing IMR is used as much as possible.
The change is also a necessary requirement for another open PR, PR_SPARK-45959, which collapses Projects aggressively.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added new tests and relied on existing tests.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Dec 11, 2024
@ahshahid ahshahid changed the title SPARK-47609. Making CacheLookup more optimal to minimize cache miss [SPARK-47609][SQL] Making CacheLookup more optimal to minimize cache miss Dec 11, 2024
@ahshahid (Author)

Previous chain of comments:
From @anchovYu
Hi @ahshahid, thanks for the proposal and the PR. However, the current DataFrame cache design has a lot of design flaws, and I would worry that improving the cache hit rate in this case will make these problems worse:

stale results
The current design of the DataFrame cache is unable to proactively detect data staleness. Consider the case where the table is changed by some other application: the DataFrame cache never knows that, and it leads to outdated results. In this case, increasing the hit rate potentially increases the risk of hitting stale data.
unstable performance
Since the DataFrame cache is shared across Spark sessions and applied automatically, one session could use the data cached by another session on the same driver, or could no longer be able to use the cached data when it is invalidated by another session on the same driver, all without any notice. This results in unpredictable performance even for two consecutive executions of the same query in one session on the same driver. Similarly, increasing the hit rate may make it easier to encounter such unstable performance issues.
cc @cloud-fan

From @ahshahid
@anchovYu
I see what you are saying.
Actually, this PR arose as a sub-PR for the issue SPARK-45959.
The PR for that is #43854.
The increase in cache hits is a by-product of the above PR.
In my experience, and in current customer use cases, the issue in SPARK-45959 has caused compile time to increase from under a minute to anything ranging from 2 to 8+ hours.
Whether issues such as stale caches increase or not depends very much on the nature of the DataFrame being cached.
Also, most of the time a user will cache a DataFrame and then keep building new DataFrames from that base. So in that sense, even currently, there is no way to prevent new DataFrames from using the cached data if the sub-plan matches.
And whatever fix goes in for the stale-cache issue would automatically apply to this change too.
So please consider this PR not in the light of increasing the cache hit rate, but as a requirement for resolving SPARK-45959.
