Skip to content
This repository has been archived by the owner on Aug 8, 2023. It is now read-only.

Direct, thread-safe, access to thread managed object #9750

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/core-files.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ set(MBGL_CORE_FILES
include/mbgl/util/run_loop.hpp
include/mbgl/util/size.hpp
include/mbgl/util/string.hpp
include/mbgl/util/thread.hpp
include/mbgl/util/tileset.hpp
include/mbgl/util/timer.hpp
include/mbgl/util/traits.hpp
Expand Down Expand Up @@ -610,7 +611,6 @@ set(MBGL_CORE_FILES
src/mbgl/util/stopwatch.cpp
src/mbgl/util/stopwatch.hpp
src/mbgl/util/string.cpp
src/mbgl/util/thread.hpp
src/mbgl/util/thread_local.hpp
src/mbgl/util/throttler.cpp
src/mbgl/util/throttler.hpp
Expand Down
8 changes: 8 additions & 0 deletions include/mbgl/actor/actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ namespace mbgl {
purpose of the actor model: prohibiting direct concurrent access to shared state.
*/


namespace util {
template <class> class Thread;
} // namespace util

template <class Object>
class Actor : public util::noncopyable {
public:
Expand Down Expand Up @@ -91,6 +96,9 @@ class Actor : public util::noncopyable {
}

private:
template<typename U>
friend class util::Thread;

std::shared_ptr<Mailbox> mailbox;
Object object;
};
Expand Down
41 changes: 37 additions & 4 deletions src/mbgl/util/thread.hpp → include/mbgl/util/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ class Thread : public Scheduler {
// sent to a paused `Object` will be queued and only processed after
// `resume()` is called.
void pause() {
MBGL_VERIFY_THREAD(tid);
std::unique_lock<std::mutex> lock(pauseMutex);

assert(!paused);
if (paused) {
return;
}

paused = std::make_unique<std::promise<void>>();
resumed = std::make_unique<std::promise<void>>();
Expand All @@ -116,9 +118,11 @@ class Thread : public Scheduler {

// Resumes the `Object` thread previously paused by `pause()`.
void resume() {
MBGL_VERIFY_THREAD(tid);
std::unique_lock<std::mutex> lock(pauseMutex);

assert(paused);
if (!paused) {
return;
}

resumed->set_value();

Expand All @@ -127,6 +131,13 @@ class Thread : public Scheduler {
}

private:
template <class U>
friend class BlockingThreadGuard;

Object& getObject() {
return object->object;
}

MBGL_STORE_THREAD(tid);

void schedule(std::weak_ptr<Mailbox> mailbox) override {
Expand All @@ -153,11 +164,33 @@ class Thread : public Scheduler {
std::thread thread;
std::unique_ptr<Actor<Object>> object;

std::mutex pauseMutex;
std::unique_ptr<std::promise<void>> paused;
std::unique_ptr<std::promise<void>> resumed;

util::RunLoop* loop = nullptr;
};


template <class Object>
class BlockingThreadGuard {
public:
BlockingThreadGuard(Thread<Object>& thread_)
: thread(thread_) {
thread.pause();
}

~BlockingThreadGuard() {
thread.resume();
}

Object& object() {
return thread.getObject();
}

private:
Thread<Object>& thread;
};

} // namespace util
} // namespace mbgl
61 changes: 61 additions & 0 deletions test/util/thread.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include <mbgl/util/timer.hpp>

#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

using namespace mbgl;
using namespace mbgl::util;
Expand Down Expand Up @@ -275,3 +277,62 @@ TEST(Thread, PauseResume) {
thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); });
loop.run();
}

TEST(Thread, PauseResumeMultiThreaded) {
RunLoop loop;

// Thread to be paused
Thread<TestWorker> test("Test");

std::promise<void> thread1Complete;
auto future = thread1Complete.get_future();
std::thread thread1 {[&, promise = std::move(thread1Complete)]() mutable {
// Pause the thread
test.pause();
promise.set_value();
}};

// Wait for the test thread to be paused
future.wait();

std::thread thread2 {[&]() {
// Pause from this thread as well and resume
test.pause();
test.resume();
}};

// Queue a message at the end of test thread's queue.
test.actor().invoke(&TestWorker::send, [&] { loop.stop(); });
loop.run();

// Wait for threads
thread1.join();
thread2.join();
}

TEST(Thread, DirectAccess) {

Thread<TestWorker> test("Test");

// Use the thread's object directly
std::atomic<bool> flag { false };
auto guard = std::make_unique<BlockingThreadGuard<TestWorker>>( test );
guard->object().send([&] { flag = true; });
ASSERT_TRUE(flag);

// Ensure messages queued up are processed
std::atomic<bool> message1Consumed { false };
test.actor().invoke(&TestWorker::send, [&]() { message1Consumed = true; });

// Release the guard
guard.reset();

// Ensure messages send after releasing the guard are processed
std::atomic<bool> message2Consumed { false };
test.actor().invoke(&TestWorker::send, [&]() { message2Consumed = true; });

while (!message1Consumed && !message2Consumed) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(10ms);
};
}