Skip to content

Commit

Permalink
Fixing multi-sender stage configurations (#951)
Browse files Browse the repository at this point in the history
This PR fixes a long overdue placeholder exception from back when we were still using `streamz`. If a user configured multiple upstream stages to a single downstream port, then you would get a `NotImplementedError`. For example, the following would cause an exception:

```python
source1 = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df]))
source2 = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df]))

sink_stage = pipe.add_stage(InMemorySinkStage(config))

pipe.add_edge(source1, sink_stage)
pipe.add_edge(source2, sink_stage)
```

Also adds a simple tests to run the above scenario.

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)
  - Devin Robison (https://github.com/drobison00)

URL: #951
  • Loading branch information
mdemoret-nv authored May 19, 2023
1 parent 3eec11d commit 23ae068
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 20 deletions.
4 changes: 2 additions & 2 deletions morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ def inner_build(builder: mrc.Builder, segment_id: str):
# Finally, execute the link phase (only necessary for circular pipelines)
# for s in source_and_stages:
for stage in segment_graph.nodes():
for port in stage.input_ports:
port.link()
for port in typing.cast(StreamWrapper, stage).input_ports:
port.link(builder=builder)

logger.info("====Building Segment Complete!====")

Expand Down
21 changes: 10 additions & 11 deletions morpheus/pipeline/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import typing

import mrc
import typing_utils

import morpheus.pipeline as _pipeline
Expand Down Expand Up @@ -80,7 +81,7 @@ def in_stream(self):
def in_type(self):
return self._input_type

def get_input_pair(self) -> StreamPair:
def get_input_pair(self, builder: mrc.Builder) -> StreamPair:
"""
Returns the input `StreamPair` which is a tuple consisting of the parent node and the parent node's output type.
"""
Expand All @@ -99,17 +100,15 @@ def get_input_pair(self) -> StreamPair:
self._is_linked = True
else:
# We have multiple senders. Create a dummy stream to connect all senders
self._input_stream = builder.make_node_component(
f"{self.parent.unique_name}-reciever[{self.port_number}]", mrc.core.operators.map(lambda x: x))

if (self.is_complete):
# Connect all streams now
# self._input_stream = streamz.Stream(upstreams=[x.out_stream for x in self._input_senders],
# asynchronous=True,
# loop=IOLoop.current())
raise NotImplementedError("Still using streamz")
for input_sender in self._input_senders:
builder.make_edge(input_sender.out_stream, self._input_stream)

self._is_linked = True
else:
# Create a dummy stream that needs to be linked later
# self._input_stream = streamz.Stream(asynchronous=True, loop=IOLoop.current())
raise NotImplementedError("Still using streamz")

# Now determine the output type from what we have
great_ancestor = greatest_ancestor(*[x.out_type for x in self._input_senders if x.is_complete])
Expand All @@ -123,7 +122,7 @@ def get_input_pair(self) -> StreamPair:

return (self._input_stream, self._input_type)

def link(self):
def link(self, builder: mrc.Builder):
"""
The linking phase determines the final type of the `Receiver` and connects all underlying stages.
Expand All @@ -146,6 +145,6 @@ def link(self):
"Invalid linking phase. Input port type does not match predicted type determined during build phase")

for out_stream in [x.out_stream for x in self._input_senders]:
out_stream.connect(self._input_stream)
builder.make_edge(out_stream, self._input_stream)

self._is_linked = True
4 changes: 2 additions & 2 deletions morpheus/pipeline/single_port_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ def accepted_types(self) -> typing.Tuple:
"""
pass

def _pre_build(self) -> typing.List[StreamPair]:
in_ports_pairs = super()._pre_build()
def _pre_build(self, builder: mrc.Builder) -> typing.List[StreamPair]:
in_ports_pairs = super()._pre_build(builder=builder)

# Check the types of all inputs
for x in in_ports_pairs:
Expand Down
10 changes: 5 additions & 5 deletions morpheus/pipeline/stream_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,12 @@ def build(self, builder: mrc.Builder, do_propagate=True):
assert self._pipeline is not None, "Must be attached to a pipeline before building!"

# Pre-Build returns the input pairs for each port
in_ports_pairs = self._pre_build()
in_ports_pairs = self._pre_build(builder=builder)

out_ports_pair = self._build(builder, in_ports_pairs)
out_ports_pair = self._build(builder=builder, in_ports_streams=in_ports_pairs)

# Allow stages to do any post build steps (i.e., for sinks, or timing functions)
out_ports_pair = self._post_build(builder, out_ports_pair)
out_ports_pair = self._post_build(builder=builder, out_ports_pair=out_ports_pair)

assert len(out_ports_pair) == len(self.output_ports), \
"Build must return same number of output pairs as output ports"
Expand All @@ -348,8 +348,8 @@ def build(self, builder: mrc.Builder, do_propagate=True):

dep.build(builder, do_propagate=do_propagate)

def _pre_build(self) -> typing.List[StreamPair]:
in_pairs: typing.List[StreamPair] = [x.get_input_pair() for x in self.input_ports]
def _pre_build(self, builder: mrc.Builder) -> typing.List[StreamPair]:
in_pairs: typing.List[StreamPair] = [x.get_input_pair(builder=builder) for x in self.input_ports]

return in_pairs

Expand Down
25 changes: 25 additions & 0 deletions tests/test_nonlinear_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import mrc
import mrc.core.operators as ops
import pytest
from mrc.core.node import Broadcast

from morpheus.config import Config
Expand All @@ -27,6 +28,7 @@
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from utils import assert_results
from utils.dataset_manager import DatasetManager

Expand Down Expand Up @@ -95,3 +97,26 @@ def test_forking_pipeline(config, dataset_cudf: DatasetManager):
# Get the results
assert_results(comp_higher.get_results())
assert_results(comp_lower.get_results())


@pytest.mark.parametrize("source_count, expected_count", [(1, 1), (2, 2), (3, 3)])
def test_port_multi_sender(config, dataset_cudf: DatasetManager, source_count, expected_count):

filter_probs_df = dataset_cudf["filter_probs.csv"]

pipe = Pipeline(config)

input_ports = []
for x in range(source_count):
input_port = f"input_{x}"
input_ports.append(input_port)

sink_stage = pipe.add_stage(InMemorySinkStage(config))

for x in range(source_count):
source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_edge(source_stage, sink_stage)

pipe.run()

assert len(sink_stage.get_messages()) == expected_count

0 comments on commit 23ae068

Please sign in to comment.