Skip to content

Commit

Permalink
Polish race-condition. test with error and completed events.
Browse files Browse the repository at this point in the history
  • Loading branch information
tcw165 committed Sep 6, 2022
1 parent 95bb3c1 commit b313e53
Showing 1 changed file with 45 additions and 16 deletions.
61 changes: 45 additions & 16 deletions src/tests/test_take_until.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,53 @@ SCENARIO("take_until mirrors both source observable and trigger observable", "[t

SCENARIO("take_until can handle race condition")
{
GIVEN("source observable and trigger observable emits in different thread")
GIVEN("observer consumes on_next slower than source sends on_next and on_completed events")
{
std::atomic_bool on_completed_called{false};

rpp::source::interval(std::chrono::seconds{1}, rpp::schedulers::trampoline{})
.take_until(rpp::source::interval(std::chrono::seconds{2}, rpp::schedulers::new_thread{}))
.subscribe([&](const auto&)
{
CHECK(!on_completed_called);
std::this_thread::sleep_for(std::chrono::seconds(3));
CHECK(!on_completed_called);
} /* on_next */,
{} /* on_error */,
[&]()
{
on_completed_called = true;
} /* on_completed */);

CHECK(on_completed_called);
THEN("on_completed can not interleave with on_next")
{
rpp::source::interval(std::chrono::seconds{1}, rpp::schedulers::trampoline{})
.take_until(rpp::source::interval(std::chrono::seconds{2}, rpp::schedulers::new_thread{}))
.as_blocking()
.subscribe([&](const auto&)
{
CHECK(!on_completed_called);
std::this_thread::sleep_for(std::chrono::seconds(3));
CHECK(!on_completed_called);
} /* on_next */,
{} /* on_error */,
[&]()
{
on_completed_called = true;
} /* on_completed */);

CHECK(on_completed_called);
}
}

GIVEN("observer consumes on_next slower than source sends on_next and on_error events")
{
std::atomic_bool on_error_called{false};
auto subject = rpp::subjects::publish_subject<int>{};

THEN("on_error can't interleave with on_next")
{
rpp::source::just(1, 1, 1)
.take_until(subject.get_observable())
.as_blocking()
.subscribe([&](auto &&)
{
CHECK(!on_error_called);
std::thread{[&] {
subject.get_subscriber().on_error(std::exception_ptr{});
}}.detach();
std::this_thread::sleep_for(std::chrono::seconds{1});
CHECK(!on_error_called);
} /* on_next */,
[&](auto) { on_error_called = true; } /* on_error */,
{} /* on_completed */);
CHECK(on_error_called);
}
}
}

0 comments on commit b313e53

Please sign in to comment.