Skip to content

Commit

Permalink
Updated to v3.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bshoshany committed Jul 13, 2022
1 parent 9d43f5d commit cca2760
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 153 deletions.
62 changes: 41 additions & 21 deletions BS_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
/**
* @file BS_thread_pool.hpp
* @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com)
* @version 3.0.0
* @date 2022-05-30
* @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
* @version 3.1.0
* @date 2022-07-13
* @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
*
* @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS:synced_stream, and BS::timer.
*/

#define BS_THREAD_POOL_VERSION "v3.0.0 (2022-05-30)"
#define BS_THREAD_POOL_VERSION "v3.1.0 (2022-07-13)"

#include <atomic> // std::atomic
#include <chrono> // std::chrono
Expand Down Expand Up @@ -38,22 +38,22 @@ using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concur
* @brief A helper class to facilitate waiting for and/or getting the results of multiple futures at once.
*/
template <typename T>
class multi_future
class [[nodiscard]] multi_future
{
public:
/**
* @brief Construct a multi_future object with the given number of futures.
*
* @param num_futures_ The desired number of futures to store.
*/
explicit multi_future(const size_t num_futures_ = 0) : f(num_futures_) {}
multi_future(const size_t num_futures_ = 0) : f(num_futures_) {}

/**
* @brief Get the results from all the futures stored in this multi_future object.
*
* @return A vector containing the results.
*/
std::vector<T> get()
[[nodiscard]] std::vector<T> get()
{
std::vector<T> results(f.size());
for (size_t i = 0; i < f.size(); ++i)
Expand Down Expand Up @@ -85,7 +85,7 @@ class multi_future
/**
* @brief A fast, lightweight, and easy-to-use C++17 thread pool class.
*/
class thread_pool
class [[nodiscard]] thread_pool
{
public:
// ============================
Expand All @@ -97,7 +97,7 @@ class thread_pool
*
* @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
*/
explicit thread_pool(const concurrency_t thread_count_ = std::thread::hardware_concurrency()) : thread_count(thread_count_ ? thread_count_ : std::thread::hardware_concurrency()), threads(std::make_unique<std::thread[]>(thread_count_ ? thread_count_ : std::thread::hardware_concurrency()))
thread_pool(const concurrency_t thread_count_ = 0) : thread_count(determine_thread_count(thread_count_)), threads(std::make_unique<std::thread[]>(determine_thread_count(thread_count_)))
{
create_threads();
}
Expand All @@ -120,7 +120,7 @@ class thread_pool
*
* @return The number of queued tasks.
*/
size_t get_tasks_queued() const
[[nodiscard]] size_t get_tasks_queued() const
{
const std::scoped_lock tasks_lock(tasks_mutex);
return tasks.size();
Expand All @@ -131,7 +131,7 @@ class thread_pool
*
* @return The number of running tasks.
*/
size_t get_tasks_running() const
[[nodiscard]] size_t get_tasks_running() const
{
const std::scoped_lock tasks_lock(tasks_mutex);
return tasks_total - tasks.size();
Expand All @@ -142,7 +142,7 @@ class thread_pool
*
* @return The total number of tasks.
*/
size_t get_tasks_total() const
[[nodiscard]] size_t get_tasks_total() const
{
return tasks_total;
}
Expand All @@ -152,7 +152,7 @@ class thread_pool
*
* @return The number of threads.
*/
concurrency_t get_thread_count() const
[[nodiscard]] concurrency_t get_thread_count() const
{
return thread_count;
}
Expand All @@ -172,7 +172,7 @@ class thread_pool
* @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can be used to obtain the values returned by each block.
*/
template <typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
multi_future<R> parallelize_loop(const T1& first_index, const T2& index_after_last, const F& loop, size_t num_blocks = 0)
[[nodiscard]] multi_future<R> parallelize_loop(const T1& first_index, const T2& index_after_last, const F& loop, size_t num_blocks = 0)
{
T first_index_T = static_cast<T>(first_index);
T index_after_last_T = static_cast<T>(index_after_last);
Expand Down Expand Up @@ -226,13 +226,13 @@ class thread_pool
*
* @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
*/
void reset(const concurrency_t thread_count_ = std::thread::hardware_concurrency())
void reset(const concurrency_t thread_count_ = 0)
{
const bool was_paused = paused;
paused = true;
wait_for_tasks();
destroy_threads();
thread_count = thread_count_ ? thread_count_ : std::thread::hardware_concurrency();
thread_count = determine_thread_count(thread_count_);
threads = std::make_unique<std::thread[]>(thread_count);
paused = was_paused;
create_threads();
Expand All @@ -249,7 +249,7 @@ class thread_pool
* @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one.
*/
template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
std::future<R> submit(const F& task, const A&... args)
[[nodiscard]] std::future<R> submit(const F& task, const A&... args)
{
std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
push_task(
Expand Down Expand Up @@ -331,6 +331,25 @@ class thread_pool
}
}

/**
* @brief Determine how many threads the pool should have, based on the parameter passed to the constructor or reset().
*
* @param thread_count_ The parameter passed to the constructor or reset(). If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from std::thread::hardware_concurrency(). If the latter returns a non-positive number for some reason, then the pool will be created with just one thread.
* @return The number of threads to use for constructing the pool.
*/
[[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_)
{
if (thread_count_ > 0)
return thread_count_;
else
{
if (std::thread::hardware_concurrency() > 0)
return std::thread::hardware_concurrency();
else
return 1;
}
}

/**
* @brief A worker function to be assigned to each thread in the pool. Waits until it is notified by push_task() that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies wait_for_tasks() in case it is waiting.
*/
Expand All @@ -347,6 +366,7 @@ class thread_pool
tasks.pop();
tasks_lock.unlock();
task();
tasks_lock.lock();
--tasks_total;
if (waiting)
task_done_cv.notify_one();
Expand Down Expand Up @@ -413,15 +433,15 @@ class thread_pool
/**
* @brief A helper class to synchronize printing to an output stream by different threads.
*/
class synced_stream
class [[nodiscard]] synced_stream
{
public:
/**
* @brief Construct a new synced stream.
*
* @param out_stream_ The output stream to print to. The default value is std::cout.
*/
explicit synced_stream(std::ostream& out_stream_ = std::cout) : out_stream(out_stream_) {};
synced_stream(std::ostream& out_stream_ = std::cout) : out_stream(out_stream_) {}

/**
* @brief Print any number of items into the output stream. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print.
Expand Down Expand Up @@ -469,7 +489,7 @@ class synced_stream
/**
* @brief A helper class to measure execution time for benchmarking purposes.
*/
class timer
class [[nodiscard]] timer
{
public:
/**
Expand All @@ -493,7 +513,7 @@ class timer
*
* @return The number of milliseconds.
*/
std::chrono::milliseconds::rep ms() const
[[nodiscard]] std::chrono::milliseconds::rep ms() const
{
return (std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_time)).count();
}
Expand Down
Loading

0 comments on commit cca2760

Please sign in to comment.