From a319bca27ef341180f48c37008d11e2ec21d5c26 Mon Sep 17 00:00:00 2001 From: Jonas Tepe Date: Mon, 16 May 2016 10:56:18 +0200 Subject: [PATCH] Initial commit of all src and the README.md file. --- LICENSE_1_0.txt | 23 ++++++ README.md | 4 +- listing_1.1.cpp | 13 ++++ listing_10.1.cpp | 42 +++++++++++ listing_2.1.cpp | 35 +++++++++ listing_2.2.cpp | 46 ++++++++++++ listing_2.3.cpp | 58 +++++++++++++++ listing_2.4.cpp | 61 ++++++++++++++++ listing_2.5.cpp | 27 +++++++ listing_2.6.cpp | 56 ++++++++++++++ listing_2.7.cpp | 23 ++++++ listing_2.8.cpp | 67 +++++++++++++++++ listing_3.1.cpp | 26 +++++++ listing_3.10.cpp | 27 +++++++ listing_3.11.cpp | 28 +++++++ listing_3.12.cpp | 56 ++++++++++++++ listing_3.13.cpp | 30 ++++++++ listing_3.2.cpp | 44 +++++++++++ listing_3.3.cpp | 24 ++++++ listing_3.4.cpp | 24 ++++++ listing_3.5.cpp | 68 +++++++++++++++++ listing_3.6.cpp | 29 ++++++++ listing_3.7.cpp | 64 ++++++++++++++++ listing_3.8.cpp | 56 ++++++++++++++ listing_3.9.cpp | 29 ++++++++ listing_4.1.cpp | 64 ++++++++++++++++ listing_4.10.cpp | 27 +++++++ listing_4.11.cpp | 18 +++++ listing_4.12.cpp | 24 ++++++ listing_4.13.cpp | 23 ++++++ listing_4.14.cpp | 12 +++ listing_4.15.cpp | 43 +++++++++++ listing_4.16.cpp | 31 ++++++++ listing_4.2.cpp | 30 ++++++++ listing_4.3.cpp | 22 ++++++ listing_4.4.cpp | 58 +++++++++++++++ listing_4.5.cpp | 73 ++++++++++++++++++ listing_4.6.cpp | 16 ++++ listing_4.7.cpp | 34 +++++++++ listing_4.8.cpp | 9 +++ listing_4.9.cpp | 40 ++++++++++ listing_5.1.cpp | 16 ++++ listing_5.10.cpp | 39 ++++++++++ listing_5.11.cpp | 41 +++++++++++ listing_5.12.cpp | 33 +++++++++ listing_5.13.cpp | 34 +++++++++ listing_5.2.cpp | 22 ++++++ listing_5.3.cpp | 17 +++++ listing_5.4.cpp | 46 ++++++++++++ listing_5.5.cpp | 31 ++++++++ listing_5.6.cpp | 74 +++++++++++++++++++ listing_5.7.cpp | 46 ++++++++++++ listing_5.8.cpp | 31 ++++++++ listing_5.9.cpp | 42 +++++++++++ listing_6.1.cpp | 56 ++++++++++++++ listing_6.10.cpp | 44 +++++++++++ listing_6.11.cpp | 108 +++++++++++++++++++++++++++ listing_6.12.cpp | 20 +++++ listing_6.13.cpp | 101 +++++++++++++++++++++++++ listing_6.2.cpp | 72 ++++++++++++++++++ listing_6.3.cpp | 68 +++++++++++++++++ listing_6.4.cpp | 56 ++++++++++++++ listing_6.5.cpp | 46 ++++++++++++ listing_6.6.cpp | 64 ++++++++++++++++ listing_6.7.cpp | 29 ++++++++ listing_6.8.cpp | 15 ++++ listing_6.9.cpp | 49 +++++++++++++ listing_7.1.cpp | 18 +++++ listing_7.10.cpp | 38 ++++++++++ listing_7.11.cpp | 46 ++++++++++++ listing_7.12.cpp | 88 ++++++++++++++++++++++ listing_7.13.cpp | 62 ++++++++++++++++ listing_7.14.cpp | 20 +++++ listing_7.15.cpp | 59 +++++++++++++++ listing_7.16.cpp | 31 ++++++++ listing_7.17.cpp | 27 +++++++ listing_7.18.cpp | 20 +++++ listing_7.19.cpp | 27 +++++++ listing_7.2.cpp | 23 ++++++ listing_7.20.cpp | 33 +++++++++ listing_7.21.cpp | 55 ++++++++++++++ listing_7.3.cpp | 31 ++++++++ listing_7.4.cpp | 25 +++++++ listing_7.5.cpp | 57 +++++++++++++++ listing_7.6.cpp | 36 +++++++++ listing_7.7.cpp | 49 +++++++++++++ listing_7.8.cpp | 51 +++++++++++++ listing_7.9.cpp | 32 ++++++++ listing_8.1.cpp | 102 ++++++++++++++++++++++++++ listing_8.10.cpp | 45 ++++++++++++ listing_8.11.cpp | 93 +++++++++++++++++++++++ listing_8.12.cpp | 24 ++++++ listing_8.13.cpp | 98 +++++++++++++++++++++++++ listing_8.2.cpp | 49 +++++++++++++ listing_8.3.cpp | 56 ++++++++++++++ listing_8.4.cpp | 44 +++++++++++ listing_8.5.cpp | 20 +++++ listing_8.6.cpp | 50 +++++++++++++ listing_8.7.cpp | 44 +++++++++++ listing_8.8.cpp | 24 ++++++ listing_8.9.cpp | 76 +++++++++++++++++++ listing_9.1.cpp | 53 ++++++++++++++ listing_9.10.cpp | 9 +++ listing_9.11.cpp | 57 +++++++++++++++ listing_9.12.cpp | 77 +++++++++++++++++++ listing_9.13.cpp | 38 ++++++++++ listing_9.2.cpp | 62 ++++++++++++++++ listing_9.3.cpp | 31 ++++++++ listing_9.4.cpp | 12 +++ listing_9.5.cpp | 58 +++++++++++++++ listing_9.6.cpp | 58 +++++++++++++++ listing_9.7.cpp | 53 ++++++++++++++ listing_9.8.cpp | 112 ++++++++++++++++++++++++++++ listing_9.9.cpp | 31 ++++++++ listing_a.1.cpp | 23 ++++++ listing_a.2.cpp | 18 +++++ listing_a.3.cpp | 23 ++++++ listing_a.4.cpp | 8 ++ listing_c.1.cpp | 50 +++++++++++++ listing_c.10.cpp | 51 +++++++++++++ listing_c.2.cpp | 22 ++++++ listing_c.3.cpp | 16 ++++ listing_c.4.cpp | 65 ++++++++++++++++ listing_c.5.cpp | 71 ++++++++++++++++++ listing_c.6.cpp | 118 ++++++++++++++++++++++++++++++ listing_c.7.cpp | 187 +++++++++++++++++++++++++++++++++++++++++++++++ listing_c.8.cpp | 73 ++++++++++++++++++ listing_c.9.cpp | 117 +++++++++++++++++++++++++++++ 128 files changed, 5728 insertions(+), 2 deletions(-) create mode 100644 LICENSE_1_0.txt create mode 100644 listing_1.1.cpp create mode 100644 listing_10.1.cpp create mode 100644 listing_2.1.cpp create mode 100644 listing_2.2.cpp create mode 100644 listing_2.3.cpp create mode 100644 listing_2.4.cpp create mode 100644 listing_2.5.cpp create mode 100644 listing_2.6.cpp create mode 100644 listing_2.7.cpp create mode 100644 listing_2.8.cpp create mode 100644 listing_3.1.cpp create mode 100644 listing_3.10.cpp create mode 100644 listing_3.11.cpp create mode 100644 listing_3.12.cpp create mode 100644 listing_3.13.cpp create mode 100644 listing_3.2.cpp create mode 100644 listing_3.3.cpp create mode 100644 listing_3.4.cpp create mode 100644 listing_3.5.cpp create mode 100644 listing_3.6.cpp create mode 100644 listing_3.7.cpp create mode 100644 listing_3.8.cpp create mode 100644 listing_3.9.cpp create mode 100644 listing_4.1.cpp create mode 100644 listing_4.10.cpp create mode 100644 listing_4.11.cpp create mode 100644 listing_4.12.cpp create mode 100644 listing_4.13.cpp create mode 100644 listing_4.14.cpp create mode 100644 listing_4.15.cpp create mode 100644 listing_4.16.cpp create mode 100644 listing_4.2.cpp create mode 100644 listing_4.3.cpp create mode 100644 listing_4.4.cpp create mode 100644 listing_4.5.cpp create mode 100644 listing_4.6.cpp create mode 100644 listing_4.7.cpp create mode 100644 listing_4.8.cpp create mode 100644 listing_4.9.cpp create mode 100644 listing_5.1.cpp create mode 100644 listing_5.10.cpp create mode 100644 listing_5.11.cpp create mode 100644 listing_5.12.cpp create mode 100644 listing_5.13.cpp create mode 100644 listing_5.2.cpp create mode 100644 listing_5.3.cpp create mode 100644 listing_5.4.cpp create mode 100644 listing_5.5.cpp create mode 100644 listing_5.6.cpp create mode 100644 listing_5.7.cpp create mode 100644 listing_5.8.cpp create mode 100644 listing_5.9.cpp create mode 100644 listing_6.1.cpp create mode 100644 listing_6.10.cpp create mode 100644 listing_6.11.cpp create mode 100644 listing_6.12.cpp create mode 100644 listing_6.13.cpp create mode 100644 listing_6.2.cpp create mode 100644 listing_6.3.cpp create mode 100644 listing_6.4.cpp create mode 100644 listing_6.5.cpp create mode 100644 listing_6.6.cpp create mode 100644 listing_6.7.cpp create mode 100644 listing_6.8.cpp create mode 100644 listing_6.9.cpp create mode 100644 listing_7.1.cpp create mode 100644 listing_7.10.cpp create mode 100644 listing_7.11.cpp create mode 100644 listing_7.12.cpp create mode 100644 listing_7.13.cpp create mode 100644 listing_7.14.cpp create mode 100644 listing_7.15.cpp create mode 100644 listing_7.16.cpp create mode 100644 listing_7.17.cpp create mode 100644 listing_7.18.cpp create mode 100644 listing_7.19.cpp create mode 100644 listing_7.2.cpp create mode 100644 listing_7.20.cpp create mode 100644 listing_7.21.cpp create mode 100644 listing_7.3.cpp create mode 100644 listing_7.4.cpp create mode 100644 listing_7.5.cpp create mode 100644 listing_7.6.cpp create mode 100644 listing_7.7.cpp create mode 100644 listing_7.8.cpp create mode 100644 listing_7.9.cpp create mode 100644 listing_8.1.cpp create mode 100644 listing_8.10.cpp create mode 100644 listing_8.11.cpp create mode 100644 listing_8.12.cpp create mode 100644 listing_8.13.cpp create mode 100644 listing_8.2.cpp create mode 100644 listing_8.3.cpp create mode 100644 listing_8.4.cpp create mode 100644 listing_8.5.cpp create mode 100644 listing_8.6.cpp create mode 100644 listing_8.7.cpp create mode 100644 listing_8.8.cpp create mode 100644 listing_8.9.cpp create mode 100644 listing_9.1.cpp create mode 100644 listing_9.10.cpp create mode 100644 listing_9.11.cpp create mode 100644 listing_9.12.cpp create mode 100644 listing_9.13.cpp create mode 100644 listing_9.2.cpp create mode 100644 listing_9.3.cpp create mode 100644 listing_9.4.cpp create mode 100644 listing_9.5.cpp create mode 100644 listing_9.6.cpp create mode 100644 listing_9.7.cpp create mode 100644 listing_9.8.cpp create mode 100644 listing_9.9.cpp create mode 100644 listing_a.1.cpp create mode 100644 listing_a.2.cpp create mode 100644 listing_a.3.cpp create mode 100644 listing_a.4.cpp create mode 100644 listing_c.1.cpp create mode 100644 listing_c.10.cpp create mode 100644 listing_c.2.cpp create mode 100644 listing_c.3.cpp create mode 100644 listing_c.4.cpp create mode 100644 listing_c.5.cpp create mode 100644 listing_c.6.cpp create mode 100644 listing_c.7.cpp create mode 100644 listing_c.8.cpp create mode 100644 listing_c.9.cpp diff --git a/LICENSE_1_0.txt b/LICENSE_1_0.txt new file mode 100644 index 0000000..36b7cd9 --- /dev/null +++ b/LICENSE_1_0.txt @@ -0,0 +1,23 @@ +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md index e242c53..4de10bf 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# concurrencyinaction -Concurrency in Action source code examples +# Concurrency in Action source code +This repo just contains the source code of the book [Concurrency in Action](https://www.manning.com/books/c-plus-plus-concurrency-in-action) to easily browse the source. Nothing more, nothing less. Cheers 🍷. diff --git a/listing_1.1.cpp b/listing_1.1.cpp new file mode 100644 index 0000000..92170a8 --- /dev/null +++ b/listing_1.1.cpp @@ -0,0 +1,13 @@ +#include +#include + +void hello() +{ + std::cout<<"Hello Concurrent World\n"; +} + +int main() +{ + std::thread t(hello); + t.join(); +} diff --git a/listing_10.1.cpp b/listing_10.1.cpp new file mode 100644 index 0000000..865b94b --- /dev/null +++ b/listing_10.1.cpp @@ -0,0 +1,42 @@ +void test_concurrent_push_and_pop_on_empty_queue() +{ + threadsafe_queue q; + + std::promise go,push_ready,pop_ready; + std::shared_future ready(go.get_future()); + + std::future push_done; + std::future pop_done; + + try + { + push_done=std::async(std::launch::async, + [&q,ready,&push_ready]() + { + push_ready.set_value(); + ready.wait(); + q.push(42); + } + ); + pop_done=std::async(std::launch::async, + [&q,ready,&pop_ready]() + { + pop_ready.set_value(); + ready.wait(); + return q.pop(); + } + ); + push_ready.get_future().wait(); + pop_ready.get_future().wait(); + go.set_value(); + + push_done.get(); + assert(pop_done.get()==42); + assert(q.empty()); + } + catch(...) + { + go.set_value(); + throw; + } +} diff --git a/listing_2.1.cpp b/listing_2.1.cpp new file mode 100644 index 0000000..45d2193 --- /dev/null +++ b/listing_2.1.cpp @@ -0,0 +1,35 @@ +#include + +void do_something(int& i) +{ + ++i; +} + +struct func +{ + int& i; + + func(int& i_):i(i_){} + + void operator()() + { + for(unsigned j=0;j<1000000;++j) + { + do_something(i); + } + } +}; + + +void oops() +{ + int some_local_state=0; + func my_func(some_local_state); + std::thread my_thread(my_func); + my_thread.detach(); +} + +int main() +{ + oops(); +} diff --git a/listing_2.2.cpp b/listing_2.2.cpp new file mode 100644 index 0000000..c952752 --- /dev/null +++ b/listing_2.2.cpp @@ -0,0 +1,46 @@ +#include + +void do_something(int& i) +{ + ++i; +} + +struct func +{ + int& i; + + func(int& i_):i(i_){} + + void operator()() + { + for(unsigned j=0;j<1000000;++j) + { + do_something(i); + } + } +}; + +void do_something_in_current_thread() +{} + +void f() +{ + int some_local_state=0; + func my_func(some_local_state); + std::thread t(my_func); + try + { + do_something_in_current_thread(); + } + catch(...) + { + t.join(); + throw; + } + t.join(); +} + +int main() +{ + f(); +} diff --git a/listing_2.3.cpp b/listing_2.3.cpp new file mode 100644 index 0000000..5c48223 --- /dev/null +++ b/listing_2.3.cpp @@ -0,0 +1,58 @@ +#include + +class thread_guard +{ + std::thread& t; +public: + explicit thread_guard(std::thread& t_): + t(t_) + {} + ~thread_guard() + { + if(t.joinable()) + { + t.join(); + } + } + thread_guard(thread_guard const&)=delete; + thread_guard& operator=(thread_guard const&)=delete; +}; + +void do_something(int& i) +{ + ++i; +} + +struct func +{ + int& i; + + func(int& i_):i(i_){} + + void operator()() + { + for(unsigned j=0;j<1000000;++j) + { + do_something(i); + } + } +}; + +void do_something_in_current_thread() +{} + + +void f() +{ + int some_local_state; + func my_func(some_local_state); + std::thread t(my_func); + thread_guard g(t); + + do_something_in_current_thread(); +} + +int main() +{ + f(); +} diff --git a/listing_2.4.cpp b/listing_2.4.cpp new file mode 100644 index 0000000..702afa7 --- /dev/null +++ b/listing_2.4.cpp @@ -0,0 +1,61 @@ +#include +#include + +void open_document_and_display_gui(std::string const& filename) +{} + +bool done_editing() +{ + return true; +} + +enum command_type{ + open_new_document +}; + + +struct user_command +{ + command_type type; + + user_command(): + type(open_new_document) + {} +}; + +user_command get_user_input() +{ + return user_command(); +} + +std::string get_filename_from_user() +{ + return "foo.doc"; +} + +void process_user_input(user_command const& cmd) +{} + +void edit_document(std::string const& filename) +{ + open_document_and_display_gui(filename); + while(!done_editing()) + { + user_command cmd=get_user_input(); + if(cmd.type==open_new_document) + { + std::string const new_name=get_filename_from_user(); + std::thread t(edit_document,new_name); + t.detach(); + } + else + { + process_user_input(cmd); + } + } +} + +int main() +{ + edit_document("bar.doc"); +} diff --git a/listing_2.5.cpp b/listing_2.5.cpp new file mode 100644 index 0000000..5913bef --- /dev/null +++ b/listing_2.5.cpp @@ -0,0 +1,27 @@ +#include + +void some_function() +{} + +void some_other_function(int) +{} + +std::thread f() +{ + void some_function(); + return std::thread(some_function); +} +std::thread g() +{ + void some_other_function(int); + std::thread t(some_other_function,42); + return t; +} + +int main() +{ + std::thread t1=f(); + t1.join(); + std::thread t2=g(); + t2.join(); +} diff --git a/listing_2.6.cpp b/listing_2.6.cpp new file mode 100644 index 0000000..f08fff6 --- /dev/null +++ b/listing_2.6.cpp @@ -0,0 +1,56 @@ +#include +#include + +class scoped_thread +{ + std::thread t; +public: + explicit scoped_thread(std::thread t_): + t(std::move(t_)) + { + if(!t.joinable()) + throw std::logic_error("No thread"); + } + ~scoped_thread() + { + t.join(); + } + scoped_thread(scoped_thread const&)=delete; + scoped_thread& operator=(scoped_thread const&)=delete; +}; + +void do_something(int& i) +{ + ++i; +} + +struct func +{ + int& i; + + func(int& i_):i(i_){} + + void operator()() + { + for(unsigned j=0;j<1000000;++j) + { + do_something(i); + } + } +}; + +void do_something_in_current_thread() +{} + +void f() +{ + int some_local_state; + scoped_thread t(std::thread(func(some_local_state))); + + do_something_in_current_thread(); +} + +int main() +{ + f(); +} diff --git a/listing_2.7.cpp b/listing_2.7.cpp new file mode 100644 index 0000000..4aa81d2 --- /dev/null +++ b/listing_2.7.cpp @@ -0,0 +1,23 @@ +#include +#include +#include +#include + +void do_work(unsigned id) +{} + +void f() +{ + std::vector threads; + for(unsigned i=0;i<20;++i) + { + threads.push_back(std::thread(do_work,i)); + } + std::for_each(threads.begin(),threads.end(), + std::mem_fn(&std::thread::join)); +} + +int main() +{ + f(); +} diff --git a/listing_2.8.cpp b/listing_2.8.cpp new file mode 100644 index 0000000..c2dc415 --- /dev/null +++ b/listing_2.8.cpp @@ -0,0 +1,67 @@ +#include +#include +#include +#include +#include +#include + +template +struct accumulate_block +{ + void operator()(Iterator first,Iterator last,T& result) + { + result=std::accumulate(first,last,result); + } +}; + +template +T parallel_accumulate(Iterator first,Iterator last,T init) +{ + unsigned long const length=std::distance(first,last); + + if(!length) + return init; + + unsigned long const min_per_thread=25; + unsigned long const max_threads= + (length+min_per_thread-1)/min_per_thread; + + unsigned long const hardware_threads= + std::thread::hardware_concurrency(); + + unsigned long const num_threads= + std::min(hardware_threads!=0?hardware_threads:2,max_threads); + + unsigned long const block_size=length/num_threads; + + std::vector results(num_threads); + std::vector threads(num_threads-1); + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_end=block_start; + std::advance(block_end,block_size); + threads[i]=std::thread( + accumulate_block(), + block_start,block_end,std::ref(results[i])); + block_start=block_end; + } + accumulate_block()(block_start,last,results[num_threads-1]); + + std::for_each(threads.begin(),threads.end(), + std::mem_fn(&std::thread::join)); + + return std::accumulate(results.begin(),results.end(),init); +} + +int main() +{ + std::vector vi; + for(int i=0;i<10;++i) + { + vi.push_back(10); + } + int sum=parallel_accumulate(vi.begin(),vi.end(),5); + std::cout<<"sum="< +#include +#include + +std::list some_list; +std::mutex some_mutex; + +void add_to_list(int new_value) +{ + std::lock_guard guard(some_mutex); + some_list.push_back(new_value); +} +bool list_contains(int value_to_find) +{ + std::lock_guard guard(some_mutex); + return std::find(some_list.begin(),some_list.end(),value_to_find) + != some_list.end(); +} + +#include + +int main() +{ + add_to_list(42); + std::cout<<"contains(1)="< +class threadsafe_stack +{ +private: + std::stack data; + mutable std::mutex m; +public: + threadsafe_stack(){} + threadsafe_stack(const threadsafe_stack& other) + { + std::lock_guard lock(other.m); + data=other.data; + } + threadsafe_stack& operator=(const threadsafe_stack&) = delete; + + void push(T new_value) + { + std::lock_guard lock(m); + data.push(new_value); + } + std::shared_ptr pop() + { + std::lock_guard lock(m); + if(data.empty()) throw empty_stack(); + std::shared_ptr const res(std::make_shared(data.top())); + data.pop(); + return res; + } + void pop(T& value) + { + std::lock_guard lock(m); + if(data.empty()) throw empty_stack(); + value=data.top(); + data.pop(); + } + bool empty() const + { + std::lock_guard lock(m); + return data.empty(); + } +}; + +int main() +{ + threadsafe_stack si; + si.push(5); + si.pop(); + if(!si.empty()) + { + int x; + si.pop(x); + } + +} diff --git a/listing_3.6.cpp b/listing_3.6.cpp new file mode 100644 index 0000000..e2a48fd --- /dev/null +++ b/listing_3.6.cpp @@ -0,0 +1,29 @@ +#include + +class some_big_object +{}; + +void swap(some_big_object& lhs,some_big_object& rhs) +{} + +class X +{ +private: + some_big_object some_detail; + mutable std::mutex m; +public: + X(some_big_object const& sd):some_detail(sd){} + + friend void swap(X& lhs, X& rhs) + { + if(&lhs==&rhs) + return; + std::lock(lhs.m,rhs.m); + std::lock_guard lock_a(lhs.m,std::adopt_lock); + std::lock_guard lock_b(rhs.m,std::adopt_lock); + swap(lhs.some_detail,rhs.some_detail); + } +}; + +int main() +{} diff --git a/listing_3.7.cpp b/listing_3.7.cpp new file mode 100644 index 0000000..002e71b --- /dev/null +++ b/listing_3.7.cpp @@ -0,0 +1,64 @@ +#include + +class hierarchical_mutex +{ +public: + explicit hierarchical_mutex(unsigned level) + {} + + void lock() + {} + void unlock() + {} +}; + + +hierarchical_mutex high_level_mutex(10000); +hierarchical_mutex low_level_mutex(5000); + +int do_low_level_stuff() +{ + return 42; +} + + +int low_level_func() +{ + std::lock_guard lk(low_level_mutex); + return do_low_level_stuff(); +} + +void high_level_stuff(int some_param) +{} + + +void high_level_func() +{ + std::lock_guard lk(high_level_mutex); + high_level_stuff(low_level_func()); +} + +void thread_a() +{ + high_level_func(); +} + +hierarchical_mutex other_mutex(100); +void do_other_stuff() +{} + + +void other_stuff() +{ + high_level_func(); + do_other_stuff(); +} + +void thread_b() +{ + std::lock_guard lk(other_mutex); + other_stuff(); +} + +int main() +{} diff --git a/listing_3.8.cpp b/listing_3.8.cpp new file mode 100644 index 0000000..a1f18aa --- /dev/null +++ b/listing_3.8.cpp @@ -0,0 +1,56 @@ +#include +#include + +class hierarchical_mutex +{ + std::mutex internal_mutex; + unsigned long const hierarchy_value; + unsigned long previous_hierarchy_value; + static thread_local unsigned long this_thread_hierarchy_value; + + void check_for_hierarchy_violation() + { + if(this_thread_hierarchy_value <= hierarchy_value) + { + throw std::logic_error("mutex hierarchy violated"); + } + } + void update_hierarchy_value() + { + previous_hierarchy_value=this_thread_hierarchy_value; + this_thread_hierarchy_value=hierarchy_value; + } +public: + explicit hierarchical_mutex(unsigned long value): + hierarchy_value(value), + previous_hierarchy_value(0) + {} + void lock() + { + check_for_hierarchy_violation(); + internal_mutex.lock(); + update_hierarchy_value(); + } + void unlock() + { + this_thread_hierarchy_value=previous_hierarchy_value; + internal_mutex.unlock(); + } + bool try_lock() + { + check_for_hierarchy_violation(); + if(!internal_mutex.try_lock()) + return false; + update_hierarchy_value(); + return true; + } +}; +thread_local unsigned long + hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); + +int main() +{ + hierarchical_mutex m1(42); + hierarchical_mutex m2(2000); + +} diff --git a/listing_3.9.cpp b/listing_3.9.cpp new file mode 100644 index 0000000..807acbe --- /dev/null +++ b/listing_3.9.cpp @@ -0,0 +1,29 @@ +#include + +class some_big_object +{}; + +void swap(some_big_object& lhs,some_big_object& rhs) +{} + +class X +{ +private: + some_big_object some_detail; + mutable std::mutex m; +public: + X(some_big_object const& sd):some_detail(sd){} + + friend void swap(X& lhs, X& rhs) + { + if(&lhs==&rhs) + return; + std::unique_lock lock_a(lhs.m,std::defer_lock); + std::unique_lock lock_b(rhs.m,std::defer_lock); + std::lock(lock_a,lock_b); + swap(lhs.some_detail,rhs.some_detail); + } +}; + +int main() +{} diff --git a/listing_4.1.cpp b/listing_4.1.cpp new file mode 100644 index 0000000..5ede423 --- /dev/null +++ b/listing_4.1.cpp @@ -0,0 +1,64 @@ +#include +#include +#include +#include + +bool more_data_to_prepare() +{ + return false; +} + +struct data_chunk +{}; + +data_chunk prepare_data() +{ + return data_chunk(); +} + +void process(data_chunk&) +{} + +bool is_last_chunk(data_chunk&) +{ + return true; +} + +std::mutex mut; +std::queue data_queue; +std::condition_variable data_cond; + +void data_preparation_thread() +{ + while(more_data_to_prepare()) + { + data_chunk const data=prepare_data(); + std::lock_guard lk(mut); + data_queue.push(data); + data_cond.notify_one(); + } +} + +void data_processing_thread() +{ + while(true) + { + std::unique_lock lk(mut); + data_cond.wait(lk,[]{return !data_queue.empty();}); + data_chunk data=data_queue.front(); + data_queue.pop(); + lk.unlock(); + process(data); + if(is_last_chunk(data)) + break; + } +} + +int main() +{ + std::thread t1(data_preparation_thread); + std::thread t2(data_processing_thread); + + t1.join(); + t2.join(); +} diff --git a/listing_4.10.cpp b/listing_4.10.cpp new file mode 100644 index 0000000..9eadddd --- /dev/null +++ b/listing_4.10.cpp @@ -0,0 +1,27 @@ +#include +void process_connections(connection_set& connections) +{ + while(!done(connections)) + { + for(connection_iterator + connection=connections.begin(),end=connections.end(); + connection!=end; + ++connection) + { + if(connection->has_incoming_data()) + { + data_packet data=connection->incoming(); + std::promise& p= + connection->get_promise(data.id); + p.set_value(data.payload); + } + if(connection->has_outgoing_data()) + { + outgoing_packet data= + connection->top_of_outgoing_queue(); + connection->send(data.payload); + data.promise.set_value(true); + } + } + } +} diff --git a/listing_4.11.cpp b/listing_4.11.cpp new file mode 100644 index 0000000..8d22cb6 --- /dev/null +++ b/listing_4.11.cpp @@ -0,0 +1,18 @@ +#include +#include +#include +std::condition_variable cv; +bool done; +std::mutex m; +bool wait_loop() +{ + auto const timeout= std::chrono::steady_clock::now()+ + std::chrono::milliseconds(500); + std::unique_lock lk(m); + while(!done) + { + if(cv.wait_until(lk,timeout)==std::cv_status::timeout) + break; + } + return done; +} diff --git a/listing_4.12.cpp b/listing_4.12.cpp new file mode 100644 index 0000000..f4592ff --- /dev/null +++ b/listing_4.12.cpp @@ -0,0 +1,24 @@ +template +std::list sequential_quick_sort(std::list input) +{ + if(input.empty()) + { + return input; + } + std::list result; + result.splice(result.begin(),input,input.begin()); + T const& pivot=*result.begin(); + auto divide_point=std::partition(input.begin(),input.end(), + [&](T const& t){return t lower_part; + lower_part.splice(lower_part.end(),input,input.begin(), + divide_point); + auto new_lower( + sequential_quick_sort(std::move(lower_part))); + auto new_higher( + sequential_quick_sort(std::move(input))); + result.splice(result.end(),new_higher); + Using synchronization of operations to simplify code + result.splice(result.begin(),new_lower); + return result; +} diff --git a/listing_4.13.cpp b/listing_4.13.cpp new file mode 100644 index 0000000..ae87d08 --- /dev/null +++ b/listing_4.13.cpp @@ -0,0 +1,23 @@ +template +std::list parallel_quick_sort(std::list input) +{ + if(input.empty()) + { + return input; + } + std::list result; + result.splice(result.begin(),input,input.begin()); + T const& pivot=*result.begin(); + auto divide_point=std::partition(input.begin(),input.end(), + [&](T const& t){return t lower_part; + lower_part.splice(lower_part.end(),input,input.begin(), + divide_point); + std::future > new_lower( + std::async(¶llel_quick_sort,std::move(lower_part))); + auto new_higher( + parallel_quick_sort(std::move(input))); + result.splice(result.end(),new_higher); + result.splice(result.begin(),new_lower.get()); + return result; +} diff --git a/listing_4.14.cpp b/listing_4.14.cpp new file mode 100644 index 0000000..e70c58d --- /dev/null +++ b/listing_4.14.cpp @@ -0,0 +1,12 @@ +template +std::future::type> +spawn_task(F&& f,A&& a) +{ + typedef std::result_of::type result_type; + std::packaged_task + task(std::move(f)); + std::future res(task.get_future()); + std::thread t(std::move(task),std::move(a)); + t.detach(); + return res; +} diff --git a/listing_4.15.cpp b/listing_4.15.cpp new file mode 100644 index 0000000..9868de8 --- /dev/null +++ b/listing_4.15.cpp @@ -0,0 +1,43 @@ +struct card_inserted +{ + std::string account; +}; +class atm +{ + messaging::receiver incoming; + messaging::sender bank; + messaging::sender interface_hardware; + void (atm::*state)(); + std::string account; + std::string pin; + void waiting_for_card() + { + interface_hardware.send(display_enter_card()); + incoming.wait() + .handle( + [&](card_inserted const& msg) + { + account=msg.account; + pin=""; + interface_hardware.send(display_enter_pin()); + state=&atm::getting_pin; + } + ); + } + void getting_pin(); +public: + void run() + { + state=&atm::waiting_for_card; + try + { + for(;;) + { + (this->*state)(); + } + } + catch(messaging::close_queue const&) + { + } + } +}; diff --git a/listing_4.16.cpp b/listing_4.16.cpp new file mode 100644 index 0000000..499bb91 --- /dev/null +++ b/listing_4.16.cpp @@ -0,0 +1,31 @@ +void atm::getting_pin() +{ + incoming.wait() + .handle( + [&](digit_pressed const& msg) + { + unsigned const pin_length=4; + pin+=msg.digit; + if(pin.length()==pin_length) + { + bank.send(verify_pin(account,pin,incoming)); + state=&atm::verifying_pin; + } + } + ) + .handle( + [&](clear_last_pressed const& msg) + { + if(!pin.empty()) + { + pin.resize(pin.length()-1); + } + } + ) + .handle( + [&](cancel_pressed const& msg) + { + state=&atm::done_processing; + } + ); +} diff --git a/listing_4.2.cpp b/listing_4.2.cpp new file mode 100644 index 0000000..a413a22 --- /dev/null +++ b/listing_4.2.cpp @@ -0,0 +1,30 @@ +template > +class queue { +public: + explicit queue(const Container&); + explicit queue(Container&& = Container()); + queue(queue&& q); + + template explicit queue(const Alloc&); + template queue(const Container&, const Alloc&); + template queue(Container&&, const Alloc&); + template queue(queue&&, const Alloc&); + + queue& operator=(queue&& q); + void swap(queue&& q); + + bool empty() const; + size_type size() const; + + T& front(); + const T& front() const; + T& back(); + const T& back() const; + + void push(const T& x); + void push(T&& x); + void pop(); +}; + +int main() +{} diff --git a/listing_4.3.cpp b/listing_4.3.cpp new file mode 100644 index 0000000..d2ea1d4 --- /dev/null +++ b/listing_4.3.cpp @@ -0,0 +1,22 @@ +#include +template +class threadsafe_queue +{ +public: + threadsafe_queue(); + threadsafe_queue(const threadsafe_queue&); + threadsafe_queue& operator=(const threadsafe_queue&) = delete; + + void push(T new_value); + + bool try_pop(T& value); + std::shared_ptr try_pop(); + + void wait_and_pop(T& value); + std::shared_ptr wait_and_pop(); + + bool empty() const; +}; + +int main() +{} diff --git a/listing_4.4.cpp b/listing_4.4.cpp new file mode 100644 index 0000000..672d141 --- /dev/null +++ b/listing_4.4.cpp @@ -0,0 +1,58 @@ +#include +#include +#include +template +class threadsafe_queue +{ +private: + std::mutex mut; + std::queue data_queue; + std::condition_variable data_cond; +public: + void push(T new_value) + { + std::lock_guard lk(mut); + data_queue.push(new_value); + data_cond.notify_one(); + } + + void wait_and_pop(T& value) + { + std::unique_lock lk(mut); + data_cond.wait(lk,[this]{return !data_queue.empty();}); + value=data_queue.front(); + data_queue.pop(); + } +}; + + +struct data_chunk +{}; + +data_chunk prepare_data(); +bool more_data_to_prepare(); +void process(data_chunk); +bool is_last_chunk(data_chunk); + +threadsafe_queue data_queue; + +void data_preparation_thread() +{ + while(more_data_to_prepare()) + { + data_chunk const data=prepare_data(); + data_queue.push(data); + } +} + +void data_processing_thread() +{ + while(true) + { + data_chunk data; + data_queue.wait_and_pop(data); + process(data); + if(is_last_chunk(data)) + break; + } +} diff --git a/listing_4.5.cpp b/listing_4.5.cpp new file mode 100644 index 0000000..0e78979 --- /dev/null +++ b/listing_4.5.cpp @@ -0,0 +1,73 @@ +#include +#include +#include +#include + +template +class threadsafe_queue +{ +private: + mutable std::mutex mut; + std::queue data_queue; + std::condition_variable data_cond; +public: + threadsafe_queue() + {} + threadsafe_queue(threadsafe_queue const& other) + { + std::lock_guard lk(other.mut); + data_queue=other.data_queue; + } + + void push(T new_value) + { + std::lock_guard lk(mut); + data_queue.push(new_value); + data_cond.notify_one(); + } + + void wait_and_pop(T& value) + { + std::unique_lock lk(mut); + data_cond.wait(lk,[this]{return !data_queue.empty();}); + value=data_queue.front(); + data_queue.pop(); + } + + std::shared_ptr wait_and_pop() + { + std::unique_lock lk(mut); + data_cond.wait(lk,[this]{return !data_queue.empty();}); + std::shared_ptr res(std::make_shared(data_queue.front())); + data_queue.pop(); + return res; + } + + bool try_pop(T& value) + { + std::lock_guard lk(mut); + if(data_queue.empty) + return false; + value=data_queue.front(); + data_queue.pop(); + } + + std::shared_ptr try_pop() + { + std::lock_guard lk(mut); + if(data_queue.empty()) + return std::shared_ptr(); + std::shared_ptr res(std::make_shared(data_queue.front())); + data_queue.pop(); + return res; + } + + bool empty() const + { + std::lock_guard lk(mut); + return data_queue.empty(); + } +}; + +int main() +{} diff --git a/listing_4.6.cpp b/listing_4.6.cpp new file mode 100644 index 0000000..3965eb1 --- /dev/null +++ b/listing_4.6.cpp @@ -0,0 +1,16 @@ +#include +#include +int find_the_answer_to_ltuae() +{ + return 42; +} + +void do_other_stuff() +{} + +int main() +{ + std::future the_answer=std::async(find_the_answer_to_ltuae); + do_other_stuff(); + std::cout<<"The answer is "< +#include + +struct X +{ + void foo(int,std::string const&); + std::string bar(std::string const&); +}; + + +X x; +auto f1=std::async(&X::foo,&x,42,"hello"); +auto f2=std::async(&X::bar,x,"goodbye"); + +struct Y +{ + double operator()(double); +}; +Y y; +auto f3=std::async(Y(),3.141); +auto f4=std::async(std::ref(y),2.718); +X baz(X&); +auto f6=std::async(baz,std::ref(x)); +class move_only +{ +public: + move_only(); + move_only(move_only&&); + move_only(move_only const&) = delete; + move_only& operator=(move_only&&); + move_only& operator=(move_only const&) = delete; + void operator()(); +}; +auto f5=std::async(move_only()); diff --git a/listing_4.8.cpp b/listing_4.8.cpp new file mode 100644 index 0000000..a3f46c9 --- /dev/null +++ b/listing_4.8.cpp @@ -0,0 +1,9 @@ +template<> +class packaged_task*,int)> +{ +public: + template + explicit packaged_task(Callable&& f); + std::future get_future(); + void operator()(std::vector*,int); +}; diff --git a/listing_4.9.cpp b/listing_4.9.cpp new file mode 100644 index 0000000..0b86a3f --- /dev/null +++ b/listing_4.9.cpp @@ -0,0 +1,40 @@ +#include +#include +#include +#include +#include + +std::mutex m; +std::deque > tasks; + +bool gui_shutdown_message_received(); +void get_and_process_gui_message(); + +void gui_thread() +{ + while(!gui_shutdown_message_received()) + { + get_and_process_gui_message(); + std::packaged_task task; + { + std::lock_guard lk(m); + if(tasks.empty()) + continue; + task=std::move(tasks.front()); + tasks.pop_front(); + } + task(); + } +} + +std::thread gui_bg_thread(gui_thread); + +template +std::future post_task_for_gui_thread(Func f) +{ + std::packaged_task task(f); + std::future res=task.get_future(); + std::lock_guard lk(m); + tasks.push_back(std::move(task)); + return res; +} diff --git a/listing_5.1.cpp b/listing_5.1.cpp new file mode 100644 index 0000000..e2441b3 --- /dev/null +++ b/listing_5.1.cpp @@ -0,0 +1,16 @@ +class spinlock_mutex +{ + std::atomic_flag flag; +public: + spinlock_mutex(): + flag(ATOMIC_FLAG_INIT) + {} + void lock() + { + while(flag.test_and_set(std::memory_order_acquire)); + } + void unlock() + { + flag.clear(std::memory_order_release); + } +}; diff --git a/listing_5.10.cpp b/listing_5.10.cpp new file mode 100644 index 0000000..2de655f --- /dev/null +++ b/listing_5.10.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include +struct X +{ + int i; + std::string s; +}; + +std::atomic p; +std::atomic a; + +void create_x() +{ + X* x=new X; + x->i=42; + x->s="hello"; + a.store(99,std::memory_order_relaxed); + p.store(x,std::memory_order_release); +} + +void use_x() +{ + X* x; + while(!(x=p.load(std::memory_order_consume))) + std::this_thread::sleep_for(std::chrono::microseconds(1)); + assert(x->i==42); + assert(x->s=="hello"); + assert(a.load(std::memory_order_relaxed)==99); +} +int main() +{ + std::thread t1(create_x); + std::thread t2(use_x); + t1.join(); + t2.join(); +} + diff --git a/listing_5.11.cpp b/listing_5.11.cpp new file mode 100644 index 0000000..deef694 --- /dev/null +++ b/listing_5.11.cpp @@ -0,0 +1,41 @@ +#include +#include + +std::vector queue_data; +std::atomic count; + +void populate_queue() +{ + unsigned const number_of_items=20; + queue_data.clear(); + for(unsigned i=0;i +#include +#include + +std::atomic x,y; +std::atomic z; + +void write_x_then_y() +{ + x.store(true,std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_release); + y.store(true,std::memory_order_relaxed); +} + +void read_y_then_x() +{ + while(!y.load(std::memory_order_relaxed)); + std::atomic_thread_fence(std::memory_order_acquire); + if(x.load(std::memory_order_relaxed)) + ++z; +} + +int main() +{ + x=false; + y=false; + z=0; + std::thread a(write_x_then_y); + std::thread b(read_y_then_x); + a.join(); + b.join(); + assert(z.load()!=0); +} diff --git a/listing_5.13.cpp b/listing_5.13.cpp new file mode 100644 index 0000000..a5de692 --- /dev/null +++ b/listing_5.13.cpp @@ -0,0 +1,34 @@ +#include +#include +#include + +bool x=false; +std::atomic y; +std::atomic z; + +void write_x_then_y() +{ + x=true; + std::atomic_thread_fence(std::memory_order_release); + y.store(true,std::memory_order_relaxed); +} + +void read_y_then_x() +{ + while(!y.load(std::memory_order_relaxed)); + std::atomic_thread_fence(std::memory_order_acquire); + if(x) + ++z; +} + +int main() +{ + x=false; + y=false; + z=0; + std::thread a(write_x_then_y); + std::thread b(read_y_then_x); + a.join(); + b.join(); + assert(z.load()!=0); +} diff --git a/listing_5.2.cpp b/listing_5.2.cpp new file mode 100644 index 0000000..616bc71 --- /dev/null +++ b/listing_5.2.cpp @@ -0,0 +1,22 @@ +#include +#include +#include +#include +#include + +std::vector data; +std::atomic_bool data_ready(false); + +void reader_thread() +{ + while(!data_ready.load()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + std::cout<<"The answer="< + +void foo(int a,int b) +{ + std::cout< +#include +#include + +std::atomic x,y; +std::atomic z; + +void write_x() +{ + x.store(true,std::memory_order_seq_cst); +} + +void write_y() +{ + y.store(true,std::memory_order_seq_cst); +} + +void read_x_then_y() +{ + while(!x.load(std::memory_order_seq_cst)); + if(y.load(std::memory_order_seq_cst)) + ++z; +} + +void read_y_then_x() +{ + while(!y.load(std::memory_order_seq_cst)); + if(x.load(std::memory_order_seq_cst)) + ++z; +} + +int main() +{ + x=false; + y=false; + z=0; + std::thread a(write_x); + std::thread b(write_y); + std::thread c(read_x_then_y); + std::thread d(read_y_then_x); + a.join(); + b.join(); + c.join(); + d.join(); + assert(z.load()!=0); +} diff --git a/listing_5.5.cpp b/listing_5.5.cpp new file mode 100644 index 0000000..057d03b --- /dev/null +++ b/listing_5.5.cpp @@ -0,0 +1,31 @@ +#include +#include +#include + +std::atomic x,y; +std::atomic z; + +void write_x_then_y() +{ + x.store(true,std::memory_order_relaxed); + y.store(true,std::memory_order_relaxed); +} + +void read_y_then_x() +{ + while(!y.load(std::memory_order_relaxed)); + if(x.load(std::memory_order_relaxed)) + ++z; +} + +int main() +{ + x=false; + y=false; + z=0; + std::thread a(write_x_then_y); + std::thread b(read_y_then_x); + a.join(); + b.join(); + assert(z.load()!=0); +} diff --git a/listing_5.6.cpp b/listing_5.6.cpp new file mode 100644 index 0000000..e6045a5 --- /dev/null +++ b/listing_5.6.cpp @@ -0,0 +1,74 @@ +#include +#include +#include + +std::atomic x(0),y(0),z(0); +std::atomic go(false); +unsigned const loop_count=10; + +struct read_values +{ + int x,y,z; +}; +read_values values1[loop_count]; +read_values values2[loop_count]; +read_values values3[loop_count]; +read_values values4[loop_count]; +read_values values5[loop_count]; +void increment(std::atomic* var_to_inc,read_values* values) +{ + while(!go) + std::this_thread::yield(); + for(unsigned i=0;istore(i+1,std::memory_order_relaxed); + std::this_thread::yield(); + } +} + +void read_vals(read_values* values) +{ + while(!go) + std::this_thread::yield(); + for(unsigned i=0;i +#include +#include + +std::atomic x,y; +std::atomic z; + +void write_x() +{ + x.store(true,std::memory_order_release); +} + +void write_y() +{ + y.store(true,std::memory_order_release); +} + +void read_x_then_y() +{ + while(!x.load(std::memory_order_acquire)); + if(y.load(std::memory_order_acquire)) + ++z; +} + +void read_y_then_x() +{ + while(!y.load(std::memory_order_acquire)); + if(x.load(std::memory_order_acquire)) + ++z; +} + +int main() +{ + x=false; + y=false; + z=0; + std::thread a(write_x); + std::thread b(write_y); + std::thread c(read_x_then_y); + std::thread d(read_y_then_x); + a.join(); + b.join(); + c.join(); + d.join(); + assert(z.load()!=0); +} diff --git a/listing_5.8.cpp b/listing_5.8.cpp new file mode 100644 index 0000000..19c1612 --- /dev/null +++ b/listing_5.8.cpp @@ -0,0 +1,31 @@ +#include +#include +#include + +std::atomic x,y; +std::atomic z; + +void write_x_then_y() +{ + x.store(true,std::memory_order_relaxed); + y.store(true,std::memory_order_release); +} + +void read_y_then_x() +{ + while(!y.load(std::memory_order_acquire)); + if(x.load(std::memory_order_relaxed)) + ++z; +} + +int main() +{ + x=false; + y=false; + z=0; + std::thread a(write_x_then_y); + std::thread b(read_y_then_x); + a.join(); + b.join(); + assert(z.load()!=0); +} diff --git a/listing_5.9.cpp b/listing_5.9.cpp new file mode 100644 index 0000000..992ae56 --- /dev/null +++ b/listing_5.9.cpp @@ -0,0 +1,42 @@ +#include +#include +#include + +std::atomic data[5]; +std::atomic sync1(false),sync2(false); + +void thread_1() +{ + data[0].store(42,std::memory_order_relaxed); + data[1].store(97,std::memory_order_relaxed); + data[2].store(17,std::memory_order_relaxed); + data[3].store(-141,std::memory_order_relaxed); + data[4].store(2003,std::memory_order_relaxed); + sync1.store(true,std::memory_order_release); +} + +void thread_2() +{ + while(!sync1.load(std::memory_order_acquire)); + sync2.store(std::memory_order_release); +} + +void thread_3() +{ + while(!sync2.load(std::memory_order_acquire)); + assert(data[0].load(std::memory_order_relaxed)==42); + assert(data[1].load(std::memory_order_relaxed)==97); + assert(data[2].load(std::memory_order_relaxed)==17); + assert(data[3].load(std::memory_order_relaxed)==-141); + assert(data[4].load(std::memory_order_relaxed)==2003); +} + +int main() +{ + std::thread t1(thread_1); + std::thread t2(thread_2); + std::thread t3(thread_3); + t1.join(); + t2.join(); + t3.join(); +} diff --git a/listing_6.1.cpp b/listing_6.1.cpp new file mode 100644 index 0000000..011938a --- /dev/null +++ b/listing_6.1.cpp @@ -0,0 +1,56 @@ +#include +#include +#include +#include + +struct empty_stack: std::exception +{ + const char* what() const throw() + { + return "empty stack"; + } +}; + +template +class threadsafe_stack +{ +private: + std::stack data; + mutable std::mutex m; +public: + threadsafe_stack(){} + threadsafe_stack(const threadsafe_stack& other) + { + std::lock_guard lock(other.m); + data=other.data; + } + threadsafe_stack& operator=(const threadsafe_stack&) = delete; + + void push(T new_value) + { + std::lock_guard lock(m); + data.push(std::move(new_value)); + } + std::shared_ptr pop() + { + std::lock_guard lock(m); + if(data.empty()) throw empty_stack(); + std::shared_ptr const res( + std::make_shared(std::move(data.top()))); + data.pop(); + return res; + } + void pop(T& value) + { + std::lock_guard lock(m); + if(data.empty()) throw empty_stack(); + value=std::move(data.top()); + data.pop(); + } + bool empty() const + { + std::lock_guard lock(m); + return data.empty(); + } +}; + diff --git a/listing_6.10.cpp b/listing_6.10.cpp new file mode 100644 index 0000000..79f57f0 --- /dev/null +++ b/listing_6.10.cpp @@ -0,0 +1,44 @@ +template +class threadsafe_queue +{ +private: + std::unique_ptr try_pop_head() + { + std::lock_guard head_lock(head_mutex); + if(head.get()==get_tail()) + { + return std::unique_ptr(); + } + return pop_head(); + } + + std::unique_ptr try_pop_head(T& value) + { + std::lock_guard head_lock(head_mutex); + if(head.get()==get_tail()) + { + return std::unique_ptr(); + } + value=std::move(*head->data); + return pop_head(); + } + +public: + std::shared_ptr try_pop() + { + std::unique_ptr const old_head=try_pop_head(); + return old_head?old_head->data:std::shared_ptr(); + } + + bool try_pop(T& value) + { + std::unique_ptr const old_head=try_pop_head(value); + return old_head; + } + + void empty() + { + std::lock_guard head_lock(head_mutex); + return (head==get_tail()); + } +}; diff --git a/listing_6.11.cpp b/listing_6.11.cpp new file mode 100644 index 0000000..ff1492a --- /dev/null +++ b/listing_6.11.cpp @@ -0,0 +1,108 @@ +#include +#include +#include +#include +#include +#include +#include + +template > +class threadsafe_lookup_table +{ +private: + class bucket_type + { + private: + typedef std::pair bucket_value; + typedef std::list bucket_data; + typedef typename bucket_data::iterator bucket_iterator; + + bucket_data data; + mutable boost::shared_mutex mutex; + + bucket_iterator find_entry_for(Key const& key) const + { + return std::find_if(data.begin(),data.end(), + [&](bucket_value const& item) + {return item.first==key;}); + } + public: + Value value_for(Key const& key,Value const& default_value) const + { + boost::shared_lock lock(mutex); + bucket_iterator const found_entry=find_entry_for(key); + return (found_entry==data.end())? + default_value : found_entry->second; + } + + void add_or_update_mapping(Key const& key,Value const& value) + { + std::unique_lock lock(mutex); + bucket_iterator const found_entry=find_entry_for(key); + if(found_entry==data.end()) + { + data.push_back(bucket_value(key,value)); + } + else + { + found_entry->second=value; + } + } + + void remove_mapping(Key const& key) + { + std::unique_lock lock(mutex); + bucket_iterator const found_entry=find_entry_for(key); + if(found_entry!=data.end()) + { + data.erase(found_entry); + } + } + }; + + std::vector > buckets; + Hash hasher; + + bucket_type& get_bucket(Key const& key) const + { + std::size_t const bucket_index=hasher(key)%buckets.size(); + return *buckets[bucket_index]; + } + +public: + typedef Key key_type; + typedef Value mapped_type; + typedef Hash hash_type; + + threadsafe_lookup_table( + unsigned num_buckets=19, Hash const& hasher_=Hash()): + buckets(num_buckets),hasher(hasher_) + { + for(unsigned i=0;i threadsafe_lookup_table::get_map() const +{ + std::vector > locks; + for(unsigned i=0;i(buckets[i].mutex)); + } + std::map res; + for(unsigned i=0;i +#include + +template +class threadsafe_list +{ + struct node + { + std::mutex m; + std::shared_ptr data; + std::unique_ptr next; + + node(): + next() + {} + + node(T const& value): + data(std::make_shared(value)) + {} + }; + + node head; + +public: + threadsafe_list() + {} + + ~threadsafe_list() + { + remove_if([](T const&){return true;}); + } + + threadsafe_list(threadsafe_list const& other)=delete; + threadsafe_list& operator=(threadsafe_list const& other)=delete; + + void push_front(T const& value) + { + std::unique_ptr new_node(new node(value)); + std::lock_guard lk(head.m); + new_node->next=std::move(head.next); + head.next=std::move(new_node); + } + + template + void for_each(Function f) + { + node* current=&head; + std::unique_lock lk(head.m); + while(node* const next=current->next.get()) + { + std::unique_lock next_lk(next->m); + lk.unlock(); + f(*next->data); + current=next; + lk=std::move(next_lk); + } + } + + template + std::shared_ptr find_first_if(Predicate p) + { + node* current=&head; + std::unique_lock lk(head.m); + while(node* const next=current->next.get()) + { + std::unique_lock next_lk(next->m); + lk.unlock(); + if(p(*next->data)) + { + return next->data; + } + current=next; + lk=std::move(next_lk); + } + return std::shared_ptr(); + } + + template + void remove_if(Predicate p) + { + node* current=&head; + std::unique_lock lk(head.m); + while(node* const next=current->next.get()) + { + std::unique_lock next_lk(next->m); + if(p(*next->data)) + { + std::unique_ptr old_next=std::move(current->next); + current->next=std::move(next->next); + next_lk.unlock(); + } + else + { + lk.unlock(); + current=next; + lk=std::move(next_lk); + } + } + } +}; + diff --git a/listing_6.2.cpp b/listing_6.2.cpp new file mode 100644 index 0000000..c0aed9e --- /dev/null +++ b/listing_6.2.cpp @@ -0,0 +1,72 @@ +#include +#include +#include +#include + +template +class threadsafe_queue +{ +private: + mutable std::mutex mut; + std::queue data_queue; + std::condition_variable data_cond; +public: + threadsafe_queue() + {} + + void push(T new_value) + { + std::lock_guard lk(mut); + data_queue.push(std::move(new_value)); + data_cond.notify_one(); + } + + void wait_and_pop(T& value) + { + std::unique_lock lk(mut); + data_cond.wait(lk,[this]{return !data_queue.empty();}); + value=std::move(data_queue.front()); + data_queue.pop(); + } + + std::shared_ptr wait_and_pop() + { + std::unique_lock lk(mut); + data_cond.wait(lk,[this]{return !data_queue.empty();}); + std::shared_ptr res( + std::make_shared(std::move(data_queue.front()))); + data_queue.pop(); + return res; + } + + bool try_pop(T& value) + { + std::lock_guard lk(mut); + if(data_queue.empty()) + return false; + value=std::move(data_queue.front()); + data_queue.pop(); + } + + std::shared_ptr try_pop() + { + std::lock_guard lk(mut); + if(data_queue.empty()) + return std::shared_ptr(); + std::shared_ptr res( + std::make_shared(std::move(data_queue.front()))); + data_queue.pop(); + return res; + } + + bool empty() const + { + std::lock_guard lk(mut); + return data_queue.empty(); + } +}; + +int main() +{ + threadsafe_queue rq; +} diff --git a/listing_6.3.cpp b/listing_6.3.cpp new file mode 100644 index 0000000..35c0519 --- /dev/null +++ b/listing_6.3.cpp @@ -0,0 +1,68 @@ +#include +#include +#include +#include + +template +class threadsafe_queue +{ +private: + mutable std::mutex mut; + std::queue > data_queue; + std::condition_variable data_cond; +public: + threadsafe_queue() + {} + + void wait_and_pop(T& value) + { + std::unique_lock lk(mut); + data_cond.wait(lk,[this]{return !data_queue.empty();}); + value=std::move(*data_queue.front()); + data_queue.pop(); + } + + bool try_pop(T& value) + { + std::lock_guard lk(mut); + if(data_queue.empty()) + return false; + value=std::move(*data_queue.front()); + data_queue.pop(); + } + + std::shared_ptr wait_and_pop() + { + std::unique_lock lk(mut); + data_cond.wait(lk,[this]{return !data_queue.empty();}); + std::shared_ptr res=data_queue.front(); + data_queue.pop(); + return res; + } + + std::shared_ptr try_pop() + { + std::lock_guard lk(mut); + if(data_queue.empty()) + return std::shared_ptr(); + std::shared_ptr res=data_queue.front(); + data_queue.pop(); + return res; + } + + bool empty() const + { + std::lock_guard lk(mut); + return data_queue.empty(); + } + + void push(T new_value) + { + std::shared_ptr data( + std::make_shared(std::move(new_value))); + std::lock_guard lk(mut); + data_queue.push(data); + data_cond.notify_one(); + } + +}; diff --git a/listing_6.4.cpp b/listing_6.4.cpp new file mode 100644 index 0000000..187b151 --- /dev/null +++ b/listing_6.4.cpp @@ -0,0 +1,56 @@ +#include + +template +class queue +{ +private: + struct node + { + T data; + std::unique_ptr next; + + node(T data_): + data(std::move(data_)) + {} + }; + + std::unique_ptr head; + node* tail; + +public: + queue(): + tail(nullptr) + {} + + queue(const queue& other)=delete; + queue& operator=(const queue& other)=delete; + + std::shared_ptr try_pop() + { + if(!head) + { + return std::shared_ptr(); + } + std::shared_ptr const res( + std::make_shared(std::move(head->data))); + std::unique_ptr const old_head=std::move(head); + head=std::move(old_head->next); + return res; + } + + void push(T new_value) + { + std::unique_ptr p(new node(std::move(new_value))); + node* const new_tail=p.get(); + if(tail) + { + tail->next=std::move(p); + } + else + { + head=std::move(p); + } + tail=new_tail; + } +}; + diff --git a/listing_6.5.cpp b/listing_6.5.cpp new file mode 100644 index 0000000..661114b --- /dev/null +++ b/listing_6.5.cpp @@ -0,0 +1,46 @@ +#include +template +class queue +{ +private: + struct node + { + std::shared_ptr data; + std::unique_ptr next; + }; + + std::unique_ptr head; + node* tail; + +public: + queue(): + head(new node),tail(head.get()) + {} + + queue(const queue& other)=delete; + queue& operator=(const queue& other)=delete; + + std::shared_ptr try_pop() + { + if(head.get()==tail) + { + return std::shared_ptr(); + } + std::shared_ptr const res(head->data); + std::unique_ptr const old_head=std::move(head); + head=std::move(old_head->next); + return res; + } + + void push(T new_value) + { + std::shared_ptr new_data( + std::make_shared(std::move(new_value))); + std::unique_ptr p(new node); + tail->data=new_data; + node* const new_tail=p.get(); + tail->next=std::move(p); + tail=new_tail; + } +}; + diff --git a/listing_6.6.cpp b/listing_6.6.cpp new file mode 100644 index 0000000..6a6eb54 --- /dev/null +++ b/listing_6.6.cpp @@ -0,0 +1,64 @@ +#include +#include + +template +class threadsafe_queue +{ +private: + struct node + { + std::shared_ptr data; + std::unique_ptr next; + }; + + std::mutex head_mutex; + std::unique_ptr head; + std::mutex tail_mutex; + node* tail; + + node* get_tail() + { + std::lock_guard tail_lock(tail_mutex); + return tail; + } + + std::unique_ptr pop_head() + { + std::lock_guard head_lock(head_mutex); + if(head.get()==get_tail()) + { + return nullptr; + } + std::unique_ptr const old_head=std::move(head); + head=std::move(old_head->next); + return old_head; + } + + +public: + threadsafe_queue(): + head(new node),tail(head.get()) + {} + + threadsafe_queue(const threadsafe_queue& other)=delete; + threadsafe_queue& operator=(const threadsafe_queue& other)=delete; + + std::shared_ptr try_pop() + { + std::unique_ptr old_head=pop_head(); + return old_head?old_head->data:std::shared_ptr(); + } + + void push(T new_value) + { + std::shared_ptr new_data( + std::make_shared(std::move(new_value))); + std::unique_ptr p(new node); + node* const new_tail=p.get(); + std::lock_guard tail_lock(tail_mutex); + tail->data=new_data; + tail->next=std::move(p); + tail=new_tail; + } +}; + diff --git a/listing_6.7.cpp b/listing_6.7.cpp new file mode 100644 index 0000000..7497a72 --- /dev/null +++ b/listing_6.7.cpp @@ -0,0 +1,29 @@ +template +class threadsafe_queue +{ +private: + struct node + { + std::shared_ptr data; + std::unique_ptr next; + }; + + std::mutex head_mutex; + std::unique_ptr head; + std::mutex tail_mutex; + node* tail; + std::condition_variable data_cond; +public: + threadsafe_queue(): + head(new node),tail(head.get()) + {} + threadsafe_queue(const threadsafe_queue& other)=delete; + threadsafe_queue& operator=(const threadsafe_queue& other)=delete; + + std::shared_ptr try_pop(); + bool try_pop(T& value); + std::shared_ptr wait_and_pop(); + void wait_and_pop(T& value); + void push(T new_value); + void empty(); +}; diff --git a/listing_6.8.cpp b/listing_6.8.cpp new file mode 100644 index 0000000..c042254 --- /dev/null +++ b/listing_6.8.cpp @@ -0,0 +1,15 @@ +template +void threadsafe_queue::push(T new_value) +{ + std::shared_ptr new_data( + std::make_shared(std::move(new_value))); + std::unique_ptr p(new node); + { + std::lock_guard tail_lock(tail_mutex); + tail->data=new_data; + node* const new_tail=p.get(); + tail->next=std::move(p); + tail=new_tail; + } + data_cond.notify_one(); +} diff --git a/listing_6.9.cpp b/listing_6.9.cpp new file mode 100644 index 0000000..d18a15e --- /dev/null +++ b/listing_6.9.cpp @@ -0,0 +1,49 @@ +template +class threadsafe_queue +{ +private: + node* get_tail() + { + std::lock_guard tail_lock(tail_mutex); + return tail; + } + + std::unique_ptr pop_head() + { + std::unique_ptr const old_head=std::move(head); + head=std::move(old_head->next); + return old_head; + } + + std::unique_lock wait_for_data() + { + std::unique_lock head_lock(head_mutex); + data_cond.wait(head_lock,[&]{return head!=get_tail();}); + return std::move(head_lock); + } + + std::unique_ptr wait_pop_head() + { + std::unique_lock head_lock(wait_for_data()); + return pop_head(); + } + + std::unique_ptr wait_pop_head(T& value) + { + std::unique_lock head_lock(wait_for_data()); + value=std::move(*head->data); + return pop_head(); + } + +public: + std::shared_ptr wait_and_pop() + { + std::unique_ptr const old_head=wait_pop_head(); + return old_head->data; + } + + void wait_and_pop(T& value) + { + std::unique_ptr const old_head=wait_pop_head(value); + } +}; diff --git a/listing_7.1.cpp b/listing_7.1.cpp new file mode 100644 index 0000000..8d141a8 --- /dev/null +++ b/listing_7.1.cpp @@ -0,0 +1,18 @@ +#include + +class spinlock_mutex +{ + std::atomic_flag flag; +public: + spinlock_mutex(): + flag(ATOMIC_FLAG_INIT) + {} + void lock() + { + while(flag.test_and_set(std::memory_order_acquire)); + } + void unlock() + { + flag.clear(std::memory_order_release); + } +}; diff --git a/listing_7.10.cpp b/listing_7.10.cpp new file mode 100644 index 0000000..3f05227 --- /dev/null +++ b/listing_7.10.cpp @@ -0,0 +1,38 @@ +#include +#include + +template +class lock_free_stack +{ +private: + struct node; + struct counted_node_ptr + { + int external_count; + node* ptr; + }; + struct node + { + std::shared_ptr data; + std::atomic internal_count; + counted_node_ptr next; + node(T const& data_): + data(std::make_shared(data_)), + internal_count(0) + {} + }; + std::atomic head; +public: + ~lock_free_stack() + { + while(pop()); + } + void push(T const& data) + { + counted_node_ptr new_node; + new_node.ptr=new node(data); + new_node.external_count=1; + new_node.ptr->next=head.load(); + while(!head.compare_exchange_weak(new_node.ptr->next,new_node)); + } +}; diff --git a/listing_7.11.cpp b/listing_7.11.cpp new file mode 100644 index 0000000..72837d0 --- /dev/null +++ b/listing_7.11.cpp @@ -0,0 +1,46 @@ +template +class lock_free_stack +{ +private: + void increase_head_count(counted_node_ptr& old_counter) + { + counted_node_ptr new_counter; + do + { + new_counter=old_counter; + ++new_counter.external_count; + } + while(!head.compare_exchange_strong(old_counter,new_counter)); + old_counter.external_count=new_counter.external_count; + } +public: + std::shared_ptr pop() + { + counted_node_ptr old_head=head.load(); + for(;;) + { + increase_head_count(old_head); + node* const ptr=old_head.ptr; + if(!ptr) + { + return std::shared_ptr(); + } + if(head.compare_exchange_strong(old_head,ptr->next)) + { + std::shared_ptr res; + res.swap(ptr->data); + int const count_increase=old_head.external_count-2; + if(ptr->internal_count.fetch_add(count_increase)== + -count_increase) + { + delete ptr; + } + return res; + } + else if(ptr->internal_count.fetch_sub(1)==1) + { + delete ptr; + } + } + } +}; diff --git a/listing_7.12.cpp b/listing_7.12.cpp new file mode 100644 index 0000000..03756e5 --- /dev/null +++ b/listing_7.12.cpp @@ -0,0 +1,88 @@ +#include +#include + +template +class lock_free_stack +{ +private: + struct node; + struct counted_node_ptr + { + int external_count; + node* ptr; + }; + struct node + { + std::shared_ptr data; + std::atomic internal_count; + counted_node_ptr next; + node(T const& data_): + data(std::make_shared(data_)), + internal_count(0) + {} + }; + std::atomic head; + void increase_head_count(counted_node_ptr& old_counter) + { + counted_node_ptr new_counter; + do + { + new_counter=old_counter; + ++new_counter.external_count; + } + while(!head.compare_exchange_strong( + old_counter,new_counter, + std::memory_order_acquire, + std::memory_order_relaxed)); + old_counter.external_count=new_counter.external_count; + } +public: + ~lock_free_stack() + { + while(pop()); + } + void push(T const& data) + { + counted_node_ptr new_node; + new_node.ptr=new node(data); + new_node.external_count=1; + new_node.ptr->next=head.load(std::memory_order_relaxed) + while(!head.compare_exchange_weak( + new_node.ptr->next,new_node, + std::memory_order_release, + std::memory_order_relaxed)); + } + std::shared_ptr pop() + { + counted_node_ptr old_head= + head.load(std::memory_order_relaxed); + for(;;) + { + increase_head_count(old_head); + node* const ptr=old_head.ptr; + if(!ptr) + { + return std::shared_ptr(); + } + if(head.compare_exchange_strong( + old_head,ptr->next,std::memory_order_relaxed)) + { + std::shared_ptr res; + res.swap(ptr->data); + int const count_increase=old_head.external_count-2; + if(ptr->internal_count.fetch_add( + count_increase,std::memory_order_release)==-count_increase) + { + delete ptr; + } + return res; + } + else if(ptr->internal_count.fetch_add( + -1,std::memory_order_relaxed)==1) + { + ptr->internal_count.load(std::memory_order_acquire); + delete ptr; + } + } + } +}; diff --git a/listing_7.13.cpp b/listing_7.13.cpp new file mode 100644 index 0000000..f4a4f42 --- /dev/null +++ b/listing_7.13.cpp @@ -0,0 +1,62 @@ +#include +#include + +template +class lock_free_queue +{ +private: + struct node + { + std::shared_ptr data; + node* next; + node(): + next(nullptr) + {} + }; + std::atomic head; + std::atomic tail; + node* pop_head() + { + node* const old_head=head.load(); + if(old_head==tail.load()) + { + return nullptr; + } + head.store(old_head->next); + return old_head; + } +public: + lock_free_queue(): + head(new node),tail(head.load()) + {} + lock_free_queue(const lock_free_queue& other)=delete; + lock_free_queue& operator=(const lock_free_queue& other)=delete; + ~lock_free_queue() + { + while(node* const old_head=head.load()) + { + head.store(old_head->next); + delete old_head; + } + } + std::shared_ptr pop() + { + node* old_head=pop_head(); + if(!old_head) + { + return std::shared_ptr(); + } + std::shared_ptr const res(old_head->data); + delete old_head; + return res; + } + void push(T new_value) + { + std::shared_ptr new_data(std::make_shared(new_value)); + node* p=new node; + node* const old_tail=tail.load(); + old_tail->data.swap(new_data); + old_tail->next=p; + tail.store(p); + } +}; diff --git a/listing_7.14.cpp b/listing_7.14.cpp new file mode 100644 index 0000000..0aced25 --- /dev/null +++ b/listing_7.14.cpp @@ -0,0 +1,20 @@ +void push(T new_value) +{ + std::unique_ptr new_data(new T(new_value)); + counted_node_ptr new_next; + new_next.ptr=new node; + new_next.external_count=1; + for(;;) + { + node* const old_tail=tail.load(); + T* old_data=nullptr; + if(old_tail->data.compare_exchange_strong( + old_data,new_data.get())) + { + old_tail->next=new_next; + tail.store(new_next.ptr); + new_data.release(); + break; + } + } +} diff --git a/listing_7.15.cpp b/listing_7.15.cpp new file mode 100644 index 0000000..2342ad3 --- /dev/null +++ b/listing_7.15.cpp @@ -0,0 +1,59 @@ +#include + +template +class lock_free_queue +{ +private: + struct node; + struct counted_node_ptr + { + int external_count; + node* ptr; + }; + std::atomic head; + std::atomic tail; + struct node_counter + { + unsigned internal_count:30; + unsigned external_counters:2; + }; + struct node + { + std::atomic data; + std::atomic count; + counted_node_ptr next; + node() + { + node_counter new_count; + new_count.internal_count=0; + new_count.external_counters=2; + count.store(new_count); + next.ptr=nullptr; + next.external_count=0; + } + }; +public: + void push(T new_value) + { + std::unique_ptr new_data(new T(new_value)); + counted_node_ptr new_next; + new_next.ptr=new node; + new_next.external_count=1; + counted_node_ptr old_tail=tail.load(); + for(;;) + { + increase_external_count(tail,old_tail); + T* old_data=nullptr; + if(old_tail.ptr->data.compare_exchange_strong( + old_data,new_data.get())) + { + old_tail.ptr->next=new_next; + old_tail=tail.exchange(new_next); + free_external_counter(old_tail); + new_data.release(); + break; + } + old_tail.ptr->release_ref(); + } + } +}; diff --git a/listing_7.16.cpp b/listing_7.16.cpp new file mode 100644 index 0000000..a3d74bc --- /dev/null +++ b/listing_7.16.cpp @@ -0,0 +1,31 @@ +template +class lock_free_queue +{ +private: + struct node + { + void release_ref(); + }; +public: + std::unique_ptr pop() + { + counted_node_ptr old_head=head.load(std::memory_order_relaxed); + for(;;) + { + increase_external_count(head,old_head); + node* const ptr=old_head.ptr; + if(ptr==tail.load().ptr) + { + ptr->release_ref(); + return std::unique_ptr(); + } + if(head.compare_exchange_strong(old_head,ptr->next)) + { + T* const res=ptr->data.exchange(nullptr); + free_external_counter(old_head); + return std::unique_ptr(res); + } + ptr->release_ref(); + } + } +}; diff --git a/listing_7.17.cpp b/listing_7.17.cpp new file mode 100644 index 0000000..3262a51 --- /dev/null +++ b/listing_7.17.cpp @@ -0,0 +1,27 @@ +template +class lock_free_queue +{ +private: + struct node + { + void release_ref() + { + node_counter old_counter= + count.load(std::memory_order_relaxed); + node_counter new_counter; + do + { + new_counter=old_counter; + --new_counter.internal_count; + } + while(!count.compare_exchange_strong( + old_counter,new_counter, + std::memory_order_acquire,std::memory_order_relaxed)); + if(!new_counter.internal_count && + !new_counter.external_counters) + { + delete this; + } + } + }; +}; diff --git a/listing_7.18.cpp b/listing_7.18.cpp new file mode 100644 index 0000000..60f2291 --- /dev/null +++ b/listing_7.18.cpp @@ -0,0 +1,20 @@ +template +class lock_free_queue +{ +private: + static void increase_external_count( + std::atomic& counter, + counted_node_ptr& old_counter) + { + counted_node_ptr new_counter; + do + { + new_counter=old_counter; + ++new_counter.external_count; + } + while(!counter.compare_exchange_strong( + old_counter,new_counter, + std::memory_order_acquire,std::memory_order_relaxed)); + old_counter.external_count=new_counter.external_count; + } +}; diff --git a/listing_7.19.cpp b/listing_7.19.cpp new file mode 100644 index 0000000..0667b55 --- /dev/null +++ b/listing_7.19.cpp @@ -0,0 +1,27 @@ +template +class lock_free_queue +{ +private: + static void free_external_counter(counted_node_ptr &old_node_ptr) + { + node* const ptr=old_node_ptr.ptr; + int const count_increase=old_node_ptr.external_count-2; + node_counter old_counter= + ptr->count.load(std::memory_order_relaxed); + node_counter new_counter; + do + { + new_counter=old_counter; + --new_counter.external_counters; + new_counter.internal_count+=count_increase; + } + while(!ptr->count.compare_exchange_strong( + old_counter,new_counter, + std::memory_order_acquire,std::memory_order_relaxed)); + if(!new_counter.internal_count && + !new_counter.external_counters) + { + delete ptr; + } + } +}; diff --git a/listing_7.2.cpp b/listing_7.2.cpp new file mode 100644 index 0000000..f16271c --- /dev/null +++ b/listing_7.2.cpp @@ -0,0 +1,23 @@ +#include + +template +class lock_free_stack +{ +private: + struct node + { + T data; + node* next; + node(T const& data_): + data(data_) + {} + }; + std::atomic head; +public: + void push(T const& data) + { + node* const new_node=new node(data); + new_node->next=head.load(); + while(!head.compare_exchange_weak(new_node->next,new_node)); + } +}; diff --git a/listing_7.20.cpp b/listing_7.20.cpp new file mode 100644 index 0000000..c7de500 --- /dev/null +++ b/listing_7.20.cpp @@ -0,0 +1,33 @@ +template +class lock_free_queue +{ +private: + struct node + { + std::atomic data; + std::atomic count; + std::atomic next; + }; +public: + std::unique_ptr pop() + { + counted_node_ptr old_head=head.load(std::memory_order_relaxed); + for(;;) + { + increase_external_count(head,old_head); + node* const ptr=old_head.ptr; + if(ptr==tail.load().ptr) + { + return std::unique_ptr(); + } + counted_node_ptr next=ptr->next.load(); + if(head.compare_exchange_strong(old_head,next)) + { + T* const res=ptr->data.exchange(nullptr); + free_external_counter(old_head); + return std::unique_ptr(res); + } + ptr->release_ref(); + } + } +}; diff --git a/listing_7.21.cpp b/listing_7.21.cpp new file mode 100644 index 0000000..c13bca1 --- /dev/null +++ b/listing_7.21.cpp @@ -0,0 +1,55 @@ +template +class lock_free_queue +{ +private: + void set_new_tail(counted_node_ptr &old_tail, + counted_node_ptr const &new_tail) + { + node* const current_tail_ptr=old_tail.ptr; + while(!tail.compare_exchange_weak(old_tail,new_tail) && + old_tail.ptr==current_tail_ptr); + if(old_tail.ptr==current_tail_ptr) + free_external_counter(old_tail); + else + current_tail_ptr->release_ref(); + } +public: + void push(T new_value) + { + std::unique_ptr new_data(new T(new_value)); + counted_node_ptr new_next; + new_next.ptr=new node; + new_next.external_count=1; + counted_node_ptr old_tail=tail.load(); + for(;;) + { + increase_external_count(tail,old_tail); + T* old_data=nullptr; + if(old_tail.ptr->data.compare_exchange_strong( + old_data,new_data.get())) + { + counted_node_ptr old_next={0}; + if(!old_tail.ptr->next.compare_exchange_strong( + old_next,new_next)) + { + delete new_next.ptr; + new_next=old_next; + } + set_new_tail(old_tail, new_next); + new_data.release(); + break; + } + else + { + counted_node_ptr old_next={0}; + if(old_tail.ptr->next.compare_exchange_strong( + old_next,new_next)) + { + old_next=new_next; + new_next.ptr=new node; + } + set_new_tail(old_tail, old_next); + } + } + } +}; diff --git a/listing_7.3.cpp b/listing_7.3.cpp new file mode 100644 index 0000000..7568466 --- /dev/null +++ b/listing_7.3.cpp @@ -0,0 +1,31 @@ +#include +#include + +template +class lock_free_stack +{ +private: + struct node + { + std::shared_ptr data; + node* next; + node(T const& data_): + data(std::make_shared(data_)) + {} + }; + std::atomic head; +public: + void push(T const& data) + { + node* const new_node=new node(data); + new_node->next=head.load(); + while(!head.compare_exchange_weak(new_node->next,new_node)); + } + std::shared_ptr pop() + { + node* old_head=head.load(); + while(old_head && + !head.compare_exchange_weak(old_head,old_head->next)); + return old_head ? old_head->data : std::shared_ptr(); + } +}; diff --git a/listing_7.4.cpp b/listing_7.4.cpp new file mode 100644 index 0000000..c5cf97c --- /dev/null +++ b/listing_7.4.cpp @@ -0,0 +1,25 @@ +#include +#include + +template +class lock_free_stack +{ +private: + std::atomic threads_in_pop; + void try_reclaim(node* old_head); +public: + std::shared_ptr pop() + { + ++threads_in_pop; + node* old_head=head.load(); + while(old_head && + !head.compare_exchange_weak(old_head,old_head->next)); + std::shared_ptr res; + if(old_head) + { + res.swap(old_head->data); + } + try_reclaim(old_head); + return res; + } +}; diff --git a/listing_7.5.cpp b/listing_7.5.cpp new file mode 100644 index 0000000..34d6ff6 --- /dev/null +++ b/listing_7.5.cpp @@ -0,0 +1,57 @@ +#include + +template +class lock_free_stack +{ +private: + std::atomic to_be_deleted; + static void delete_nodes(node* nodes) + { + while(nodes) + { + node* next=nodes->next; + delete nodes; + nodes=next; + } + } + void try_reclaim(node* old_head) + { + if(threads_in_pop==1) + { + node* nodes_to_delete=to_be_deleted.exchange(nullptr); + if(!--threads_in_pop) + { + delete_nodes(nodes_to_delete); + } + else if(nodes_to_delete) + { + chain_pending_nodes(nodes_to_delete); + } + delete old_head; + } + else + { + chain_pending_node(old_head); + --threads_in_pop; + } + } + void chain_pending_nodes(node* nodes) + { + node* last=nodes; + while(node* const next=last->next) + { + last=next; + } + chain_pending_nodes(nodes,last); + } + void chain_pending_nodes(node* first,node* last) + { + last->next=to_be_deleted; + while(!to_be_deleted.compare_exchange_weak( + last->next,first)); + } + void chain_pending_node(node* n) + { + chain_pending_nodes(n,n); + } +}; diff --git a/listing_7.6.cpp b/listing_7.6.cpp new file mode 100644 index 0000000..3a2606b --- /dev/null +++ b/listing_7.6.cpp @@ -0,0 +1,36 @@ +#include +#include + +std::shared_ptr pop() +{ + std::atomic& hp=get_hazard_pointer_for_current_thread(); + node* old_head=head.load(); + do + { + node* temp; + do + { + temp=old_head; + hp.store(old_head); + old_head=head.load(); + } while(old_head!=temp); + } + while(old_head && + !head.compare_exchange_strong(old_head,old_head->next)); + hp.store(nullptr); + std::shared_ptr res; + if(old_head) + { + res.swap(old_head->data); + if(outstanding_hazard_pointers_for(old_head)) + { + reclaim_later(old_head); + } + else + { + delete old_head; + } + delete_nodes_with_no_hazards(); + } + return res; +} diff --git a/listing_7.7.cpp b/listing_7.7.cpp new file mode 100644 index 0000000..70603c8 --- /dev/null +++ b/listing_7.7.cpp @@ -0,0 +1,49 @@ +#include +#include + +unsigned const max_hazard_pointers=100; +struct hazard_pointer +{ + std::atomic id; + std::atomic pointer; +}; +hazard_pointer hazard_pointers[max_hazard_pointers]; +class hp_owner +{ + hazard_pointer* hp; +public: + hp_owner(hp_owner const&)=delete; + hp_owner operator=(hp_owner const&)=delete; + hp_owner(): + hp(nullptr) + { + for(unsigned i=0;i& get_pointer() + { + return hp->pointer; + } + ~hp_owner() + { + hp->pointer.store(nullptr); + hp->id.store(std::thread::id()); + } +}; +std::atomic& get_hazard_pointer_for_current_thread() +{ + thread_local static hp_owner hazard; + return hazard.get_pointer(); +} diff --git a/listing_7.8.cpp b/listing_7.8.cpp new file mode 100644 index 0000000..f7ba86c --- /dev/null +++ b/listing_7.8.cpp @@ -0,0 +1,51 @@ +#include + +template +void do_delete(void* p) +{ + delete static_cast(p); +} +struct data_to_reclaim +{ + void* data; + std::function deleter; + data_to_reclaim* next; + template + data_to_reclaim(T* p): + data(p), + deleter(&do_delete), + next(0) + {} + ~data_to_reclaim() + { + deleter(data); + } +}; +std::atomic nodes_to_reclaim; +void add_to_reclaim_list(data_to_reclaim* node) +{ + node->next=nodes_to_reclaim.load(); + while(!nodes_to_reclaim.compare_exchange_weak(node->next,node)); +} +template +void reclaim_later(T* data) +{ + add_to_reclaim_list(new data_to_reclaim(data)); +} +void delete_nodes_with_no_hazards() +{ + data_to_reclaim* current=nodes_to_reclaim.exchange(nullptr); + while(current) + { + data_to_reclaim* const next=current->next; + if(!outstanding_hazard_pointers_for(current->data)) + { + delete current; + } + else + { + add_to_reclaim_list(current); + } + current=next; + } +} diff --git a/listing_7.9.cpp b/listing_7.9.cpp new file mode 100644 index 0000000..d3c09b7 --- /dev/null +++ b/listing_7.9.cpp @@ -0,0 +1,32 @@ +#include +#include + +template +class lock_free_stack +{ +private: + struct node + { + std::shared_ptr data; + std::shared_ptr next; + node(T const& data_): + data(std::make_shared(data_)) + {} + }; + std::shared_ptr head; +public: + void push(T const& data) + { + std::shared_ptr const new_node=std::make_shared(data); + new_node->next=head.load(); + while(!std::atomic_compare_exchange_weak( + &head,&new_node->next,new_node)); + } + std::shared_ptr pop() + { + std::shared_ptr old_head=std::atomic_load(&head); + while(old_head && !std::atomic_compare_exchange_weak( + &head,&old_head,old_head->next)); + return old_head ? old_head->data : std::shared_ptr(); + } +}; diff --git a/listing_8.1.cpp b/listing_8.1.cpp new file mode 100644 index 0000000..e345302 --- /dev/null +++ b/listing_8.1.cpp @@ -0,0 +1,102 @@ +template +struct sorter +{ + struct chunk_to_sort + { + std::list data; + std::promise > promise; + }; + + thread_safe_stack chunks; + std::vector threads; + unsigned const max_thread_count; + std::atomic end_of_data; + + sorter(): + max_thread_count(std::thread::hardware_concurrency()-1), + end_of_data(false) + {} + + ~sorter() + { + end_of_data=true; + for(unsigned i=0;i chunk=chunks.pop(); + if(chunk) + { + sort_chunk(chunk); + } + } + + std::list do_sort(std::list& chunk_data) + { + if(chunk_data.empty()) + { + return chunk_data; + } + + std::list result; + result.splice(result.begin(),chunk_data,chunk_data.begin()); + T const& partition_val=*result.begin(); + + typename std::list::iterator divide_point= + std::partition(chunk_data.begin(),chunk_data.end(), + [&](T const& val){return val > new_lower= + new_lower_chunk.promise.get_future(); + chunks.push(std::move(new_lower_chunk)); + if(threads.size()::sort_thread,this)); + } + + std::list new_higher(do_sort(chunk_data)); + + result.splice(result.end(),new_higher); + while(new_lower.wait_for(std::chrono::seconds(0)) != + std::future_status::ready) + { + try_sort_chunk(); + } + + result.splice(result.begin(),new_lower.get()); + return result; + } + + void sort_chunk(boost::shared_ptr const& chunk) + { + chunk->promise.set_value(do_sort(chunk->data)); + } + + void sort_thread() + { + while(!end_of_data) + { + try_sort_chunk(); + std::this_thread::yield(); + } + } +}; + +template +std::list parallel_quick_sort(std::list input) +{ + if(input.empty()) + { + return input; + } + sorter s; + return s.do_sort(input); +} diff --git a/listing_8.10.cpp b/listing_8.10.cpp new file mode 100644 index 0000000..07241ea --- /dev/null +++ b/listing_8.10.cpp @@ -0,0 +1,45 @@ +template +Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match, + std::atomic& done) +{ + try + { + unsigned long const length=std::distance(first,last); + unsigned long const min_per_thread=25; + if(length<(2*min_per_thread)) + { + for(;(first!=last) && !done.load();++first) + { + if(*first==match) + { + done=true; + return first; + } + } + return last; + } + else + { + Iterator const mid_point=first+(length/2); + std::future async_result= + std::async(¶llel_find_impl, + mid_point,last,match,std::ref(done)); + Iterator const direct_result= + parallel_find_impl(first,mid_point,match,done); + return (direct_result==mid_point)? + async_result.get():direct_result; + } + } + catch(...) + { + done=true; + throw; + } +} + +template +Iterator parallel_find(Iterator first,Iterator last,MatchType match) +{ + std::atomic done(false); + return parallel_find_impl(first,last,match,done); +} diff --git a/listing_8.11.cpp b/listing_8.11.cpp new file mode 100644 index 0000000..8595714 --- /dev/null +++ b/listing_8.11.cpp @@ -0,0 +1,93 @@ +template +void parallel_partial_sum(Iterator first,Iterator last) +{ + typedef typename Iterator::value_type value_type; + struct process_chunk + { + void operator()(Iterator begin,Iterator last, + std::future* previous_end_value, + std::promise* end_value) + { + try + { + Iterator end=last; + ++end; + std::partial_sum(begin,end,begin); + if(previous_end_value) + { + value_type& addend=previous_end_value->get(); + *last+=addend; + if(end_value) + { + end_value->set_value(*last); + } + std::for_each(begin,last,[addend](value_type& item) + { + item+=addend; + }); + } + else if(end_value) + { + end_value->set_value(*last); + } + } + catch(...) + { + if(end_value) + { + end_value->set_exception(std::current_exception()); + } + else + { + throw; + } + } + } + }; + + unsigned long const length=std::distance(first,last); + + if(!length) + return last; + + unsigned long const min_per_thread=25; + unsigned long const max_threads= + (length+min_per_thread-1)/min_per_thread; + + unsigned long const hardware_threads= + std::thread::hardware_concurrency(); + + unsigned long const num_threads= + std::min(hardware_threads!=0?hardware_threads:2,max_threads); + + unsigned long const block_size=length/num_threads; + + typedef typename Iterator::value_type value_type; + + std::vector threads(num_threads-1); + std::vector > + end_values(num_threads-1); + std::vector > + previous_end_values; + previous_end_values.reserve(num_threads-1); + join_threads joiner(threads); + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_last=block_start; + std::advance(block_last,block_size-1); + threads[i]=std::thread(process_chunk(), + block_start,block_last, + (i!=0)?&previous_end_values[i-1]:0, + &end_values[i]); + block_start=block_last; + ++block_start; + previous_end_values.push_back(end_values[i].get_future()); + } + Iterator final_element=block_start; + std::advance(final_element,std::distance(block_start,last)-1); + process_chunk()(block_start,final_element, + (num_threads>1)?&previous_end_values.back():0, + 0); +} diff --git a/listing_8.12.cpp b/listing_8.12.cpp new file mode 100644 index 0000000..45b2d00 --- /dev/null +++ b/listing_8.12.cpp @@ -0,0 +1,24 @@ +class barrier +{ + unsigned const count; + std::atomic spaces; + std::atomic generation; +public: + explicit barrier(unsigned count_): + count(count_),spaces(count),generation(0) + {} + void wait() + { + unsigned const my_generation=generation; + if(!--spaces) + { + spaces=count; + ++generation; + } + else + { + while(generation==my_generation) + std::this_thread::yield(); + } + } +}; diff --git a/listing_8.13.cpp b/listing_8.13.cpp new file mode 100644 index 0000000..7e6b366 --- /dev/null +++ b/listing_8.13.cpp @@ -0,0 +1,98 @@ +#include +#include +#include + +struct join_threads +{ + join_threads(std::vector&) + {} +}; + + +struct barrier +{ + std::atomic count; + std::atomic spaces; + std::atomic generation; + barrier(unsigned count_): + count(count_),spaces(count_),generation(0) + {} + void wait() + { + unsigned const gen=generation.load(); + if(!--spaces) + { + spaces=count.load(); + ++generation; + } + else + { + while(generation.load()==gen) + { + std::this_thread::yield(); + } + } + } + + void done_waiting() + { + --count; + if(!--spaces) + { + spaces=count.load(); + ++generation; + } + } +}; + +template +void parallel_partial_sum(Iterator first,Iterator last) +{ + typedef typename Iterator::value_type value_type; + + struct process_element + { + void operator()(Iterator first,Iterator last, + std::vector& buffer, + unsigned i,barrier& b) + { + value_type& ith_element=*(first+i); + bool update_source=false; + for(unsigned step=0,stride=1;stride<=i;++step,stride*=2) + { + value_type const& source=(step%2)? + buffer[i]:ith_element; + value_type& dest=(step%2)? + ith_element:buffer[i]; + value_type const& addend=(step%2)? + buffer[i-stride]:*(first+i-stride); + dest=source+addend; + update_source=!(step%2); + b.wait(); + } + if(update_source) + { + ith_element=buffer[i]; + } + b.done_waiting(); + } + }; + + unsigned long const length=std::distance(first,last); + + if(length<=1) + return; + + std::vector buffer(length); + barrier b(length); + std::vector threads(length-1); + join_threads joiner(threads); + + Iterator block_start=first; + for(unsigned long i=0;i<(length-1);++i) + { + threads[i]=std::thread(process_element(),first,last, + std::ref(buffer),i,std::ref(b)); + } + process_element()(first,last,buffer,length-1,b); +} diff --git a/listing_8.2.cpp b/listing_8.2.cpp new file mode 100644 index 0000000..559f1ad --- /dev/null +++ b/listing_8.2.cpp @@ -0,0 +1,49 @@ +template +struct accumulate_block +{ + void operator()(Iterator first,Iterator last,T& result) + { + result=std::accumulate(first,last,result); + } +}; + +template +T parallel_accumulate(Iterator first,Iterator last,T init) +{ + unsigned long const length=std::distance(first,last); + + if(!length) + return init; + + unsigned long const min_per_thread=25; + unsigned long const max_threads= + (length+min_per_thread-1)/min_per_thread; + + unsigned long const hardware_threads= + std::thread::hardware_concurrency(); + + unsigned long const num_threads= + std::min(hardware_threads!=0?hardware_threads:2,max_threads); + + unsigned long const block_size=length/num_threads; + + std::vector results(num_threads); + std::vector threads(num_threads-1); + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_end=block_start; + std::advance(block_end,block_size); + threads[i]=std::thread( + accumulate_block(), + block_start,block_end,std::ref(results[i])); + block_start=block_end; + } + accumulate_block()(block_start,last,results[num_threads-1]); + + std::for_each(threads.begin(),threads.end(), + std::mem_fn(&std::thread::join)); + + return std::accumulate(results.begin(),results.end(),init); +} diff --git a/listing_8.3.cpp b/listing_8.3.cpp new file mode 100644 index 0000000..30e0386 --- /dev/null +++ b/listing_8.3.cpp @@ -0,0 +1,56 @@ +template +struct accumulate_block +{ + T operator()(Iterator first,Iterator last) + { + return std::accumulate(first,last,T()); + } +}; + +template +T parallel_accumulate(Iterator first,Iterator last,T init) +{ + unsigned long const length=std::distance(first,last); + + if(!length) + return init; + + unsigned long const min_per_thread=25; + unsigned long const max_threads= + (length+min_per_thread-1)/min_per_thread; + + unsigned long const hardware_threads= + std::thread::hardware_concurrency(); + + unsigned long const num_threads= + std::min(hardware_threads!=0?hardware_threads:2,max_threads); + + unsigned long const block_size=length/num_threads; + + std::vector > futures(num_threads-1); + std::vector threads(num_threads-1); + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_end=block_start; + std::advance(block_end,block_size); + std::packaged_task task( + accumulate_block()); + futures[i]=task.get_future(); + threads[i]=std::thread(std::move(task),block_start,block_end); + block_start=block_end; + } + T last_result=accumulate_block()(block_start,last); + + std::for_each(threads.begin(),threads.end(), + std::mem_fn(&std::thread::join)); + + T result=init; + for(unsigned long i=0;i<(num_threads-1);++i) + { + result+=futures[i].get(); + } + result += last_result; + return result; +} diff --git a/listing_8.4.cpp b/listing_8.4.cpp new file mode 100644 index 0000000..852af26 --- /dev/null +++ b/listing_8.4.cpp @@ -0,0 +1,44 @@ +template +T parallel_accumulate(Iterator first,Iterator last,T init) +{ + unsigned long const length=std::distance(first,last); + + if(!length) + return init; + + unsigned long const min_per_thread=25; + unsigned long const max_threads= + (length+min_per_thread-1)/min_per_thread; + + unsigned long const hardware_threads= + std::thread::hardware_concurrency(); + + unsigned long const num_threads= + std::min(hardware_threads!=0?hardware_threads:2,max_threads); + + unsigned long const block_size=length/num_threads; + + std::vector > futures(num_threads-1); + std::vector threads(num_threads-1); + join_threads joiner(threads); + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_end=block_start; + std::advance(block_end,block_size); + std::packaged_task task( + accumulate_block()); + futures[i]=task.get_future(); + threads[i]=std::thread(std::move(task),block_start,block_end); + block_start=block_end; + } + T last_result=accumulate_block()(block_start,last); + T result=init; + for(unsigned long i=0;i<(num_threads-1);++i) + { + result+=futures[i].get(); + } + result += last_result; + return result; +} diff --git a/listing_8.5.cpp b/listing_8.5.cpp new file mode 100644 index 0000000..a5248a5 --- /dev/null +++ b/listing_8.5.cpp @@ -0,0 +1,20 @@ +template +T parallel_accumulate(Iterator first,Iterator last,T init) +{ + unsigned long const length=std::distance(first,last); + unsigned long const max_chunk_size=25; + if(length<=max_chunk_size) + { + return std::accumulate(first,last,init); + } + else + { + Iterator mid_point=first; + std::advance(mid_point,length/2); + std::future first_half_result= + std::async(parallel_accumulate, + first,mid_point,init); + T second_half_result=parallel_accumulate(mid_point,last,T()); + return first_half_result.get()+second_half_result; + } +} diff --git a/listing_8.6.cpp b/listing_8.6.cpp new file mode 100644 index 0000000..09751ef --- /dev/null +++ b/listing_8.6.cpp @@ -0,0 +1,50 @@ +std::thread task_thread; +std::atomic task_cancelled(false); + +void gui_thread() +{ + while(true) + { + event_data event=get_event(); + if(event.type==quit) + break; + process(event); + } +} + +void task() +{ + while(!task_complete() && !task_cancelled) + { + do_next_operation(); + } + if(task_cancelled) + { + perform_cleanup(); + } + else + { + post_gui_event(task_complete); + } +} + +void process(event_data const& event) +{ + switch(event.type) + { + case start_task: + task_cancelled=false; + task_thread=std::thread(task); + break; + case stop_task: + task_cancelled=true; + task_thread.join(); + break; + case task_complete: + task_thread.join(); + display_results(); + break; + default: + //... + } +} diff --git a/listing_8.7.cpp b/listing_8.7.cpp new file mode 100644 index 0000000..0a1a916 --- /dev/null +++ b/listing_8.7.cpp @@ -0,0 +1,44 @@ +template +void parallel_for_each(Iterator first,Iterator last,Func f) +{ + unsigned long const length=std::distance(first,last); + + if(!length) + return; + + unsigned long const min_per_thread=25; + unsigned long const max_threads= + (length+min_per_thread-1)/min_per_thread; + + unsigned long const hardware_threads= + std::thread::hardware_concurrency(); + + unsigned long const num_threads= + std::min(hardware_threads!=0?hardware_threads:2,max_threads); + + unsigned long const block_size=length/num_threads; + + std::vector > futures(num_threads-1); + std::vector threads(num_threads-1); + join_threads joiner(threads); + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_end=block_start; + std::advance(block_end,block_size); + std::packaged_task task( + [=]() + { + std::for_each(block_start,block_end,f); + }); + futures[i]=task.get_future(); + threads[i]=std::thread(std::move(task)); + block_start=block_end; + } + std::for_each(block_start,last,f); + for(unsigned long i=0;i<(num_threads-1);++i) + { + futures[i].get(); + } +} diff --git a/listing_8.8.cpp b/listing_8.8.cpp new file mode 100644 index 0000000..f8ff7b0 --- /dev/null +++ b/listing_8.8.cpp @@ -0,0 +1,24 @@ +template +void parallel_for_each(Iterator first,Iterator last,Func f) +{ + unsigned long const length=std::distance(first,last); + + if(!length) + return; + + unsigned long const min_per_thread=25; + + if(length<(2*min_per_thread)) + { + std::for_each(first,last,f); + } + else + { + Iterator const mid_point=first+length/2; + std::future first_half= + std::async(¶llel_for_each, + first,mid_point,f); + parallel_for_each(mid_point,last,f); + first_half.get(); + } +} diff --git a/listing_8.9.cpp b/listing_8.9.cpp new file mode 100644 index 0000000..1750296 --- /dev/null +++ b/listing_8.9.cpp @@ -0,0 +1,76 @@ +template +Iterator parallel_find(Iterator first,Iterator last,MatchType match) +{ + struct find_element + { + void operator()(Iterator begin,Iterator end, + MatchType match, + std::promise* result, + std::atomic* done_flag) + { + try + { + for(;(begin!=end) && !done_flag->load();++begin) + { + if(*begin==match) + { + result->set_value(begin); + done_flag->store(true); + return; + } + } + } + catch(...) + { + try + { + result->set_exception(std::current_exception()); + done_flag->store(true); + } + catch(...) + {} + } + } + }; + + unsigned long const length=std::distance(first,last); + + if(!length) + return last; + + unsigned long const min_per_thread=25; + unsigned long const max_threads= + (length+min_per_thread-1)/min_per_thread; + + unsigned long const hardware_threads= + std::thread::hardware_concurrency(); + + unsigned long const num_threads= + std::min(hardware_threads!=0?hardware_threads:2,max_threads); + + unsigned long const block_size=length/num_threads; + + std::promise result; + std::atomic done_flag(false); + std::vector threads(num_threads-1); + { + join_threads joiner(threads); + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_end=block_start; + std::advance(block_end,block_size); + threads[i]=std::thread(find_element(), + block_start,block_end,match, + &result,&done_flag); + block_start=block_end; + } + find_element()(block_start,last,match,&result,&done_flag); + } + if(!done_flag.load()) + { + return last; + } + return result.get_future().get(); +} diff --git a/listing_9.1.cpp b/listing_9.1.cpp new file mode 100644 index 0000000..dd3ca84 --- /dev/null +++ b/listing_9.1.cpp @@ -0,0 +1,53 @@ +class thread_pool +{ + std::atomic_bool done; + thread_safe_queue > work_queue; + std::vector threads; + join_threads joiner; + + void worker_thread() + { + while(!done) + { + std::function task; + if(work_queue.try_pop(task)) + { + task(); + } + else + { + std::this_thread::yield(); + } + } + } +public: + thread_pool(): + done(false),joiner(threads) + { + unsigned const thread_count=std::thread::hardware_concurrency(); + try + { + for(unsigned i=0;i + void submit(FunctionType f) + { + work_queue.push(std::function(f)); + } +}; diff --git a/listing_9.10.cpp b/listing_9.10.cpp new file mode 100644 index 0000000..f9641ad --- /dev/null +++ b/listing_9.10.cpp @@ -0,0 +1,9 @@ +void interruptible_wait(std::condition_variable& cv, + std::unique_lock& lk) +{ + interruption_point(); + this_thread_interrupt_flag.set_condition_variable(cv); + cv.wait(lk); + this_thread_interrupt_flag.clear_condition_variable(); + interruption_point(); +} diff --git a/listing_9.11.cpp b/listing_9.11.cpp new file mode 100644 index 0000000..ea05a8a --- /dev/null +++ b/listing_9.11.cpp @@ -0,0 +1,57 @@ +class interrupt_flag +{ + std::atomic flag; + std::condition_variable* thread_cond; + std::mutex set_clear_mutex; + +public: + interrupt_flag(): + thread_cond(0) + {} + + void set() + { + flag.store(true,std::memory_order_relaxed); + std::lock_guard lk(set_clear_mutex); + if(thread_cond) + { + thread_cond->notify_all(); + } + } + + bool is_set() const + { + return flag.load(std::memory_order_relaxed); + } + + void set_condition_variable(std::condition_variable& cv) + { + std::lock_guard lk(set_clear_mutex); + thread_cond=&cv; + } + + void clear_condition_variable() + { + std::lock_guard lk(set_clear_mutex); + thread_cond=0; + } + + struct clear_cv_on_destruct + { + ~clear_cv_on_destruct() + { + this_thread_interrupt_flag.clear_condition_variable(); + } + }; +}; + +void interruptible_wait(std::condition_variable& cv, + std::unique_lock& lk) +{ + interruption_point(); + this_thread_interrupt_flag.set_condition_variable(cv); + interrupt_flag::clear_cv_on_destruct guard; + interruption_point(); + cv.wait_for(lk,std::chrono::milliseconds(1)); + interruption_point(); +} diff --git a/listing_9.12.cpp b/listing_9.12.cpp new file mode 100644 index 0000000..29101ce --- /dev/null +++ b/listing_9.12.cpp @@ -0,0 +1,77 @@ +class interrupt_flag +{ + std::atomic flag; + std::condition_variable* thread_cond; + std::condition_variable_any* thread_cond_any; + std::mutex set_clear_mutex; + +public: + interrupt_flag(): + Interrupting threads + thread_cond(0),thread_cond_any(0) + {} + + void set() + { + flag.store(true,std::memory_order_relaxed); + std::lock_guard lk(set_clear_mutex); + if(thread_cond) + { + thread_cond->notify_all(); + } + else if(thread_cond_any) + { + thread_cond_any->notify_all(); + } + } + + template + void wait(std::condition_variable_any& cv,Lockable& lk) + { + struct custom_lock + { + interrupt_flag* self; + Lockable& lk; + + custom_lock(interrupt_flag* self_, + std::condition_variable_any& cond, + Lockable& lk_): + self(self_),lk(lk_) + { + self->set_clear_mutex.lock(); + self->thread_cond_any=&cond; + } + + void unlock() + { + lk.unlock(); + self->set_clear_mutex.unlock(); + } + + void lock() + { + std::lock(self->set_clear_mutex,lk); + } + + ~custom_lock() + { + self->thread_cond_any=0; + self->set_clear_mutex.unlock(); + } + }; + + custom_lock cl(this,cv,lk); + interruption_point(); + cv.wait(cl); + interruption_point(); + } + + // rest as before +}; + +template +void interruptible_wait(std::condition_variable_any& cv, + Lockable& lk) +{ + this_thread_interrupt_flag.wait(cv,lk); +} diff --git a/listing_9.13.cpp b/listing_9.13.cpp new file mode 100644 index 0000000..5ec1da0 --- /dev/null +++ b/listing_9.13.cpp @@ -0,0 +1,38 @@ +std::mutex config_mutex; +std::vector background_threads; + +void background_thread(int disk_id) +{ + while(true) + { + interruption_point(); + fs_change fsc=get_fs_changes(disk_id); + if(fsc.has_changes()) + { + update_index(fsc); + } + } +} + +void start_background_processing() +{ + background_threads.push_back( + interruptible_thread(background_thread,disk_1)); + background_threads.push_back( + interruptible_thread(background_thread,disk_2)); +} + +int main() +{ + start_background_processing(); + process_gui_until_exit(); + std::unique_lock lk(config_mutex); + for(unsigned i=0;i +#include +#include +#include +#include +#include + +class function_wrapper +{ + struct impl_base { + virtual void call()=0; + virtual ~impl_base() {} + }; + std::unique_ptr impl; + template + struct impl_type: impl_base + { + F f; + impl_type(F&& f_): f(std::move(f_)) {} + void call() { f(); } + }; +public: + template + function_wrapper(F&& f): + impl(new impl_type(std::move(f))) + {} + + void call() { impl->call(); } + + function_wrapper(function_wrapper&& other): + impl(std::move(other.impl)) + {} + + function_wrapper& operator=(function_wrapper&& other) + { + impl=std::move(other.impl); + return *this; + } + + function_wrapper(const function_wrapper&)=delete; + function_wrapper(function_wrapper&)=delete; + function_wrapper& operator=(const function_wrapper&)=delete; +}; + +class thread_pool +{ +public: + std::deque work_queue; + + template + std::future::type> + submit(FunctionType f) + { + typedef typename std::result_of::type result_type; + + std::packaged_task task(std::move(f)); + std::future res(task.get_future()); + work_queue.push_back(std::move(task)); + return res; + } + // rest as before +}; diff --git a/listing_9.3.cpp b/listing_9.3.cpp new file mode 100644 index 0000000..9a16073 --- /dev/null +++ b/listing_9.3.cpp @@ -0,0 +1,31 @@ +template +T parallel_accumulate(Iterator first,Iterator last,T init) +{ + unsigned long const length=std::distance(first,last); + + if(!length) + return init; + + unsigned long const block_size=25; + unsigned long const num_blocks=(length+block_size-1)/block_size; + + std::vector > futures(num_blocks-1); + thread_pool pool; + + Iterator block_start=first; + for(unsigned long i=0;i<(num_threads-1);++i) + { + Iterator block_end=block_start; + std::advance(block_end,block_size); + futures[i]=pool.submit(accumulate_block()); + block_start=block_end; + } + T last_result=accumulate_block()(block_start,last); + T result=init; + for(unsigned long i=0;i<(num_blocks-1);++i) + { + result+=futures[i].get(); + } + result += last_result; + return result; +} diff --git a/listing_9.4.cpp b/listing_9.4.cpp new file mode 100644 index 0000000..9d1beea --- /dev/null +++ b/listing_9.4.cpp @@ -0,0 +1,12 @@ +void thread_pool::run_pending_task() +{ + function_wrapper task; + if(work_queue.try_pop(task)) + { + task(); + } + else + { + std::this_thread::yield(); + } +} diff --git a/listing_9.5.cpp b/listing_9.5.cpp new file mode 100644 index 0000000..83b83b4 --- /dev/null +++ b/listing_9.5.cpp @@ -0,0 +1,58 @@ +template +struct sorter +{ + thread_pool pool; + + std::list do_sort(std::list& chunk_data) + { + if(chunk_data.empty()) + { + return chunk_data; + } + + std::list result; + result.splice(result.begin(),chunk_data,chunk_data.begin()); + T const& partition_val=*result.begin(); + + typename std::list::iterator divide_point= + std::partition( + chunk_data.begin(),chunk_data.end(), + [&](T const& val){return val new_lower_chunk; + new_lower_chunk.splice( + new_lower_chunk.end(), + chunk_data,chunk_data.begin(), + divide_point); + + thread_pool::task_handle > new_lower= + pool.submit( + std::bind( + &sorter::do_sort,this, + std::move(new_lower_chunk))); + + std::list new_higher(do_sort(chunk_data)); + + result.splice(result.end(),new_higher); + while(!new_lower.is_ready()) + { + pool.run_pending_task(); + } + + result.splice(result.begin(),new_lower.get()); + return result; + } +}; + + +template +std::list parallel_quick_sort(std::list input) +{ + if(input.empty()) + { + return input; + } + sorter s; + + return s.do_sort(input); +} diff --git a/listing_9.6.cpp b/listing_9.6.cpp new file mode 100644 index 0000000..840382d --- /dev/null +++ b/listing_9.6.cpp @@ -0,0 +1,58 @@ +class thread_pool +{ + thread_safe_queue pool_work_queue; + + typedef std::queue local_queue_type; + static thread_local std::unique_ptr + local_work_queue; + + void worker_thread() + { + local_work_queue.reset(new local_queue_type); + + while(!done) + { + run_pending_task(); + } + } + +public: + template + std::future::type> + submit(FunctionType f) + { + typedef std::result_of::type result_type; + + std::packaged_task task(f); + std::future res(task.get_future()); + if(local_work_queue) + { + local_work_queue->push(std::move(task)); + } + else + { + pool_work_queue.push(std::move(task)); + } + return res; + } + + void run_pending_task() + { + function_wrapper task; + if(local_work_queue && !local_work_queue->empty()) + { + task=std::move(local_work_queue->front()); + local_work_queue->pop(); + task(); + } + else if(pool_work_queue.try_pop(task)) + { + task(); + } + else + { + std::this_thread::yield(); + } + } + // rest as before +}; diff --git a/listing_9.7.cpp b/listing_9.7.cpp new file mode 100644 index 0000000..2ebdb29 --- /dev/null +++ b/listing_9.7.cpp @@ -0,0 +1,53 @@ +class work_stealing_queue +{ +private: + typedef function_wrapper data_type; + std::deque the_queue; + mutable std::mutex the_mutex; + +public: + work_stealing_queue() + {} + + work_stealing_queue(const work_stealing_queue& other)=delete; + work_stealing_queue& operator=( + const work_stealing_queue& other)=delete; + + void push(data_type data) + { + std::lock_guard lock(the_mutex); + the_queue.push_front(std::move(data)); + } + + bool empty() const + { + std::lock_guard lock(the_mutex); + return the_queue.empty(); + } + + bool try_pop(data_type& res) + { + std::lock_guard lock(the_mutex); + if(the_queue.empty()) + { + return false; + } + + res=std::move(the_queue.front()); + the_queue.pop_front(); + return true; + } + + bool try_steal(data_type& res) + { + std::lock_guard lock(the_mutex); + if(the_queue.empty()) + { + return false; + } + + res=std::move(the_queue.back()); + the_queue.pop_back(); + return true; + } +}; diff --git a/listing_9.8.cpp b/listing_9.8.cpp new file mode 100644 index 0000000..f2bbf09 --- /dev/null +++ b/listing_9.8.cpp @@ -0,0 +1,112 @@ +class thread_pool +{ + typedef function_wrapper task_type; + + std::atomic_bool done; + thread_safe_queue pool_work_queue; + std::vector > queues; + std::vector threads; + join_threads joiner; + + static thread_local work_stealing_queue* local_work_queue; + static thread_local unsigned my_index; + + void worker_thread(unsigned my_index_) + { + my_index=my_index_; + local_work_queue=queues[my_index].get(); + while(!done) + { + run_pending_task(); + } + } + + bool pop_task_from_local_queue(task_type& task) + { + return local_work_queue && local_work_queue->try_pop(task); + } + + bool pop_task_from_pool_queue(task_type& task) + { + return pool_work_queue.try_pop(task); + } + + bool pop_task_from_other_thread_queue(task_type& task) + { + for(unsigned i=0;itry_steal(task)) + { + return true; + } + } + + return false; + } + +public: + thread_pool(): + joiner(threads),done(false) + { + unsigned const thread_count=std::thread::hardware_concurrency(); + + try + { + for(unsigned i=0;i( + new work_stealing_queue)); + threads.push_back( + std::thread(&thread_pool::worker_thread,this,i)); + } + } + catch(...) + { + done=true; + throw; + } + } + + ~thread_pool() + { + done=true; + } + + template + using task_handle=std::unique_future; + + template + task_handle::type> submit( + FunctionType f) + { + typedef std::result_of::type result_type; + + std::packaged_task task(f); + task_handle res(task.get_future()); + if(local_work_queue) + { + local_work_queue->push(std::move(task)); + } + else + { + pool_work_queue.push(std::move(task)); + } + return res; + } + + void run_pending_task() + { + task_type task; + if(pop_task_from_local_queue(task) || + pop_task_from_pool_queue(task) || + pop_task_from_other_thread_queue(task)) + { + task(); + } + else + { + std::this_thread::yield(); + } + } +}; diff --git a/listing_9.9.cpp b/listing_9.9.cpp new file mode 100644 index 0000000..ffd7835 --- /dev/null +++ b/listing_9.9.cpp @@ -0,0 +1,31 @@ +class interrupt_flag +{ +public: + void set(); + bool is_set() const; +}; +thread_local interrupt_flag this_thread_interrupt_flag; + +class interruptible_thread +{ + std::thread internal_thread; + interrupt_flag* flag; +public: + template + interruptible_thread(FunctionType f) + { + std::promise p; + internal_thread=std::thread([f,&p]{ + p.set_value(&this_thread_interrupt_flag); + f(); + }); + flag=p.get_future().get(); + } + void interrupt() + { + if(flag) + { + flag->set(); + } + } +}; diff --git a/listing_a.1.cpp b/listing_a.1.cpp new file mode 100644 index 0000000..5a5956e --- /dev/null +++ b/listing_a.1.cpp @@ -0,0 +1,23 @@ +class X +{ +private: + int* data; +public: + X(): + data(new int[1000000]) + {} + ~X() + { + delete [] data; + } + X(const X& other): + data(new int[1000000]) + { + std::copy(other.data,other.data+1000000,data); + } + X(X&& other): + data(other.data) + { + other.data=nullptr; + } +}; diff --git a/listing_a.2.cpp b/listing_a.2.cpp new file mode 100644 index 0000000..647b826 --- /dev/null +++ b/listing_a.2.cpp @@ -0,0 +1,18 @@ +class move_only +{ + std::unique_ptr data; +public: + move_only(const move_only&) = delete; + move_only(move_only&& other): + data(std::move(other.data)) + {} + move_only& operator=(const move_only&) = delete; + move_only& operator=(move_only&& other) + { + data=std::move(other.data); + return *this; + } +}; +move_only m1; +move_only m2(m1); +move_only m3(std::move(m1)); diff --git a/listing_a.3.cpp b/listing_a.3.cpp new file mode 100644 index 0000000..fd85c29 --- /dev/null +++ b/listing_a.3.cpp @@ -0,0 +1,23 @@ +class CX +{ +private: + int a; + int b; +public: + CX() = default; + CX(int a_, int b_): + a(a_),b(b_) + {} + int get_a() const + { + return a; + } + int get_b() const + { + return b; + } + int foo() const + { + return a+b; + } +}; diff --git a/listing_a.4.cpp b/listing_a.4.cpp new file mode 100644 index 0000000..24a43f9 --- /dev/null +++ b/listing_a.4.cpp @@ -0,0 +1,8 @@ +std::condition_variable cond; +bool data_ready; +std::mutex m; +void wait_for_data() +{ + std::unique_lock lk(m); + cond.wait(lk,[]{return data_ready;}); +} diff --git a/listing_c.1.cpp b/listing_c.1.cpp new file mode 100644 index 0000000..bb66f44 --- /dev/null +++ b/listing_c.1.cpp @@ -0,0 +1,50 @@ +#include +#include +#include +#include + + + + +namespace messaging +{ + struct message_base + { + virtual ~message_base() + {} + }; + + template + struct wrapped_message: + message_base + { + Msg contents; + explicit wrapped_message(Msg const& contents_): + contents(contents_) + {} + }; + + class queue + { + std::mutex m; + std::condition_variable c; + message_base + std::queue > q; + public: + template + void push(T const& msg) + { + std::lock_guard lk(m); + q.push(std::make_shared >(msg)); + c.notify_all(); + } + std::shared_ptr wait_and_pop() + { + std::unique_lock lk(m); + c.wait(lk,[&]{return !q.empty();}); + auto res=q.front(); + q.pop(); + return res; + } + }; +} diff --git a/listing_c.10.cpp b/listing_c.10.cpp new file mode 100644 index 0000000..3a3af51 --- /dev/null +++ b/listing_c.10.cpp @@ -0,0 +1,51 @@ +int main() +{ + bank_machine bank; + interface_machine interface_hardware; + atm machine(bank.get_sender(),interface_hardware.get_sender()); + std::thread bank_thread(&bank_machine::run,&bank); + std::thread if_thread(&interface_machine::run,&interface_hardware); + std::thread atm_thread(&atm::run,&machine); + messaging::sender atmqueue(machine.get_sender()); + bool quit_pressed=false; + while(!quit_pressed) + { + char c=getchar(); + switch(c) + { + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + atmqueue.send(digit_pressed(c)); + break; + case 'b': + atmqueue.send(balance_pressed()); + break; + case 'w': + atmqueue.send(withdraw_pressed(50)); + break; + case 'c': + atmqueue.send(cancel_pressed()); + break; + case 'q': + quit_pressed=true; + break; + case 'i': + atmqueue.send(card_inserted("acc1234")); + break; + } + } + bank.done(); + machine.done(); + interface_hardware.done(); + atm_thread.join(); + bank_thread.join(); + if_thread.join(); +} diff --git a/listing_c.2.cpp b/listing_c.2.cpp new file mode 100644 index 0000000..822b509 --- /dev/null +++ b/listing_c.2.cpp @@ -0,0 +1,22 @@ +namespace messaging +{ + class sender + { + queue*q; + public: + sender(): + q(nullptr) + {} + explicit sender(queue*q_): + q(q_) + {} + template + void send(Message const& msg) + { + if(q) + { + q->push(msg); + } + } + }; +} diff --git a/listing_c.3.cpp b/listing_c.3.cpp new file mode 100644 index 0000000..6396f50 --- /dev/null +++ b/listing_c.3.cpp @@ -0,0 +1,16 @@ +namespace messaging +{ + class receiver + { + queue q; + public: + operator sender() + { + return sender(&q); + } + dispatcher wait() + { + return dispatcher(&q); + } + }; +} diff --git a/listing_c.4.cpp b/listing_c.4.cpp new file mode 100644 index 0000000..2ca84a1 --- /dev/null +++ b/listing_c.4.cpp @@ -0,0 +1,65 @@ +namespace messaging +{ + class close_queue + {}; + + class dispatcher + { + queue* q; + bool chained; + + dispatcher(dispatcher const&)=delete; + dispatcher& operator=(dispatcher const&)=delete; + + template< + typename Dispatcher, + typename Msg, + typename Func> + friend class TemplateDispatcher; + + void wait_and_dispatch() + { + for(;;) + { + auto msg=q->wait_and_pop(); + dispatch(msg); + } + } + + bool dispatch( + std::shared_ptr const& msg) + { + if(dynamic_cast*>(msg.get())) + { + throw close_queue(); + } + return false; + } + public: + dispatcher(dispatcher&& other): + q(other.q),chained(other.chained) + { + other.chained=true; + } + + explicit dispatcher(queue* q_): + q(q_),chained(false) + {} + + template + TemplateDispatcher + handle(Func&& f) + { + return TemplateDispatcher( + q,this,std::forward(f)); + } + + ~dispatcher() noexcept(false) + { + if(!chained) + { + wait_and_dispatch(); + } + } + }; +} diff --git a/listing_c.5.cpp b/listing_c.5.cpp new file mode 100644 index 0000000..41fa481 --- /dev/null +++ b/listing_c.5.cpp @@ -0,0 +1,71 @@ +namespace messaging +{ + template + class TemplateDispatcher + { + queue* q; + PreviousDispatcher* prev; + Func f; + bool chained; + + TemplateDispatcher(TemplateDispatcher const&)=delete; + TemplateDispatcher& operator=(TemplateDispatcher const&)=delete; + + template + friend class TemplateDispatcher; + + void wait_and_dispatch() + { + for(;;) + { + auto msg=q->wait_and_pop(); + if(dispatch(msg)) + break; + } + } + + bool dispatch(std::shared_ptr const& msg) + { + if(wrapped_message* wrapper= + dynamic_cast*>(msg.get())) + { + f(wrapper->contents); + return true; + } + else + { + return prev->dispatch(msg); + } + } + public: + TemplateDispatcher(TemplateDispatcher&& other): + q(other.q),prev(other.prev),f(std::move(other.f)), + chained(other.chained) + { + other.chained=true; + } + + TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_): + q(q_),prev(prev_),f(std::forward(f_)),chained(false) + { + prev_->chained=true; + } + + template + TemplateDispatcher + handle(OtherFunc&& of) + { + return TemplateDispatcher< + TemplateDispatcher,OtherMsg,OtherFunc>( + q,this,std::forward(of)); + } + + ~TemplateDispatcher() noexcept(false) + { + if(!chained) + { + wait_and_dispatch(); + } + } + }; +} diff --git a/listing_c.6.cpp b/listing_c.6.cpp new file mode 100644 index 0000000..14d05fa --- /dev/null +++ b/listing_c.6.cpp @@ -0,0 +1,118 @@ +struct withdraw +{ + std::string account; + unsigned amount; + mutable messaging::sender atm_queue; + withdraw(std::string const& account_, + unsigned amount_, + messaging::sender atm_queue_): + account(account_),amount(amount_), + atm_queue(atm_queue_) + {} +}; +struct withdraw_ok +{}; +struct withdraw_denied +{}; +struct cancel_withdrawal +{ + std::string account; + unsigned amount; + cancel_withdrawal(std::string const& account_, + unsigned amount_): + account(account_),amount(amount_) + {} +}; +struct withdrawal_processed +{ + std::string account; + unsigned amount; + withdrawal_processed(std::string const& account_, + unsigned amount_): + account(account_),amount(amount_) + {} +}; +struct card_inserted +{ + std::string account; + explicit card_inserted(std::string const& account_): + account(account_) + {} +}; +struct digit_pressed +{ + char digit; + explicit digit_pressed(char digit_): + digit(digit_) + {} +}; +struct clear_last_pressed +{}; +struct eject_card +{}; +struct withdraw_pressed +{ + unsigned amount; + explicit withdraw_pressed(unsigned amount_): + amount(amount_) + {} +}; +struct cancel_pressed +{}; +struct issue_money +{ + unsigned amount; + issue_money(unsigned amount_): + amount(amount_) + {} +}; +struct verify_pin +{ + std::string account; + std::string pin; + mutable messaging::sender atm_queue; + verify_pin(std::string const& account_,std::string const& pin_, + messaging::sender atm_queue_): + account(account_),pin(pin_),atm_queue(atm_queue_) + {} +}; +struct pin_verified +{}; +struct pin_incorrect +{}; +struct display_enter_pin +{}; +struct display_enter_card +{}; +struct display_insufficient_funds +{}; +struct display_withdrawal_cancelled +{}; +struct display_pin_incorrect_message +{}; +struct display_withdrawal_options +{}; +struct get_balance +{ + std::string account; + mutable messaging::sender atm_queue; + get_balance(std::string const& account_,messaging::sender atm_queue_): + account(account_),atm_queue(atm_queue_) + {} +}; +struct balance +{ + unsigned amount; + explicit balance(unsigned amount_): + amount(amount_) + {} +}; +struct display_balance +{ + unsigned amount; + explicit display_balance(unsigned amount_): + amount(amount_) + {} +}; +struct balance_pressed +{}; diff --git a/listing_c.7.cpp b/listing_c.7.cpp new file mode 100644 index 0000000..6ce0508 --- /dev/null +++ b/listing_c.7.cpp @@ -0,0 +1,187 @@ +class atm +{ + messaging::receiver incoming; + messaging::sender bank; + messaging::sender interface_hardware; + void (atm::*state)(); + std::string account; + unsigned withdrawal_amount; + std::string pin; + void process_withdrawal() + { + incoming.wait() + .handle( + [&](withdraw_ok const& msg) + { + interface_hardware.send( + issue_money(withdrawal_amount)); + bank.send( + withdrawal_processed(account,withdrawal_amount)); + state=&atm::done_processing; + } + ) + .handle( + [&](withdraw_denied const& msg) + { + interface_hardware.send(display_insufficient_funds()); + state=&atm::done_processing; + } + ) + .handle( + [&](cancel_pressed const& msg) + { + bank.send( + cancel_withdrawal(account,withdrawal_amount)); + interface_hardware.send( + display_withdrawal_cancelled()); + state=&atm::done_processing; + } + ); + } + void process_balance() + { + incoming.wait() + .handle( + [&](balance const& msg) + { + interface_hardware.send(display_balance(msg.amount)); + state=&atm::wait_for_action; + } + ) + .handle( + [&](cancel_pressed const& msg) + { + state=&atm::done_processing; + } + ); + } + void wait_for_action() + { + interface_hardware.send(display_withdrawal_options()); + incoming.wait() + .handle( + [&](withdraw_pressed const& msg) + { + withdrawal_amount=msg.amount; + bank.send(withdraw(account,msg.amount,incoming)); + state=&atm::process_withdrawal; + } + ) + .handle( + [&](balance_pressed const& msg) + { + bank.send(get_balance(account,incoming)); + state=&atm::process_balance; + } + ) + .handle( + [&](cancel_pressed const& msg) + { + state=&atm::done_processing; + } + ); + } + void verifying_pin() + { + incoming.wait() + .handle( + [&](pin_verified const& msg) + { + state=&atm::wait_for_action; + } + ) + .handle( + [&](pin_incorrect const& msg) + { + interface_hardware.send( + display_pin_incorrect_message()); + state=&atm::done_processing; + } + ) + .handle( + [&](cancel_pressed const& msg) + { + state=&atm::done_processing; + } + ); + } + void getting_pin() + { + incoming.wait() + .handle( + [&](digit_pressed const& msg) + { + unsigned const pin_length=4; + pin+=msg.digit; + if(pin.length()==pin_length) + { + bank.send(verify_pin(account,pin,incoming)); + state=&atm::verifying_pin; + } + } + ) + .handle( + [&](clear_last_pressed const& msg) + { + if(!pin.empty()) + { + pin.pop_back(); + } + } + ) + .handle( + [&](cancel_pressed const& msg) + { + state=&atm::done_processing; + } + ); + } + void waiting_for_card() + { + interface_hardware.send(display_enter_card()); + incoming.wait() + .handle( + [&](card_inserted const& msg) + { + account=msg.account; + pin=""; + interface_hardware.send(display_enter_pin()); + state=&atm::getting_pin; + } + ); + } + void done_processing() + { + interface_hardware.send(eject_card()); + state=&atm::waiting_for_card; + } + atm(atm const&)=delete; + atm& operator=(atm const&)=delete; +public: + atm(messaging::sender bank_, + messaging::sender interface_hardware_): + bank(bank_),interface_hardware(interface_hardware_) + {} + void done() + { + get_sender().send(messaging::close_queue()); + } + void run() + { + state=&atm::waiting_for_card; + try + { + for(;;) + { + (this->*state)(); + } + } + catch(messaging::close_queue const&) + { + } + } + messaging::sender get_sender() + { + return incoming; + } +}; diff --git a/listing_c.8.cpp b/listing_c.8.cpp new file mode 100644 index 0000000..a8e5566 --- /dev/null +++ b/listing_c.8.cpp @@ -0,0 +1,73 @@ +class bank_machine +{ + messaging::receiver incoming; + unsigned balance; +public: + bank_machine(): + balance(199) + {} + void done() + { + get_sender().send(messaging::close_queue()); + } + void run() + { + try + { + for(;;) + { + incoming.wait() + .handle( + [&](verify_pin const& msg) + { + if(msg.pin=="1937") + { + msg.atm_queue.send(pin_verified()); + } + else + { + msg.atm_queue.send(pin_incorrect()); + } + } + ) + .handle( + [&](withdraw const& msg) + { + if(balance>=msg.amount) + { + msg.atm_queue.send(withdraw_ok()); + balance-=msg.amount; + } + else + { + msg.atm_queue.send(withdraw_denied()); + } + } + ) + .handle( + [&](get_balance const& msg) + { + msg.atm_queue.send(::balance(balance)); + } + ) + .handle( + [&](withdrawal_processed const& msg) + { + } + ) + .handle( + [&](cancel_withdrawal const& msg) + { + } + ); + } + } + catch(messaging::close_queue const&) + { + } + } + messaging::sender get_sender() + { + return incoming; + } +}; diff --git a/listing_c.9.cpp b/listing_c.9.cpp new file mode 100644 index 0000000..3d1a24a --- /dev/null +++ b/listing_c.9.cpp @@ -0,0 +1,117 @@ +class interface_machine +{ + messaging::receiver incoming; +public: + void done() + { + get_sender().send(messaging::close_queue()); + } + void run() + { + try + { + for(;;) + { + incoming.wait() + .handle( + [&](issue_money const& msg) + { + { + std::lock_guard lk(iom); + std::cout<<"Issuing " + <( + [&](display_insufficient_funds const& msg) + { + { + std::lock_guard lk(iom); + std::cout<<"Insufficient funds"<( + [&](display_enter_pin const& msg) + { + { + std::lock_guard lk(iom); + std::cout + <<"Please enter your PIN (0-9)" + <( + [&](display_enter_card const& msg) + { + { + std::lock_guard lk(iom); + std::cout<<"Please enter your card (I)" + <( + [&](display_balance const& msg) + { + { + std::lock_guard lk(iom); + std::cout + <<"The balance of your account is " + <( + [&](display_withdrawal_options const& msg) + { + { + std::lock_guard lk(iom); + std::cout<<"Withdraw 50? (w)"<( + [&](display_withdrawal_cancelled const& msg) + { + { + std::lock_guard lk(iom); + std::cout<<"Withdrawal cancelled" + <( + [&](display_pin_incorrect_message const& msg) + { + { + std::lock_guard lk(iom); + std::cout<<"PIN incorrect"<( + [&](eject_card const& msg) + { + { + std::lock_guard lk(iom); + std::cout<<"Ejecting card"<