From 95dcc200a489d9a04021e6809192a68df37dfaf9 Mon Sep 17 00:00:00 2001 From: Cyprien Noel Date: Mon, 27 Apr 2015 19:48:10 -0700 Subject: [PATCH 1/2] Thread-local Caffe --- include/caffe/common.hpp | 13 +++++------ include/caffe/internal_thread.hpp | 17 +++++++++++--- src/caffe/common.cpp | 11 ++++++++- src/caffe/internal_thread.cpp | 30 ++++++++++++++++++++++--- src/caffe/test/test_internal_thread.cpp | 30 +++++++++++++++++++++++++ 5 files changed, 87 insertions(+), 14 deletions(-) diff --git a/include/caffe/common.hpp b/include/caffe/common.hpp index 5f86bc2625b..3fa81431314 100644 --- a/include/caffe/common.hpp +++ b/include/caffe/common.hpp @@ -98,12 +98,12 @@ void GlobalInit(int* pargc, char*** pargv); class Caffe { public: ~Caffe(); - inline static Caffe& Get() { - if (!singleton_.get()) { - singleton_.reset(new Caffe()); - } - return *singleton_; - } + + // Thread local context for Caffe. Moved to common.cpp instead of + // including boost/thread.hpp to avoid a boost/NVCC issues (#1009, #1010) + // on OSX. Also fails on Linux with CUDA 7.0.18. + static Caffe& Get(); + enum Brew { CPU, GPU }; // This random number generator facade hides boost and CUDA rng @@ -158,7 +158,6 @@ class Caffe { shared_ptr random_generator_; Brew mode_; - static shared_ptr singleton_; private: // The private constructor to avoid duplicate instantiation. diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp index 815ca54605e..fb4cb257d8b 100644 --- a/include/caffe/internal_thread.hpp +++ b/include/caffe/internal_thread.hpp @@ -14,14 +14,19 @@ namespace caffe { /** * Virtual class encapsulate boost::thread for use in base class * The child class will acquire the ability to run a single thread, - * by reimplementing the virutal function InternalThreadEntry. + * by reimplementing the virtual function InternalThreadEntry. */ class InternalThread { public: - InternalThread() : thread_() {} + InternalThread(); virtual ~InternalThread(); - /** Returns true if the thread was successfully started. **/ + /** + * Caffe's thread local state will be initialized using the current + * thread values, e.g. device id, solver index etc. The random seed + * is initialized using caffe_rng_rand. + * Will not return until the internal thread has exited. + */ bool StartInternalThread(); /** Will not return until the internal thread has exited. */ @@ -34,7 +39,13 @@ class InternalThread { with the code you want your thread to run. */ virtual void InternalThreadEntry() {} + private: + void entry(); + shared_ptr thread_; + int device_; + Caffe::Brew mode_; + int rand_seed_; }; } // namespace caffe diff --git a/src/caffe/common.cpp b/src/caffe/common.cpp index af96cac40aa..0215c76ef76 100644 --- a/src/caffe/common.cpp +++ b/src/caffe/common.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -7,7 +8,15 @@ namespace caffe { -shared_ptr Caffe::singleton_; +// Make sure each thread can have different values. +static boost::thread_specific_ptr thread_instance_; + +Caffe& Caffe::Get() { + if (!thread_instance_.get()) { + thread_instance_.reset(new Caffe()); + } + return *(thread_instance_.get()); +} // random seeding int64_t cluster_seedgen(void) { diff --git a/src/caffe/internal_thread.cpp b/src/caffe/internal_thread.cpp index c2d19d433b4..193ab093255 100644 --- a/src/caffe/internal_thread.cpp +++ b/src/caffe/internal_thread.cpp @@ -1,8 +1,17 @@ #include + #include "caffe/internal_thread.hpp" +#include "caffe/util/math_functions.hpp" namespace caffe { +InternalThread::InternalThread() + : thread_(), + device_(), + mode_(), + rand_seed_() { +} + InternalThread::~InternalThread() { WaitForInternalThreadToExit(); } @@ -11,20 +20,35 @@ bool InternalThread::is_started() const { return thread_.get() != NULL && thread_->joinable(); } - bool InternalThread::StartInternalThread() { if (!WaitForInternalThreadToExit()) { return false; } + +#ifndef CPU_ONLY + CUDA_CHECK(cudaGetDevice(&device_)); +#endif + mode_ = Caffe::mode(); + rand_seed_ = caffe_rng_rand(); + try { - thread_.reset( - new boost::thread(&InternalThread::InternalThreadEntry, this)); + thread_.reset(new boost::thread(&InternalThread::entry, this)); } catch (...) { return false; } return true; } +void InternalThread::entry() { +#ifndef CPU_ONLY + CUDA_CHECK(cudaSetDevice(device_)); +#endif + Caffe::set_mode(mode_); + Caffe::set_random_seed(rand_seed_); + + InternalThreadEntry(); +} + /** Will not return until the internal thread has exited. */ bool InternalThread::WaitForInternalThreadToExit() { if (is_started()) { diff --git a/src/caffe/test/test_internal_thread.cpp b/src/caffe/test/test_internal_thread.cpp index 31882b6db1d..390c8eda19b 100644 --- a/src/caffe/test/test_internal_thread.cpp +++ b/src/caffe/test/test_internal_thread.cpp @@ -2,6 +2,7 @@ #include "gtest/gtest.h" #include "caffe/internal_thread.hpp" +#include "caffe/util/math_functions.hpp" #include "caffe/test/test_caffe_main.hpp" @@ -19,5 +20,34 @@ TEST_F(InternalThreadTest, TestStartAndExit) { EXPECT_FALSE(thread.is_started()); } +class TestThreadA : public InternalThread { + void InternalThreadEntry() { + EXPECT_EQ(4244559767, caffe_rng_rand()); + } +}; + +class TestThreadB : public InternalThread { + void InternalThreadEntry() { + EXPECT_EQ(1726478280, caffe_rng_rand()); + } +}; + +TEST_F(InternalThreadTest, TestRandomSeed) { + TestThreadA t1; + Caffe::set_random_seed(9658361); + EXPECT_TRUE(t1.StartInternalThread()); + EXPECT_TRUE(t1.WaitForInternalThreadToExit()); + + TestThreadA t2; + Caffe::set_random_seed(9658361); + EXPECT_TRUE(t2.StartInternalThread()); + EXPECT_TRUE(t2.WaitForInternalThreadToExit()); + + TestThreadB t3; + Caffe::set_random_seed(3435563); + EXPECT_TRUE(t3.StartInternalThread()); + EXPECT_TRUE(t3.WaitForInternalThreadToExit()); +} + } // namespace caffe From 1f987e167957bc5b00a937444eec89541742ff49 Mon Sep 17 00:00:00 2001 From: Cyprien Noel Date: Tue, 28 Apr 2015 14:46:20 -0700 Subject: [PATCH 2/2] Changed the way threads are started and stopped - Interrupt the thread before waiting on join - Provide a method for looping threads to exit on demand - CHECK if start and stop succeed instead of returning an error --- include/caffe/internal_thread.hpp | 8 +++--- src/caffe/internal_thread.cpp | 33 ++++++++++++++----------- src/caffe/layers/base_data_layer.cpp | 4 +-- src/caffe/test/test_internal_thread.cpp | 16 ++++++------ 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp index fb4cb257d8b..2d67d7001dd 100644 --- a/include/caffe/internal_thread.hpp +++ b/include/caffe/internal_thread.hpp @@ -25,12 +25,11 @@ class InternalThread { * Caffe's thread local state will be initialized using the current * thread values, e.g. device id, solver index etc. The random seed * is initialized using caffe_rng_rand. - * Will not return until the internal thread has exited. */ - bool StartInternalThread(); + void StartInternalThread(); /** Will not return until the internal thread has exited. */ - bool WaitForInternalThreadToExit(); + void StopInternalThread(); bool is_started() const; @@ -39,6 +38,9 @@ class InternalThread { with the code you want your thread to run. */ virtual void InternalThreadEntry() {} + /* Should be tested when running loops to exit when requested. */ + bool must_stop(); + private: void entry(); diff --git a/src/caffe/internal_thread.cpp b/src/caffe/internal_thread.cpp index 193ab093255..2b646d12397 100644 --- a/src/caffe/internal_thread.cpp +++ b/src/caffe/internal_thread.cpp @@ -1,4 +1,5 @@ #include +#include #include "caffe/internal_thread.hpp" #include "caffe/util/math_functions.hpp" @@ -13,17 +14,22 @@ InternalThread::InternalThread() } InternalThread::~InternalThread() { - WaitForInternalThreadToExit(); + StopInternalThread(); } bool InternalThread::is_started() const { - return thread_.get() != NULL && thread_->joinable(); + return thread_ && thread_->joinable(); } -bool InternalThread::StartInternalThread() { - if (!WaitForInternalThreadToExit()) { - return false; - } +bool InternalThread::must_stop() { + return thread_ && thread_->interruption_requested(); +} + +void InternalThread::StartInternalThread() { + // TODO switch to failing once Caffe prefetch thread is persistent. + // Threads should not be started and stopped repeatedly. + // CHECK(!is_started()); + StopInternalThread(); #ifndef CPU_ONLY CUDA_CHECK(cudaGetDevice(&device_)); @@ -33,10 +39,9 @@ bool InternalThread::StartInternalThread() { try { thread_.reset(new boost::thread(&InternalThread::entry, this)); - } catch (...) { - return false; + } catch (std::exception& e) { + CHECK(false) << e.what(); } - return true; } void InternalThread::entry() { @@ -49,16 +54,16 @@ void InternalThread::entry() { InternalThreadEntry(); } -/** Will not return until the internal thread has exited. */ -bool InternalThread::WaitForInternalThreadToExit() { +void InternalThread::StopInternalThread() { if (is_started()) { + thread_->interrupt(); try { thread_->join(); - } catch (...) { - return false; + } catch (boost::thread_interrupted&) { + } catch (std::exception& e) { + CHECK(false) << e.what(); } } - return true; } } // namespace caffe diff --git a/src/caffe/layers/base_data_layer.cpp b/src/caffe/layers/base_data_layer.cpp index 931e4a9c0ab..992fcb3f2c9 100644 --- a/src/caffe/layers/base_data_layer.cpp +++ b/src/caffe/layers/base_data_layer.cpp @@ -47,12 +47,12 @@ void BasePrefetchingDataLayer::LayerSetUp( template void BasePrefetchingDataLayer::CreatePrefetchThread() { this->data_transformer_->InitRand(); - CHECK(StartInternalThread()) << "Thread execution failed"; + StartInternalThread(); } template void BasePrefetchingDataLayer::JoinPrefetchThread() { - CHECK(WaitForInternalThreadToExit()) << "Thread joining failed"; + StopInternalThread(); } template diff --git a/src/caffe/test/test_internal_thread.cpp b/src/caffe/test/test_internal_thread.cpp index 390c8eda19b..93f1cc541cd 100644 --- a/src/caffe/test/test_internal_thread.cpp +++ b/src/caffe/test/test_internal_thread.cpp @@ -14,9 +14,9 @@ class InternalThreadTest : public ::testing::Test {}; TEST_F(InternalThreadTest, TestStartAndExit) { InternalThread thread; EXPECT_FALSE(thread.is_started()); - EXPECT_TRUE(thread.StartInternalThread()); + thread.StartInternalThread(); EXPECT_TRUE(thread.is_started()); - EXPECT_TRUE(thread.WaitForInternalThreadToExit()); + thread.StopInternalThread(); EXPECT_FALSE(thread.is_started()); } @@ -35,18 +35,18 @@ class TestThreadB : public InternalThread { TEST_F(InternalThreadTest, TestRandomSeed) { TestThreadA t1; Caffe::set_random_seed(9658361); - EXPECT_TRUE(t1.StartInternalThread()); - EXPECT_TRUE(t1.WaitForInternalThreadToExit()); + t1.StartInternalThread(); + t1.StopInternalThread(); TestThreadA t2; Caffe::set_random_seed(9658361); - EXPECT_TRUE(t2.StartInternalThread()); - EXPECT_TRUE(t2.WaitForInternalThreadToExit()); + t2.StartInternalThread(); + t2.StopInternalThread(); TestThreadB t3; Caffe::set_random_seed(3435563); - EXPECT_TRUE(t3.StartInternalThread()); - EXPECT_TRUE(t3.WaitForInternalThreadToExit()); + t3.StartInternalThread(); + t3.StopInternalThread(); } } // namespace caffe