Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more tests for trampoline scheduler #264

Merged
merged 5 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/rpp/rpp/schedulers/trampoline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class trampoline final : public details::scheduler_tag
}

static time_point now() { return clock_type::now(); }

private:
rpp::composite_subscription m_sub;
};
Expand Down
114 changes: 98 additions & 16 deletions src/tests/test_schedulers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,8 @@ SCENARIO("New thread scheduler depends on subscription")

SCENARIO("trampoline scheduler dispatches task in the same thread")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

WHEN("supply a simple job")
{
Expand Down Expand Up @@ -460,7 +459,7 @@ SCENARIO("trampoline scheduler dispatches task in the same thread")
}
}

static std::vector<std::string> trampoline_expected_simulate_nested_scheduling(std::string thread_id)
static std::vector<std::string> trampoline_expected_simulate_nested_scheduling(const std::string& thread_id)
{
return std::vector<std::string>{
"Task 1 starts "s + thread_id,
Expand All @@ -472,9 +471,8 @@ static std::vector<std::string> trampoline_expected_simulate_nested_scheduling(s
}
SCENARIO("trampoline scheduler defers tasks in order")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);
rpp::subscription_guard guard{sub};

WHEN("supply a job that schedules inner job that schedules inner job")
Expand All @@ -494,9 +492,8 @@ SCENARIO("trampoline scheduler is thread local")
{
WHEN("two threads are using the same trampoline scheduler")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);
rpp::subscription_guard guard{sub};

std::vector<std::string> call_stack_1;
Expand All @@ -517,9 +514,8 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription")
{
GIVEN("unsubscribed subscription")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

sub.unsubscribe();

Expand All @@ -535,9 +531,8 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription")

GIVEN("asynchronously unsubscribes subscription")
{
auto scheduler = rpp::schedulers::trampoline{};
auto sub = rpp::composite_subscription{};
auto worker = scheduler.create_worker(sub);
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

THEN("shall not see execution of inner schedulable")
{
Expand All @@ -554,6 +549,93 @@ SCENARIO("trampoline scheduler regards unsubscribed subscription")
});
}
}

WHEN("job unsubscribes subscription but still reschedules itself")
{
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

auto run_counter{0};
auto rescheduling_delay = std::chrono::nanoseconds{0};

worker.schedule([&]() -> rpp::schedulers::optional_duration
{
++run_counter;

// Schedule a job so that job queue is not empty and hence we could cover more code.
worker.schedule([&]() -> rpp::schedulers::optional_duration
{
++run_counter;
return std::nullopt;
});

sub.unsubscribe();
return rescheduling_delay;
});

THEN("shall see job runs once")
{
CHECK(run_counter == 1);
}
}

WHEN("unsubscribe subscription when draining queue sleeps for the deferred job")
{
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

auto run_counter{0};
const auto delay_short = std::chrono::duration_cast<rpp::schedulers::duration>(std::chrono::seconds{1});
const auto delay_long = std::chrono::duration_cast<rpp::schedulers::duration>(std::chrono::seconds{2});

worker.schedule([&]() -> rpp::schedulers::optional_duration
{
++run_counter;

// Schedule a recurrently deferred job so that job queue is not empty; hence we could test draining queue.
worker.schedule(delay_long, [&]() -> rpp::schedulers::optional_duration
{
++run_counter;
return delay_long;
});

// Sleep for enough of time so that trampoline thread is sleeping for the future job that we just scheduled.
std::thread([&]
{
std::this_thread::sleep_for(delay_short);
sub.unsubscribe();
}).detach();

return std::nullopt;
});

THEN("shall see job runs once")
{
CHECK(run_counter == 1);
}
}
}

SCENARIO("trampoline respects the given time-point")
{
auto sub = rpp::composite_subscription{};
auto worker = rpp::schedulers::trampoline::create_worker(sub);

auto delay = std::chrono::duration_cast<rpp::schedulers::duration>(std::chrono::seconds{1});

WHEN("schedule at future")
{
auto now = rpp::schedulers::clock_type::now();
auto future = now + delay;
worker.schedule(future, [&]() -> rpp::schedulers::optional_duration
{
THEN("this thread should sleep for the delay duration")
{
CHECK(rpp::schedulers::clock_type::now() >= now + delay);
}
return std::nullopt;
});
}
}

SCENARIO("RunLoop scheduler dispatches tasks only manually")
Expand Down