diff --git a/cmake/core-files.cmake b/cmake/core-files.cmake index 1a86da6f247..8c31df4c51b 100644 --- a/cmake/core-files.cmake +++ b/cmake/core-files.cmake @@ -554,6 +554,7 @@ set(MBGL_CORE_FILES include/mbgl/util/run_loop.hpp include/mbgl/util/size.hpp include/mbgl/util/string.hpp + include/mbgl/util/thread.hpp include/mbgl/util/tileset.hpp include/mbgl/util/timer.hpp include/mbgl/util/traits.hpp @@ -610,7 +611,6 @@ set(MBGL_CORE_FILES src/mbgl/util/stopwatch.cpp src/mbgl/util/stopwatch.hpp src/mbgl/util/string.cpp - src/mbgl/util/thread.hpp src/mbgl/util/thread_local.hpp src/mbgl/util/throttler.cpp src/mbgl/util/throttler.hpp diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp index a0df19208e6..93de4a948fa 100644 --- a/include/mbgl/actor/actor.hpp +++ b/include/mbgl/actor/actor.hpp @@ -45,6 +45,11 @@ namespace mbgl { purpose of the actor model: prohibiting direct concurrent access to shared state. */ + +namespace util { +template class Thread; +} // namespace util + template class Actor : public util::noncopyable { public: @@ -91,6 +96,9 @@ class Actor : public util::noncopyable { } private: + template + friend class util::Thread; + std::shared_ptr mailbox; Object object; }; diff --git a/src/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp similarity index 85% rename from src/mbgl/util/thread.hpp rename to include/mbgl/util/thread.hpp index 572f46080e1..c0cbc532ce0 100644 --- a/src/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -96,9 +96,11 @@ class Thread : public Scheduler { // sent to a paused `Object` will be queued and only processed after // `resume()` is called. void pause() { - MBGL_VERIFY_THREAD(tid); + std::unique_lock lock(pauseMutex); - assert(!paused); + if (paused) { + return; + } paused = std::make_unique>(); resumed = std::make_unique>(); @@ -116,9 +118,11 @@ class Thread : public Scheduler { // Resumes the `Object` thread previously paused by `pause()`. void resume() { - MBGL_VERIFY_THREAD(tid); + std::unique_lock lock(pauseMutex); - assert(paused); + if (!paused) { + return; + } resumed->set_value(); @@ -127,6 +131,13 @@ class Thread : public Scheduler { } private: + template + friend class BlockingThreadGuard; + + Object& getObject() { + return object->object; + } + MBGL_STORE_THREAD(tid); void schedule(std::weak_ptr mailbox) override { @@ -153,11 +164,33 @@ class Thread : public Scheduler { std::thread thread; std::unique_ptr> object; + std::mutex pauseMutex; std::unique_ptr> paused; std::unique_ptr> resumed; util::RunLoop* loop = nullptr; }; + +template +class BlockingThreadGuard { +public: + BlockingThreadGuard(Thread& thread_) + : thread(thread_) { + thread.pause(); + } + + ~BlockingThreadGuard() { + thread.resume(); + } + + Object& object() { + return thread.getObject(); + } + +private: + Thread& thread; +}; + } // namespace util } // namespace mbgl diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp index 76fb5ce3f02..1bc55eadda8 100644 --- a/test/util/thread.test.cpp +++ b/test/util/thread.test.cpp @@ -6,7 +6,9 @@ #include #include +#include #include +#include using namespace mbgl; using namespace mbgl::util; @@ -275,3 +277,62 @@ TEST(Thread, PauseResume) { thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); loop.run(); } + +TEST(Thread, PauseResumeMultiThreaded) { + RunLoop loop; + + // Thread to be paused + Thread test("Test"); + + std::promise thread1Complete; + auto future = thread1Complete.get_future(); + std::thread thread1 {[&, promise = std::move(thread1Complete)]() mutable { + // Pause the thread + test.pause(); + promise.set_value(); + }}; + + // Wait for the test thread to be paused + future.wait(); + + std::thread thread2 {[&]() { + // Pause from this thread as well and resume + test.pause(); + test.resume(); + }}; + + // Queue a message at the end of test thread's queue. + test.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + loop.run(); + + // Wait for threads + thread1.join(); + thread2.join(); +} + +TEST(Thread, DirectAccess) { + + Thread test("Test"); + + // Use the thread's object directly + std::atomic flag { false }; + auto guard = std::make_unique>( test ); + guard->object().send([&] { flag = true; }); + ASSERT_TRUE(flag); + + // Ensure messages queued up are processed + std::atomic message1Consumed { false }; + test.actor().invoke(&TestWorker::send, [&]() { message1Consumed = true; }); + + // Release the guard + guard.reset(); + + // Ensure messages send after releasing the guard are processed + std::atomic message2Consumed { false }; + test.actor().invoke(&TestWorker::send, [&]() { message2Consumed = true; }); + + while (!message1Consumed && !message2Consumed) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(10ms); + }; +}