From e90d5db24743bf4e36a57f0773dc3cb846364bc2 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Mon, 31 Oct 2016 08:14:08 +0000 Subject: [PATCH 1/2] Fix SRL hang when exit. * Error occurred when enable Async Load in TestDataProvider. * It because DataProvider is calling getNextBatchInternal in one thread, and destructing DataProvider in other thread. * Add wait routine in DataProvider destructing. * Also fix another bug, when destructing TestDataProvider and do not read any test data. Fix #286 --- demo/semantic_role_labeling/.gitignore | 10 +++ paddle/gserver/dataproviders/DataProvider.cpp | 3 +- .../gserver/dataproviders/PyDataProvider2.cpp | 18 ++++- paddle/gserver/tests/test_PyDataProvider2.cpp | 17 +++++ paddle/utils/Locks.h | 73 +++++++++++++++++++ 5 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 demo/semantic_role_labeling/.gitignore diff --git a/demo/semantic_role_labeling/.gitignore b/demo/semantic_role_labeling/.gitignore new file mode 100644 index 0000000000000..cd90ca7bbe9be --- /dev/null +++ b/demo/semantic_role_labeling/.gitignore @@ -0,0 +1,10 @@ +*.pyc +train.log +data/feature +data/conll05st-release/ +data/src.dict +data/test.wsj.props +data/test.wsj.seq_pair +data/test.wsj.words +data/tgt.dict +output diff --git a/paddle/gserver/dataproviders/DataProvider.cpp b/paddle/gserver/dataproviders/DataProvider.cpp index 8cefbb30ada46..2cfb5a3a18c8a 100644 --- a/paddle/gserver/dataproviders/DataProvider.cpp +++ b/paddle/gserver/dataproviders/DataProvider.cpp @@ -131,9 +131,10 @@ void DoubleBuffer::asyncLoadBatch() { taskReadySem_.wait(); if (stopping_) break; - while (batchSize_ == 0) { + while (batchSize_ == 0 && !stopping_) { usleep(5); } + if (stopping_) break; do { DataBatch newBatch; diff --git a/paddle/gserver/dataproviders/PyDataProvider2.cpp b/paddle/gserver/dataproviders/PyDataProvider2.cpp index ca8b07af49ca0..d8c127cc1a832 100644 --- a/paddle/gserver/dataproviders/PyDataProvider2.cpp +++ b/paddle/gserver/dataproviders/PyDataProvider2.cpp @@ -201,7 +201,7 @@ class IPyDataProviderCache { * Here, we start a thread to read data. It is totally asynchronous for reading * data. And it support cache strategies. */ -class PyDataProvider2 : public DataProvider { +class PyDataProvider2 : public DataProvider, private WaitMethodDone { public: /** * Ctor @@ -433,26 +433,33 @@ class PyDataProvider2 : public DataProvider { inline void resetImpl(bool startNewThread) { DBG << "Reseting " << startNewThread; + exit_.store(true); if (loadThread_) { // is loading. - exit_.store(true); loadThread_->join(); loadThread_.reset(); } { PyGuard g; callingContexts_.clear(); + this->pullCV_.notify_one(); + } + this->waitNotCalling(); + { + PyGuard g; dataPool_.clear(); } poolActualSize_ = 0; - exit_ = false; + if (startNewThread && cache_->reset()) { DBG << "Start new thread."; loadThread_.reset(new std::thread([this] { + exit_ = false; loadThread(); })); callingContextCreated_.wait(); } DBG << "Reset done"; + exit_ = false; } private: @@ -529,6 +536,7 @@ class PyDataProvider2 : public DataProvider { * Loading a batch of data. */ int64_t getNextBatchInternal(int64_t size_, DataBatch *batch) { + auto guard = this->guard(); REGISTER_TIMER("PyDP2.getNextBatchInternal") CHECK_GE(size_, 0); size_t size = (size_t) size_; @@ -554,6 +562,10 @@ class PyDataProvider2 : public DataProvider { } else { // loading from cache. poolPtr = this->cache_->load(); } + if (exit_) { + // PyDataProvider is destructing. + return 0; + } CHECK(poolPtr != nullptr); std::deque& pool = *poolPtr; diff --git a/paddle/gserver/tests/test_PyDataProvider2.cpp b/paddle/gserver/tests/test_PyDataProvider2.cpp index 6bf1e32925121..b9867a728d9b4 100644 --- a/paddle/gserver/tests/test_PyDataProvider2.cpp +++ b/paddle/gserver/tests/test_PyDataProvider2.cpp @@ -353,6 +353,23 @@ TEST(PyDataProvider2, test_check) { } } +TEST(PyDataProvider2, multiThread) { + paddle::DataConfig config; + config.set_type("py2"); + config.set_files(FLAGS_train_list.c_str()); + config.set_load_data_module("test_PyDataProvider2"); + config.set_load_data_object("test_dense_no_seq"); + config.set_async_load_data(true); + + std::unique_ptr provider( + paddle::DataProvider::create(config, false)); + provider->reset(); + paddle::DataBatch batch; + provider->getNextBatch(100, &batch); + provider->reset(); + provider.reset(); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); paddle::initMain(argc, argv); diff --git a/paddle/utils/Locks.h b/paddle/utils/Locks.h index 1fc0363d34597..7396d8bfd02d6 100644 --- a/paddle/utils/Locks.h +++ b/paddle/utils/Locks.h @@ -240,4 +240,77 @@ class LockedCondition : public std::condition_variable { std::mutex mutex_; }; + +/** + * @brief Wait Some Method Done. + * + * It provide a guard when invoke a method, and give the ability to wait calling + * some method is done in another thread. The example usage are: + * + * @code{.cpp} + * class A { + * private: + * WaitMethodDone done_; + * public: + * void foo() { + * auto guard = done_.guard(); + * // your code. + * } + * + * void clear() { + * done_.waitNotCalling(); + * // ensure the foo() is not calling here. + * // do some job. + * } + * } + * @endcode + */ +class WaitMethodDone { +public: + DISABLE_COPY(WaitMethodDone); + + class CallingGuard { + public: + CallingGuard(const CallingGuard& other) = delete; + CallingGuard(CallingGuard&& other) { + self_ = other.self_; + other.self_ = nullptr; + } + + explicit CallingGuard(WaitMethodDone* self): self_(self) { + self_->cv_.notify_all([this] { + self_->isCalling_ = true; + }); + } + + ~CallingGuard() { + if (self_) { + self_->cv_.notify_all([this] { + self_->isCalling_ = false; + }); + } + } + + private: + WaitMethodDone* self_; + }; + + WaitMethodDone(): isCalling_(false) {} + + CallingGuard guard() { + return CallingGuard(this); + } + + void waitNotCalling() { + cv_.wait([this] { + return !isCalling_; + }); + } + +private: + bool isCalling_; + LockedCondition cv_; + friend class CallingGuard; +}; + } // namespace paddle From 3b1a6bdc9e6f7fc8a46d90ff7bc85fa60b4fb91d Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Fri, 4 Nov 2016 12:58:26 +0800 Subject: [PATCH 2/2] Follow comments, Use mutex is cool! --- .../gserver/dataproviders/PyDataProvider2.cpp | 9 ++- paddle/utils/Locks.h | 73 ------------------- 2 files changed, 6 insertions(+), 76 deletions(-) diff --git a/paddle/gserver/dataproviders/PyDataProvider2.cpp b/paddle/gserver/dataproviders/PyDataProvider2.cpp index d8c127cc1a832..90391a7c307d8 100644 --- a/paddle/gserver/dataproviders/PyDataProvider2.cpp +++ b/paddle/gserver/dataproviders/PyDataProvider2.cpp @@ -201,7 +201,7 @@ class IPyDataProviderCache { * Here, we start a thread to read data. It is totally asynchronous for reading * data. And it support cache strategies. */ -class PyDataProvider2 : public DataProvider, private WaitMethodDone { +class PyDataProvider2 : public DataProvider { public: /** * Ctor @@ -443,7 +443,8 @@ class PyDataProvider2 : public DataProvider, private WaitMethodDone { callingContexts_.clear(); this->pullCV_.notify_one(); } - this->waitNotCalling(); + + std::lock_guard guard(mutexForReset_); { PyGuard g; dataPool_.clear(); @@ -472,6 +473,8 @@ class PyDataProvider2 : public DataProvider, private WaitMethodDone { std::condition_variable pullCV_; std::mutex mtx_; + std::mutex mutexForReset_; + ThreadBarrier callingContextCreated_; std::unique_ptr cache_; @@ -536,7 +539,7 @@ class PyDataProvider2 : public DataProvider, private WaitMethodDone { * Loading a batch of data. */ int64_t getNextBatchInternal(int64_t size_, DataBatch *batch) { - auto guard = this->guard(); + std::lock_guard guard(mutexForReset_); REGISTER_TIMER("PyDP2.getNextBatchInternal") CHECK_GE(size_, 0); size_t size = (size_t) size_; diff --git a/paddle/utils/Locks.h b/paddle/utils/Locks.h index 7396d8bfd02d6..1fc0363d34597 100644 --- a/paddle/utils/Locks.h +++ b/paddle/utils/Locks.h @@ -240,77 +240,4 @@ class LockedCondition : public std::condition_variable { std::mutex mutex_; }; - -/** - * @brief Wait Some Method Done. - * - * It provide a guard when invoke a method, and give the ability to wait calling - * some method is done in another thread. The example usage are: - * - * @code{.cpp} - * class A { - * private: - * WaitMethodDone done_; - * public: - * void foo() { - * auto guard = done_.guard(); - * // your code. - * } - * - * void clear() { - * done_.waitNotCalling(); - * // ensure the foo() is not calling here. - * // do some job. - * } - * } - * @endcode - */ -class WaitMethodDone { -public: - DISABLE_COPY(WaitMethodDone); - - class CallingGuard { - public: - CallingGuard(const CallingGuard& other) = delete; - CallingGuard(CallingGuard&& other) { - self_ = other.self_; - other.self_ = nullptr; - } - - explicit CallingGuard(WaitMethodDone* self): self_(self) { - self_->cv_.notify_all([this] { - self_->isCalling_ = true; - }); - } - - ~CallingGuard() { - if (self_) { - self_->cv_.notify_all([this] { - self_->isCalling_ = false; - }); - } - } - - private: - WaitMethodDone* self_; - }; - - WaitMethodDone(): isCalling_(false) {} - - CallingGuard guard() { - return CallingGuard(this); - } - - void waitNotCalling() { - cv_.wait([this] { - return !isCalling_; - }); - } - -private: - bool isCalling_; - LockedCondition cv_; - friend class CallingGuard; -}; - } // namespace paddle