Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread start and stop revamp #2383

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions include/caffe/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ void GlobalInit(int* pargc, char*** pargv);
class Caffe {
public:
~Caffe();
inline static Caffe& Get() {
if (!singleton_.get()) {
singleton_.reset(new Caffe());
}
return *singleton_;
}

// Thread local context for Caffe. Moved to common.cpp instead of
// including boost/thread.hpp to avoid a boost/NVCC issues (#1009, #1010)
// on OSX. Also fails on Linux with CUDA 7.0.18.
static Caffe& Get();

enum Brew { CPU, GPU };

// This random number generator facade hides boost and CUDA rng
Expand Down Expand Up @@ -158,7 +158,6 @@ class Caffe {
shared_ptr<RNG> random_generator_;

Brew mode_;
static shared_ptr<Caffe> singleton_;

private:
// The private constructor to avoid duplicate instantiation.
Expand Down
23 changes: 18 additions & 5 deletions include/caffe/internal_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ namespace caffe {
/**
* Virtual class encapsulate boost::thread for use in base class
* The child class will acquire the ability to run a single thread,
* by reimplementing the virutal function InternalThreadEntry.
* by reimplementing the virtual function InternalThreadEntry.
*/
class InternalThread {
public:
InternalThread() : thread_() {}
InternalThread();
virtual ~InternalThread();

/** Returns true if the thread was successfully started. **/
bool StartInternalThread();
/**
* Caffe's thread local state will be initialized using the current
* thread values, e.g. device id, solver index etc. The random seed
* is initialized using caffe_rng_rand.
*/
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should a public method be deleted without being labeled as @deprecated for some time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's supposed to be implementation details of the thread mechanism used internally, so I would say yes.

void StartInternalThread();

/** Will not return until the internal thread has exited. */
bool WaitForInternalThreadToExit();
void StopInternalThread();

bool is_started() const;

Expand All @@ -34,7 +38,16 @@ class InternalThread {
with the code you want your thread to run. */
virtual void InternalThreadEntry() {}

/* Should be tested when running loops to exit when requested. */
bool must_stop();

private:
void entry();

shared_ptr<boost::thread> thread_;
int device_;
Caffe::Brew mode_;
int rand_seed_;
};

} // namespace caffe
Expand Down
11 changes: 10 additions & 1 deletion src/caffe/common.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <boost/thread.hpp>
#include <glog/logging.h>
#include <cstdio>
#include <ctime>
Expand All @@ -7,7 +8,15 @@

namespace caffe {

shared_ptr<Caffe> Caffe::singleton_;
// Make sure each thread can have different values.
static boost::thread_specific_ptr<Caffe> thread_instance_;

Caffe& Caffe::Get() {
if (!thread_instance_.get()) {
thread_instance_.reset(new Caffe());
}
return *(thread_instance_.get());
}

// random seeding
int64_t cluster_seedgen(void) {
Expand Down
61 changes: 45 additions & 16 deletions src/caffe/internal_thread.cpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,69 @@
#include <boost/thread.hpp>
#include <exception>

#include "caffe/internal_thread.hpp"
#include "caffe/util/math_functions.hpp"

namespace caffe {

InternalThread::InternalThread()
: thread_(),
device_(),
mode_(),
rand_seed_() {
}

InternalThread::~InternalThread() {
WaitForInternalThreadToExit();
StopInternalThread();
}

bool InternalThread::is_started() const {
return thread_.get() != NULL && thread_->joinable();
return thread_ && thread_->joinable();
}

bool InternalThread::must_stop() {
return thread_ && thread_->interruption_requested();
}

void InternalThread::StartInternalThread() {
// TODO switch to failing once Caffe prefetch thread is persistent.
// Threads should not be started and stopped repeatedly.
// CHECK(!is_started());
StopInternalThread();

#ifndef CPU_ONLY
CUDA_CHECK(cudaGetDevice(&device_));
#endif
mode_ = Caffe::mode();
rand_seed_ = caffe_rng_rand();

bool InternalThread::StartInternalThread() {
if (!WaitForInternalThreadToExit()) {
return false;
}
try {
thread_.reset(
new boost::thread(&InternalThread::InternalThreadEntry, this));
} catch (...) {
return false;
thread_.reset(new boost::thread(&InternalThread::entry, this));
} catch (std::exception& e) {
CHECK(false) << e.what();
}
return true;
}

/** Will not return until the internal thread has exited. */
bool InternalThread::WaitForInternalThreadToExit() {
void InternalThread::entry() {
#ifndef CPU_ONLY
CUDA_CHECK(cudaSetDevice(device_));
#endif
Caffe::set_mode(mode_);
Caffe::set_random_seed(rand_seed_);

InternalThreadEntry();
}

void InternalThread::StopInternalThread() {
if (is_started()) {
thread_->interrupt();
try {
thread_->join();
} catch (...) {
return false;
} catch (boost::thread_interrupted&) {
} catch (std::exception& e) {
CHECK(false) << e.what();
}
}
return true;
}

} // namespace caffe
4 changes: 2 additions & 2 deletions src/caffe/layers/base_data_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::CreatePrefetchThread() {
this->data_transformer_->InitRand();
CHECK(StartInternalThread()) << "Thread execution failed";
StartInternalThread();
}

template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::JoinPrefetchThread() {
CHECK(WaitForInternalThreadToExit()) << "Thread joining failed";
StopInternalThread();
}

template <typename Dtype>
Expand Down
34 changes: 32 additions & 2 deletions src/caffe/test/test_internal_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "gtest/gtest.h"

#include "caffe/internal_thread.hpp"
#include "caffe/util/math_functions.hpp"

#include "caffe/test/test_caffe_main.hpp"

Expand All @@ -13,11 +14,40 @@ class InternalThreadTest : public ::testing::Test {};
TEST_F(InternalThreadTest, TestStartAndExit) {
InternalThread thread;
EXPECT_FALSE(thread.is_started());
EXPECT_TRUE(thread.StartInternalThread());
thread.StartInternalThread();
EXPECT_TRUE(thread.is_started());
EXPECT_TRUE(thread.WaitForInternalThreadToExit());
thread.StopInternalThread();
EXPECT_FALSE(thread.is_started());
}

class TestThreadA : public InternalThread {
void InternalThreadEntry() {
EXPECT_EQ(4244559767, caffe_rng_rand());
}
};

class TestThreadB : public InternalThread {
void InternalThreadEntry() {
EXPECT_EQ(1726478280, caffe_rng_rand());
}
};

TEST_F(InternalThreadTest, TestRandomSeed) {
TestThreadA t1;
Caffe::set_random_seed(9658361);
t1.StartInternalThread();
t1.StopInternalThread();

TestThreadA t2;
Caffe::set_random_seed(9658361);
t2.StartInternalThread();
t2.StopInternalThread();

TestThreadB t3;
Caffe::set_random_seed(3435563);
t3.StartInternalThread();
t3.StopInternalThread();
}

} // namespace caffe