From 76009ea397e03529a48751ba0a6f3a0ff723bfe5 Mon Sep 17 00:00:00 2001 From: Arik Mitschang Date: Fri, 8 Nov 2024 14:52:58 -0500 Subject: [PATCH 1/3] fixing walk order to resolve priority in multi-sink pipelines --- dplutils/pipeline/graph.py | 21 ++++++++- dplutils/pipeline/stream.py | 64 +++++++++++++++------------ tests/pipeline/test_pipeline_graph.py | 19 ++++++++ 3 files changed, 75 insertions(+), 29 deletions(-) diff --git a/dplutils/pipeline/graph.py b/dplutils/pipeline/graph.py index 69f355d..33d9ca1 100644 --- a/dplutils/pipeline/graph.py +++ b/dplutils/pipeline/graph.py @@ -1,3 +1,4 @@ +from collections import defaultdict from enum import Enum from networkx import DiGraph, all_simple_paths, bfs_edges, is_directed_acyclic_graph, path_graph @@ -69,6 +70,7 @@ def _sort_key(x): return 0 if isinstance(x, TRM) else sort_key(x) sorter = (lambda x: sorted(x, key=_sort_key)) if sort_key else None + for _, node in bfs_edges(graph, source, reverse=back, sort_neighbors=sorter): if not isinstance(node, TRM): yield node @@ -101,4 +103,21 @@ def walk_back(self, source=None, sort_key=None): tasks in order of callable `sort_key`, which should return a sortable object given :class:`PipelineTask` as input. """ - return self._walk(source or TRM.sink, back=True, sort_key=sort_key) + paths = all_simple_paths(self.with_terminals().reverse(), source or TRM.sink, TRM.source) + depths = defaultdict(int) + layers = defaultdict(list) + # unlike bfs_edges/bfs_layers, we order by maximum depth from source, to + # try and ensure we prioritize outputs while also preferring tasks + # further along. + for path in paths: + for i, node in enumerate(path): + if isinstance(node, TRM) or node == source: + continue + depths[node] = max(depths[node], i) + for node, depth in depths.items(): + layers[depth].append(node) + # layers will be keyed by maximum distance from source, containing a + # list of nodes at that distance. Yield based on sort key secondarily. 
+ for i in sorted(layers.keys()): + for node in sorted(layers[i], key=sort_key or (lambda x: 0)): + yield node diff --git a/dplutils/pipeline/stream.py b/dplutils/pipeline/stream.py index c5cb006..eca397a 100644 --- a/dplutils/pipeline/stream.py +++ b/dplutils/pipeline/stream.py @@ -165,9 +165,11 @@ def _feed_source(self, source): total_length += len(next_df) def enqueue_tasks(self): - # Work through the graph in reverse order, submitting any tasks as - # needed. Reverse order ensures we prefer to send tasks that are closer - # to the end of the pipeline and only feed as necessary. + # helper to make submission decision of a single task based on the batch + # size, exhaustion conditions, and whether the implementation deems it + # submittable. Returns flags (eligible, submitted) to indicate whether + # it was eligible to be submitted based on input queue and batch size, + # and whether it was actually submitted. def _handle_one_task(task, rank): eligible = submitted = False if len(task.data_in) == 0: @@ -179,34 +181,40 @@ def _handle_one_task(task, rank): self.logger.debug(f"Enqueueing split for <{task.name}>[bs={batch_size}]") task.split_pending.appendleft(self.split_batch_submit(batch, batch_size)) - while len(task.data_in) > 0: - num_to_merge = deque_num_merge(task.data_in, batch_size) - if num_to_merge == 0: - # If the feed is terminated and there are no more tasks that - # will feed to this one, submit everything - if self.source_exhausted and self.task_exhausted(task): - num_to_merge = len(task.data_in) - else: - break - eligible = True - if not self.task_submittable(task.task, rank): - break - merged = [task.data_in.pop().data for i in range(num_to_merge)] - self.logger.debug(f"Enqueueing merged batches <{task.name}>[n={len(merged)};bs={batch_size}]") - task.pending.appendleft(self.task_submit(task.task, merged)) - task.counter += 1 - submitted = True + num_to_merge = deque_num_merge(task.data_in, batch_size) + if num_to_merge == 0: + # If the feed is 
terminated and there are no more tasks that + # will feed to this one, submit everything + if self.source_exhausted and self.task_exhausted(task): + num_to_merge = len(task.data_in) + else: + return (eligible, submitted) + eligible = True + if not self.task_submittable(task.task, rank): + return (eligible, submitted) + + merged = [task.data_in.pop().data for _ in range(num_to_merge)] + self.logger.debug(f"Enqueueing merged batches <{task.name}>[n={len(merged)};bs={batch_size}]") + task.pending.appendleft(self.task_submit(task.task, merged)) + task.counter += 1 + submitted = True return (eligible, submitted) # proceed through all non-source tasks, which will be handled separately - # below due to the need to feed from generator. - rank = 0 - for task in self.stream_graph.walk_back(sort_key=lambda x: x.counter): - if task in self.stream_graph.source_tasks: - continue - eligible, _ = _handle_one_task(task, rank) - if eligible: # update rank of this task if it _could_ be done, whether or not it was - rank += 1 + # below due to the need to feed from generator. We walk backwards, + # re-evaluating the sort order of tasks of same depth after each single + # submission, implementing a kind of "fair" submission, while still + # prioritizing tasks closer to the sink. + submitted = True + while submitted: + rank = 0 + submitted = False + for task in self.stream_graph.walk_back(sort_key=lambda x: x.counter): + if task in self.stream_graph.source_tasks: + continue + eligible, submitted = _handle_one_task(task, rank) + if eligible: # update rank of this task if it _could_ be done, whether or not it was + rank += 1 # Source as many inputs as can fit on source tasks. We prioritize flushing the # input queue and secondarily on number of invocations in case batch sizes differ. 
diff --git a/tests/pipeline/test_pipeline_graph.py b/tests/pipeline/test_pipeline_graph.py index 0ee0b88..678a8c7 100644 --- a/tests/pipeline/test_pipeline_graph.py +++ b/tests/pipeline/test_pipeline_graph.py @@ -31,6 +31,7 @@ def graph_suite(): "multisource": make_graph_struct([(a, c), (b, c), (c, d)], [a, b], [d]), "multisink": make_graph_struct([(a, b), (b, c), (b, d)], [a], [c, d]), "branchmulti": make_graph_struct([(a, c), (b, c), (c, d), (c, e), (d, f), (e, g)], [a, b], [f, g]), + "branchmultiout": make_graph_struct([(a, b), (b, c), (b, d), (d, e), (d, f), (f, g)], [a], [c, e, g]), } @@ -79,6 +80,14 @@ def test_graph_walk_returns_node_list(self, graph_info): assert walked[-1] in graph_info.sources assert len(walked) == len(p) + def test_graph_walk_excludes_starting_node(self, graph_info): + p = PipelineGraph(graph_info.edges) + source = graph_info.sinks[0] + walked = list(p.walk_back(source)) + assert source not in walked + walked = list(p.walk_fwd(source)) + assert source not in walked + def test_graph_walk_with_priority(): test = graph_suite()["branched"] @@ -92,6 +101,16 @@ def test_graph_walk_with_priority(): assert walked == [p.task_map[i] for i in ["e", "d", "c", "b", "a"]] walked = list(p.walk_fwd(sort_key=lambda x: -x.func)) assert walked == [p.task_map[i] for i in ["a", "b", "d", "c", "e"]] + # make sure to test with multi output, which can make priority in BFS more + # challenging, specifically in the back direction. 
Critically below, nodes + # "b" and "d" are both 2 away from the sink at minimum, but "f" is farther + # along so it should take priority, while all sinks should still be + # prioritized + p = PipelineGraph(graph_suite()["branchmultiout"].edges) + walked = list(p.walk_back(sort_key=lambda x: x.func)) + assert walked == [p.task_map[i] for i in ["c", "e", "g", "f", "d", "b", "a"]] + walked = list(p.walk_back(sort_key=lambda x: -x.func)) + assert walked == [p.task_map[i] for i in ["g", "e", "c", "f", "d", "b", "a"]] def test_single_node_graph_to_list(): From 1806d979436aeffd352a1830a7477d4cc1884c68 Mon Sep 17 00:00:00 2001 From: Arik Mitschang Date: Mon, 11 Nov 2024 14:03:05 -0500 Subject: [PATCH 2/3] re-eval on submit fix --- dplutils/pipeline/stream.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dplutils/pipeline/stream.py b/dplutils/pipeline/stream.py index eca397a..1402753 100644 --- a/dplutils/pipeline/stream.py +++ b/dplutils/pipeline/stream.py @@ -215,6 +215,8 @@ def _handle_one_task(task, rank): eligible, submitted = _handle_one_task(task, rank) if eligible: # update rank of this task if it _could_ be done, whether or not it was rank += 1 + if submitted: + break # Source as many inputs as can fit on source tasks. We prioritize flushing the # input queue and secondarily on number of invocations in case batch sizes differ.
From 8c2e77206a3436fec540a79878ad4129fab874f8 Mon Sep 17 00:00:00 2001 From: Arik Mitschang Date: Mon, 11 Nov 2024 14:54:44 -0500 Subject: [PATCH 3/3] add test for stream submission ordering logic --- tests/pipeline/test_stream_executor.py | 46 ++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/pipeline/test_stream_executor.py b/tests/pipeline/test_stream_executor.py index a0c4f7d..4c61026 100644 --- a/tests/pipeline/test_stream_executor.py +++ b/tests/pipeline/test_stream_executor.py @@ -1,3 +1,5 @@ +from collections import defaultdict + import pandas as pd import pytest from test_suite import PipelineExecutorTestSuite @@ -77,3 +79,47 @@ def generator(): pl = LocalSerialExecutor(dummy_steps, generator=generator).set_config("task1.batch_size", 1) res = [i.data for i in pl.run()] assert len(res) == 8 + + +def test_stream_submission_ordering_evaluation_priority(): + # tracking class adds counts and a parallel submission which allows us to + # locally test that the re-prioritization during submission is working. If + # so, we should expect terminal tasks having even numbers of calls and being + # preferred (as opposed to submitting n parallel all to one task, or + # submitting some to upstream tasks). + class MyExec(LocalSerialExecutor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.counts = defaultdict(int) + self.parallel_submissions = 4 + self.n_parallel = 0 + + def task_submit(self, task, data): + self.counts[task.name] += 1 + self.n_parallel += 1 + return super().task_submit(task, data) + + def task_submittable(self, t, rank): + if t in self.graph.source_tasks: + return True + return self.n_parallel < self.parallel_submissions + + def poll_tasks(self, pending): + self.n_parallel = 0 + + a = PipelineTask("a", lambda x: x, batch_size=16) + b = a("b", batch_size=1) + (c, d, e) = (b("c"), b("d"), b("e")) + # graph with multiple terminals. 
The large input batch size ensures we have + # work to submit in parallel to exercise the re-sorting logic. + p = MyExec([(a, b), (b, c), (c, d), (c, e)], max_batches=16) + p_run = p.run() + _ = [next(p_run) for _ in range(4)] # pop number based on parallel submissions + assert p.counts["d"] == p.counts["e"] == 2 # terminals should have even counts + assert p.counts["b"] == p.counts["c"] == 4 # only just enough to submit 4 + _ = [next(p_run) for _ in range(4)] + assert p.counts["d"] == p.counts["e"] == 4 # we finish the 4 batch size + assert p.counts["b"] == p.counts["c"] == 4 # but do no more upstream work + _ = [next(p_run) for _ in range(4)] + assert p.counts["d"] == p.counts["e"] == 6 # need to get more work, as above + assert p.counts["b"] == p.counts["c"] == 8 # more upstream for that work