From 453dfdebdac86abd55e12bb0b85705d77d04a1c5 Mon Sep 17 00:00:00 2001 From: Cyprien Noel Date: Thu, 19 Mar 2015 22:15:25 -0700 Subject: [PATCH 1/3] add blocking queue for synchronous things --- include/caffe/util/blocking_queue.hpp | 73 +++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 include/caffe/util/blocking_queue.hpp diff --git a/include/caffe/util/blocking_queue.hpp b/include/caffe/util/blocking_queue.hpp new file mode 100644 index 00000000000..4b59c4959e7 --- /dev/null +++ b/include/caffe/util/blocking_queue.hpp @@ -0,0 +1,73 @@ +#ifndef CAFFE_UTIL_BLOCKING_QUEUE_H_ +#define CAFFE_UTIL_BLOCKING_QUEUE_H_ + +#include +#include + +namespace caffe { + +template +class blocking_queue { + public: + void push(const T& t) { + boost::mutex::scoped_lock lock(mutex_); + queue_.push(t); + lock.unlock(); + cond_push_.notify_one(); + } + + bool empty() const { + boost::mutex::scoped_lock lock(mutex_); + return queue_.empty(); + } + + bool try_pop(T& t) { + boost::mutex::scoped_lock lock(mutex_); + + if (queue_.empty()) + return false; + + t = queue_.front(); + queue_.pop(); + return true; + } + + T pop(const string& log_on_wait = "") { + boost::mutex::scoped_lock lock(mutex_); + + while (queue_.empty()) { + if (!log_on_wait.empty()) { + time_t now = time(0); + if (now - last_wait_log_ > 5) { + last_wait_log_ = now; + LOG(INFO) << log_on_wait; + } + } + condition_.wait(lock); + } + + T t = queue_.front(); + queue_.pop(); + return t; + } + + // Return element without removing it + T peek() { + boost::mutex::scoped_lock lock(mutex_); + + while (queue_.empty()) + condition_.wait(lock); + + return queue_.front(); + } + + private: + std::queue queue_; + mutable boost::mutex mutex_; + boost::condition_variable condition_; + time_t last_wait_log_; +}; + +} // namespace caffe + +#endif From 910197c7542e6ab32bddd1863688bda488f1808d Mon Sep 17 00:00:00 2001 From: Jonathan L Long Date: Thu, 19 Mar 2015 22:26:38 -0700 Subject: [PATCH 2/3] simplify blocking queue --- include/caffe/util/blocking_queue.hpp | 42 +++++++-------------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/include/caffe/util/blocking_queue.hpp b/include/caffe/util/blocking_queue.hpp index 4b59c4959e7..2d2d46a96cf 100644 --- a/include/caffe/util/blocking_queue.hpp +++ b/include/caffe/util/blocking_queue.hpp @@ -9,6 +9,9 @@ namespace caffe { template class blocking_queue { public: + explicit blocking_queue() { } + virtual ~blocking_queue() { } + void push(const T& t) { boost::mutex::scoped_lock lock(mutex_); queue_.push(t); @@ -21,32 +24,9 @@ class blocking_queue { return queue_.empty(); } - bool try_pop(T& t) { - boost::mutex::scoped_lock lock(mutex_); - - if (queue_.empty()) - return false; - - t = queue_.front(); - queue_.pop(); - return true; - } - - T pop(const string& log_on_wait = "") { + T pop() { + T t = peek(); boost::mutex::scoped_lock lock(mutex_); - - while (queue_.empty()) { - if (!log_on_wait.empty()) { - time_t now = time(0); - if (now - last_wait_log_ > 5) { - last_wait_log_ = now; - LOG(INFO) << log_on_wait; - } - } - condition_.wait(lock); - } - - T t = queue_.front(); queue_.pop(); return t; } @@ -54,18 +34,18 @@ class blocking_queue { // Return element without removing it T peek() { boost::mutex::scoped_lock lock(mutex_); - - while (queue_.empty()) - condition_.wait(lock); - + while (queue_.empty()) { + cond_push_.wait(lock); + } return queue_.front(); } private: std::queue queue_; mutable boost::mutex mutex_; - boost::condition_variable condition_; - time_t last_wait_log_; + boost::condition_variable cond_push_; + + DISABLE_COPY_AND_ASSIGN(blocking_queue); }; } // namespace caffe From 61e7216e2967f8cc1153f323dddbf88b94bf5663 Mon Sep 17 00:00:00 2001 From: Jonathan L Long Date: Thu, 19 Mar 2015 22:29:25 -0700 Subject: [PATCH 3/3] add blocking_queue::wait_for_empty for blocking until done --- include/caffe/util/blocking_queue.hpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/include/caffe/util/blocking_queue.hpp b/include/caffe/util/blocking_queue.hpp index 2d2d46a96cf..26f5645e32e 100644 --- a/include/caffe/util/blocking_queue.hpp +++ b/include/caffe/util/blocking_queue.hpp @@ -24,10 +24,20 @@ class blocking_queue { return queue_.empty(); } + void wait_for_empty() { + boost::mutex::scoped_lock lock(mutex_); + while (!queue_.empty()) { + cond_empty_.wait(lock); + } + } + T pop() { T t = peek(); boost::mutex::scoped_lock lock(mutex_); queue_.pop(); + if (queue_.empty()) { + cond_empty_.notify_all(); + } return t; } @@ -44,6 +54,7 @@ class blocking_queue { std::queue queue_; mutable boost::mutex mutex_; boost::condition_variable cond_push_; + boost::condition_variable cond_empty_; DISABLE_COPY_AND_ASSIGN(blocking_queue); };