From 2a52ebd27a60edacd69d89b9a07c0872787ae183 Mon Sep 17 00:00:00 2001 From: TC Wang Date: Thu, 8 Sep 2022 12:09:44 -0700 Subject: [PATCH] Add more tests for trampoline scheduler (#264) --- .../rpp/schedulers/trampoline_scheduler.hpp | 1 + src/tests/test_schedulers.cpp | 114 +++++++++++++++--- 2 files changed, 99 insertions(+), 16 deletions(-) diff --git a/src/rpp/rpp/schedulers/trampoline_scheduler.hpp b/src/rpp/rpp/schedulers/trampoline_scheduler.hpp index c7d97e741..5da72d0ac 100644 --- a/src/rpp/rpp/schedulers/trampoline_scheduler.hpp +++ b/src/rpp/rpp/schedulers/trampoline_scheduler.hpp @@ -80,6 +80,7 @@ class trampoline final : public details::scheduler_tag } static time_point now() { return clock_type::now(); } + private: rpp::composite_subscription m_sub; }; diff --git a/src/tests/test_schedulers.cpp b/src/tests/test_schedulers.cpp index ee65a8913..7596aa8d6 100644 --- a/src/tests/test_schedulers.cpp +++ b/src/tests/test_schedulers.cpp @@ -417,9 +417,8 @@ SCENARIO("New thread scheduler depends on subscription") SCENARIO("trampoline scheduler dispatches task in the same thread") { - auto scheduler = rpp::schedulers::trampoline{}; - auto sub = rpp::composite_subscription{}; - auto worker = scheduler.create_worker(sub); + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); WHEN("supply a simple job") { @@ -460,7 +459,7 @@ SCENARIO("trampoline scheduler dispatches task in the same thread") } } -static std::vector trampoline_expected_simulate_nested_scheduling(std::string thread_id) +static std::vector trampoline_expected_simulate_nested_scheduling(const std::string& thread_id) { return std::vector{ "Task 1 starts "s + thread_id, @@ -472,9 +471,8 @@ static std::vector trampoline_expected_simulate_nested_scheduling(s } SCENARIO("trampoline scheduler defers tasks in order") { - auto scheduler = rpp::schedulers::trampoline{}; - auto sub = rpp::composite_subscription{}; - auto worker = scheduler.create_worker(sub); + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); rpp::subscription_guard guard{sub}; WHEN("supply a job that schedules inner job that schedules inner job") @@ -494,9 +492,8 @@ SCENARIO("trampoline scheduler is thread local") { WHEN("two threads are using the same trampoline scheduler") { - auto scheduler = rpp::schedulers::trampoline{}; - auto sub = rpp::composite_subscription{}; - auto worker = scheduler.create_worker(sub); + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); rpp::subscription_guard guard{sub}; std::vector call_stack_1; @@ -517,9 +514,8 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription") { GIVEN("unsubscribed subscription") { - auto scheduler = rpp::schedulers::trampoline{}; - auto sub = rpp::composite_subscription{}; - auto worker = scheduler.create_worker(sub); + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); sub.unsubscribe(); @@ -535,9 +531,8 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription") GIVEN("asynchronously unsubscribes subscription") { - auto scheduler = rpp::schedulers::trampoline{}; - auto sub = rpp::composite_subscription{}; - auto worker = scheduler.create_worker(sub); + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); THEN("shall not see execution of inner schedulable") { @@ -554,6 +549,93 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription") }); } } + + WHEN("job unsubscribes subscription but still reschedules itself") + { + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); + + auto run_counter{0}; + auto rescheduling_delay = std::chrono::nanoseconds{0}; + + worker.schedule([&]() -> rpp::schedulers::optional_duration + { + ++run_counter; + + // Schedule a job so that job queue is not empty and hence we could cover more code. + worker.schedule([&]() -> rpp::schedulers::optional_duration + { + ++run_counter; + return std::nullopt; + }); + + sub.unsubscribe(); + return rescheduling_delay; + }); + + THEN("shall see job runs once") + { + CHECK(run_counter == 1); + } + } + + WHEN("unsubscribe subscription when draining queue sleeps for the deferred job") + { + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); + + auto run_counter{0}; + const auto delay_short = std::chrono::duration_cast(std::chrono::seconds{1}); + const auto delay_long = std::chrono::duration_cast(std::chrono::seconds{2}); + + worker.schedule([&]() -> rpp::schedulers::optional_duration + { + ++run_counter; + + // Schedule a recurrently deferred job so that job queue is not empty; hence we could test draining queue. + worker.schedule(delay_long, [&]() -> rpp::schedulers::optional_duration + { + ++run_counter; + return delay_long; + }); + + // Sleep for enough of time so that trampoline thread is sleeping for the future job that we just scheduled. + std::thread([&] + { + std::this_thread::sleep_for(delay_short); + sub.unsubscribe(); + }).detach(); + + return std::nullopt; + }); + + THEN("shall see job runs once") + { + CHECK(run_counter == 1); + } + } +} + +SCENARIO("trampoline respects the given time-point") +{ + auto sub = rpp::composite_subscription{}; + auto worker = rpp::schedulers::trampoline::create_worker(sub); + + auto delay = std::chrono::duration_cast(std::chrono::seconds{1}); + + WHEN("schedule at future") + { + auto now = rpp::schedulers::clock_type::now(); + auto future = now + delay; + worker.schedule(future, [&]() -> rpp::schedulers::optional_duration + { + THEN("this thread should sleep for the delay duration") + { + CHECK(rpp::schedulers::clock_type::now() >= now + delay); + } + return std::nullopt; + }); + } } SCENARIO("RunLoop scheduler dispatches tasks only manually")