
fixing walk order to resolve priority in multi-sink pipelines #120

Merged
merged 3 commits into from
Nov 13, 2024

Conversation

amitschang
Member

@amitschang amitschang commented Nov 8, 2024

When adding multiple sink tasks along the pipeline, it was observed that the tasks further along do not end up getting priority to run. This could cause a pipeline to effectively run only the path to a single output closer to the start while a queue piles up on downstream tasks. This looks to be due to the order of the backward BFS walk, which reaches some nodes closer to the start (following the output nearer the start) sooner, so they sort first.

For instance, in the pipeline below, the green and yellow paths take priority. Due to batch sizes, the green path would remain unscheduled, but the yellow path, with single-row batches and single-CPU requests, would simply keep taking priority and the white tasks would never be submitted.

(pipeline diagram: green and yellow paths near the start, white tasks further downstream)

The fix here is to penalize tasks further from a sink along any path. The sink tasks themselves all share the same rank, so we can continue to write output quickly. Tasks at the same rank are ordered as before (by execution count).
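The rank described above could be computed with a reverse BFS seeded from all sinks at once. This is an illustrative sketch only; the function name and the predecessor-map graph shape are assumptions, not the PR's actual API:

```python
from collections import deque

def rank_by_distance_from_sinks(predecessors, sinks):
    """Assign each task its shortest distance (in edges) to any sink.

    `predecessors` maps task -> list of upstream tasks, so a BFS seeded
    with every sink walks the graph backward toward the sources.
    """
    rank = {sink: 0 for sink in sinks}  # all sinks share rank 0
    queue = deque(sinks)
    while queue:
        node = queue.popleft()
        for upstream in predecessors.get(node, []):
            if upstream not in rank:  # first visit = shortest distance
                rank[upstream] = rank[node] + 1
                queue.append(upstream)
    return rank
```

Scheduling order would then be something like `sorted(tasks, key=lambda t: (rank[t], exec_count[t]))`, which keeps all sinks at the front while breaking ties by execution count as before.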

Note: this comes with a small additional change: instead of submitting as many invocations of a particular task as fit at a time, we re-evaluate the sort order after each submission, walking the graph backward each time. This makes the pipeline "fairer" in a sense.


codecov bot commented Nov 8, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅


task.pending.appendleft(self.task_submit(task.task, merged))
task.counter += 1
submitted = True
num_to_merge = deque_num_merge(task.data_in, batch_size)
Member Author

FYI: the change in this block simply removes the for loop, submitting (if eligible) only a single invocation of the task at a time, to enable the fairness re-evaluation. The `break` -> `return` change makes the diff look like more than whitespace.

Comment on lines +201 to +211
submitted = True
while submitted:
rank = 0
submitted = False
Member Author

Everything below this is unchanged. This loop implements the fairness: as long as anything has been submitted, we keep walking the graph from the top down with the new sort order. Once nothing is submitted, either there is no room or we need more source data.
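The shape of that outer loop can be sketched as below. All names here (`schedule_pass`, `try_submit`, the task shape) are hypothetical stand-ins for the scheduler's actual state in stream.py, not the real code:

```python
def schedule_pass(tasks, rank, try_submit):
    """One fairness cycle: walk tasks in priority order, submitting at
    most one invocation per task per round, and repeat while anything
    was submitted.

    try_submit(task) is assumed to return True when an invocation fit
    the available resources and was launched.
    """
    submitted = True
    while submitted:
        submitted = False
        # re-sort every round: execution counts change as work launches
        for task in sorted(tasks, key=lambda t: (rank[t.name], t.counter)):
            if try_submit(task):
                task.counter += 1
                submitted = True
    # loop exits when nothing was submitted: no room, or sources needed
```

Because each round submits at most one invocation per task before re-sorting, a low-rank task cannot monopolize the workers the way the old per-task for loop allowed.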

@amitschang amitschang marked this pull request as ready for review November 8, 2024 20:48
@amitschang amitschang requested a review from a team as a code owner November 8, 2024 20:48
@xiangchenjhu xiangchenjhu left a comment

The sorting logic in graph.py is well-structured, and I spent additional time reviewing the scheduling details and trade-offs in stream.py. While I may not have fully understood every detail of the scheduling process (though I grasp most of it), I have no concerns with the current changes. Thanks for the helpful explanations provided in the comments.
Approved

@xiangchenjhu xiangchenjhu self-requested a review November 13, 2024 20:11
@amitschang
Member Author

thanks!

@amitschang amitschang merged commit bc95a0b into main Nov 13, 2024
11 checks passed
@amitschang amitschang deleted the fix-task-walk-order branch November 13, 2024 20:19