Skip to content

Commit

Permalink
Add option to folly::channels::merge to close when any input closes
Browse files Browse the repository at this point in the history
Summary: This diff adds an option to folly::channels::merge to close if any of the inputs close. The default remains the same (to only close when all the inputs close, or any input throws).

Differential Revision: D49762036

fbshipit-source-id: 4f04ee1b5cf28525a154cc2c8b5716b4e59f86c3
  • Loading branch information
Andrew Smith authored and facebook-github-bot committed Dec 3, 2023
1 parent 4542228 commit f675595
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 11 deletions.
28 changes: 19 additions & 9 deletions folly/experimental/channels/Merge-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace detail {
* object will then be deleted once each remaining input receiver transitions to
* the CancellationProcessed state (after we receive each cancelled callback).
*/
template <typename TValue>
template <typename TValue, bool WaitForAllInputsToClose>
class MergeProcessor : public IChannelCallback {
public:
MergeProcessor(
Expand Down Expand Up @@ -214,11 +214,14 @@ class MergeProcessor : public IChannelCallback {
CHECK_EQ(getReceiverState(receiver), ChannelState::CancellationTriggered);
receivers_.erase(receiver);
(ChannelBridgePtr<TValue>(receiver));
if (closeResult.exception.has_value()) {
// We received an exception. We need to close the sender and all other
// receivers.
if (closeResult.exception.has_value() || !WaitForAllInputsToClose) {
// We need to close the sender and all other receivers.
if (getSenderState() == ChannelState::Active) {
sender_->senderClose(std::move(closeResult.exception.value()));
if (closeResult.exception.has_value()) {
sender_->senderClose(std::move(closeResult.exception.value()));
} else {
sender_->senderClose();
}
}
for (auto* otherReceiver : receivers_) {
if (getReceiverState(otherReceiver) == ChannelState::Active) {
Expand Down Expand Up @@ -278,11 +281,18 @@ class MergeProcessor : public IChannelCallback {
template <typename TReceiver, typename TValue>
Receiver<TValue> merge(
std::vector<TReceiver> inputReceivers,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor) {
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
bool waitForAllInputsToClose) {
auto [outputReceiver, outputSender] = Channel<TValue>::create();
auto* processor = new detail::MergeProcessor<TValue>(
std::move(outputSender), std::move(executor));
processor->start(std::move(inputReceivers));
if (waitForAllInputsToClose) {
auto* processor = new detail::MergeProcessor<TValue, true>(
std::move(outputSender), std::move(executor));
processor->start(std::move(inputReceivers));
} else {
auto* processor = new detail::MergeProcessor<TValue, false>(
std::move(outputSender), std::move(executor));
processor->start(std::move(inputReceivers));
}
return std::move(outputReceiver);
}
} // namespace channels
Expand Down
8 changes: 7 additions & 1 deletion folly/experimental/channels/Merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ namespace channels {
*
* @param executor: A SequencedExecutor used to merge input values.
*
* @param waitForAllInputsToClose: When true, if any input receiver closes
* without an exception, the channel continues to merge values from the other
* input receivers until all input receivers are closed. If false, the channel
* closes as soon as any input receiver has closed.
*
* Example:
*
* // Example function that returns a list of receivers
Expand All @@ -46,7 +51,8 @@ namespace channels {
template <typename TReceiver, typename TValue = typename TReceiver::ValueType>
Receiver<TValue> merge(
std::vector<TReceiver> inputReceivers,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor);
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
bool waitForAllInputsToClose = true);
} // namespace channels
} // namespace folly

Expand Down
37 changes: 36 additions & 1 deletion folly/experimental/channels/test/MergeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ TEST_F(MergeFixture, ReceiveValues_ReturnMergedValues) {
executor_.drain();
}

TEST_F(MergeFixture, OneInputClosed_ContinuesMerging) {
TEST_F(MergeFixture, OneInputClosed_WaitForAllInputsToClose_ContinuesMerging) {
auto [receiver1, sender1] = Channel<int>::create();
auto [receiver2, sender2] = Channel<int>::create();
auto [receiver3, sender3] = Channel<int>::create();
Expand Down Expand Up @@ -113,6 +113,41 @@ TEST_F(MergeFixture, OneInputClosed_ContinuesMerging) {
executor_.drain();
}

TEST_F(MergeFixture, OneInputClosed_DoNotWaitForAllInputsToClose_StopsMerging) {
auto [receiver1, sender1] = Channel<int>::create();
auto [receiver2, sender2] = Channel<int>::create();
auto [receiver3, sender3] = Channel<int>::create();
auto mergedReceiver = merge(
toVector(
std::move(receiver1), std::move(receiver2), std::move(receiver3)),
&executor_,
false /* waitForAllInputsToClose */);

EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onClosed());

auto callbackHandle = processValues(std::move(mergedReceiver));
executor_.drain();

sender1.write(1);
sender2.write(2);
sender3.write(3);
std::move(sender3).close();
executor_.drain();

sender1.write(4);
executor_.drain();

sender2.write(5);
executor_.drain();

std::move(sender1).close();
std::move(sender2).close();
executor_.drain();
}

TEST_F(MergeFixture, OneInputThrows_OutputClosedWithException) {
auto [receiver1, sender1] = Channel<int>::create();
auto [receiver2, sender2] = Channel<int>::create();
Expand Down

0 comments on commit f675595

Please sign in to comment.