Skip to content

Commit

Permalink
Add more tests for trampoline scheduler (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcw165 authored Sep 8, 2022
1 parent 9be8285 commit 2a52ebd
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/rpp/rpp/schedulers/trampoline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class trampoline final : public details::scheduler_tag
}

static time_point now() { return clock_type::now(); }

private:
rpp::composite_subscription m_sub;
};
Expand Down
114 changes: 98 additions & 16 deletions src/tests/test_schedulers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,8 @@ SCENARIO("New thread scheduler depends on subscription")

SCENARIO("trampoline scheduler dispatches task in the same thread")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

WHEN("supply a simple job")
{
Expand Down Expand Up @@ -460,7 +459,7 @@ SCENARIO("trampoline scheduler dispatches task in the same thread")
}
}

static std::vector<std::string> trampoline_expected_simulate_nested_scheduling(std::string thread_id)
static std::vector<std::string> trampoline_expected_simulate_nested_scheduling(const std::string& thread_id)
{
return std::vector<std::string>{
"Task 1 starts "s + thread_id,
Expand All @@ -472,9 +471,8 @@ static std::vector<std::string> trampoline_expected_simulate_nested_scheduling(s
}
SCENARIO("trampoline scheduler defers tasks in order")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);
rpp::subscription_guard guard{sub};

WHEN("supply a job that schedules inner job that schedules inner job")
Expand All @@ -494,9 +492,8 @@ SCENARIO("trampoline scheduler is thread local")
{
WHEN("two threads are using the same trampoline scheduler")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);
rpp::subscription_guard guard{sub};

std::vector<std::string> call_stack_1;
Expand All @@ -517,9 +514,8 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription")
{
GIVEN("unsubscribed subscription")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

sub.unsubscribe();

Expand All @@ -535,9 +531,8 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription")

GIVEN("asynchronously unsubscribes subscription")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

THEN("shall not see execution of inner schedulable")
{
Expand All @@ -554,6 +549,93 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription")
});
}
}

WHEN("job unsubscribes subscription but still reschedules itself")
{
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

auto run_counter{0};
auto rescheduling_delay = std::chrono::nanoseconds{0};

worker.schedule([&]() -> rpp::schedulers::optional_duration
{
++run_counter;

// Schedule a job so that job queue is not empty and hence we could cover more code.
worker.schedule([&]() -> rpp::schedulers::optional_duration
{
++run_counter;
return std::nullopt;
});

sub.unsubscribe();
return rescheduling_delay;
});

THEN("shall see job runs once")
{
CHECK(run_counter == 1);
}
}

WHEN("unsubscribe subscription when draining queue sleeps for the deferred job")
{
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

auto run_counter{0};
const auto delay_short = std::chrono::duration_cast<rpp::schedulers::duration>(std::chrono::seconds{1});
const auto delay_long = std::chrono::duration_cast<rpp::schedulers::duration>(std::chrono::seconds{2});

worker.schedule([&]() -> rpp::schedulers::optional_duration
{
++run_counter;

// Schedule a recurrently deferred job so that job queue is not empty; hence we could test draining queue.
worker.schedule(delay_long, [&]() -> rpp::schedulers::optional_duration
{
++run_counter;
return delay_long;
});

// Sleep for enough of time so that trampoline thread is sleeping for the future job that we just scheduled.
std::thread([&]
{
std::this_thread::sleep_for(delay_short);
sub.unsubscribe();
}).detach();

return std::nullopt;
});

THEN("shall see job runs once")
{
CHECK(run_counter == 1);
}
}
}

SCENARIO("trampoline respects the given time-point")
{
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

auto delay = std::chrono::duration_cast<rpp::schedulers::duration>(std::chrono::seconds{1});

WHEN("schedule at future")
{
auto now = rpp::schedulers::clock_type::now();
auto future = now + delay;
worker.schedule(future, [&]() -> rpp::schedulers::optional_duration
{
THEN("this thread should sleep for the delay duration")
{
CHECK(rpp::schedulers::clock_type::now() >= now + delay);
}
return std::nullopt;
});
}
}

SCENARIO("RunLoop scheduler dispatches tasks only manually")
Expand Down

0 comments on commit 2a52ebd

Please sign in to comment.