Skip to content

Commit

Permalink
Improve Flow Graph conformance tests (#410)
Browse files Browse the repository at this point in the history
* continue and function node

* multifunction node

* input node

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* buffering nodes

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* new input and output messages

* service nodes

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* green CI

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* graph reset tests

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* additional reset tests

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* fix codestyle issues

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* copy constructor fix

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* sfinae fix

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* fix unique_ptr on cxx11

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* style fixes

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* copy_constructor fix

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* fix atomic initialization

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>

* fid data race

Signed-off-by: Mishin, Ilya <ilya.mishin@intel.com>
  • Loading branch information
Iliamish authored Aug 12, 2021
1 parent b44f164 commit de0109b
Show file tree
Hide file tree
Showing 24 changed files with 2,455 additions and 2,458 deletions.
203 changes: 83 additions & 120 deletions test/conformance/conformance_async_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,158 +18,121 @@
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/test.h"

#include "common/utils.h"
#include "common/graph_utils.h"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/global_control.h"

#include "conformance_flowgraph.h"

//! \file conformance_async_node.cpp
//! \brief Test for [flow_graph.async_node] specification

/*
TODO: implement missing conformance tests for async_node:
- [ ] Write `test_forwarding()'.
- [ ] Improve test of the node's copy-constructor.
- [ ] Write `test_priority'.
- [ ] Rename `test_discarding' to `test_buffering'.
- [ ] Write inheritance test.
- [ ] Constructor with explicitly passed Policy parameter.
- [ ] Concurrency testing of the node: make a loop over possible concurrency levels. It is
important to test at least on five values: 1, oneapi::tbb::flow::serial, `max_allowed_parallelism'
obtained from `oneapi::tbb::global_control', `oneapi::tbb::flow::unlimited', and, if `max allowed
parallelism' is > 2, use something in the middle of the [1, max_allowed_parallelism]
interval. Use `utils::ExactConcurrencyLevel' entity (extending it if necessary).
- [ ] Write `test_rejecting', where avoid dependency on OS scheduling of the threads; add check
that `try_put()' returns `false'
- [ ] The `copy_body' function copies altered body (e.g. after successful `try_put()' call).
- [ ] The copy constructor and copy assignment are called for the node's input and output types.
- [ ] Add CTAD test.
*/
using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true, /*copy_assign*/false>;
using output_msg = conformance::message</*default_ctor*/false, /*copy_ctor*/false, /*copy_assign*/false>;

template<typename I, typename O>
void test_inheritance(){
//! Test async_node constructors
//! \brief \ref requirement
TEST_CASE("async_node constructors"){
using namespace oneapi::tbb::flow;
graph g;

CHECK_MESSAGE( (std::is_base_of<graph_node, async_node<I, O>>::value), "async_node should be derived from graph_node");
CHECK_MESSAGE( (std::is_base_of<receiver<I>, async_node<I, O>>::value), "async_node should be derived from receiver<Input>");
CHECK_MESSAGE( (std::is_base_of<sender<O>, async_node<I, O>>::value), "async_node should be derived from sender<Output>");
}
conformance::dummy_functor<int> fun;

template< typename OutputType >
struct as_inc_functor {
std::thread my_thread;
async_node<int, int> fn1(g, unlimited, fun);
async_node<int, int> fn2(g, unlimited, fun, oneapi::tbb::flow::node_priority_t(1));

std::atomic<size_t>& local_execute_count;

as_inc_functor(std::atomic<size_t>& execute_count ) :
local_execute_count (execute_count)
{ }

as_inc_functor( const as_inc_functor &f ) : local_execute_count(f.local_execute_count) { }
void operator=(const as_inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }

void operator()( int num , oneapi::tbb::flow::async_node<int, int>::gateway_type& g) {
++local_execute_count;
g.try_put(num);
// my_thread = std::thread([&](){
// g.try_put(num);
// });
}

};

void test_async_body(){
oneapi::tbb::flow::graph g;

std::atomic<size_t> local_count(0);
as_inc_functor<int> fun(local_count);

oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun);

const size_t n = 10;
for(size_t i = 0; i < n; ++i) {
CHECK_MESSAGE((node1.try_put(1) == true), "try_put needs to return true");
}

//fun.my_thread.join();
g.wait_for_all();

CHECK_MESSAGE( (fun.local_execute_count.load() == n), "Body of the node needs to be executed N times");
async_node<int, int, lightweight> lw_node1(g, serial, fun, lightweight());
async_node<int, int, lightweight> lw_node2(g, serial, fun, lightweight(), oneapi::tbb::flow::node_priority_t(1));
}

void test_copy(){
oneapi::tbb::flow::graph g;
std::atomic<size_t> local_count(0);
as_inc_functor<int> fun(local_count);

oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun);
oneapi::tbb::flow::async_node<int, int> node2(node1);
//! Test buffering property
//! \brief \ref requirement
TEST_CASE("async_node buffering") {
conformance::dummy_functor<int> fun;
conformance::test_buffering<oneapi::tbb::flow::async_node<input_msg, int>, input_msg>(oneapi::tbb::flow::unlimited, fun);
}

void test_priority(){
oneapi::tbb::flow::graph g;
std::atomic<size_t> local_count(0);
as_inc_functor<int> fun(local_count);

oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun, oneapi::tbb::flow::no_priority);
//! Test priorities work in single-threaded configuration
//! \brief \ref requirement
TEST_CASE("async_node priority support"){
conformance::test_priority<oneapi::tbb::flow::async_node<input_msg, int>, input_msg>(oneapi::tbb::flow::unlimited);
}

void test_discarding(){
oneapi::tbb::flow::graph g;

std::atomic<size_t> local_count(0);
as_inc_functor<int> fun(local_count);

oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun);

oneapi::tbb::flow::limiter_node< int > rejecter1( g,0);
oneapi::tbb::flow::limiter_node< int > rejecter2( g,0);

make_edge(node1, rejecter2);
make_edge(node1, rejecter1);

node1.try_put(1);
//! The node that is constructed has a reference to the same graph object as src, has a copy of the initial body used by src, and has the same concurrency threshold as src.
//! The predecessors and successors of src are not copied.
//! \brief \ref requirement
TEST_CASE("async_node copy constructor"){
conformance::test_copy_ctor<oneapi::tbb::flow::async_node<int, int>>();
}

int tmp = -1;
CHECK_MESSAGE((node1.try_get(tmp) == false), "Value should be discarded after rejection");
//! Test calling async body
//! \brief \ref interface \ref requirement
TEST_CASE("Test async_node body") {
conformance::test_body_exec<oneapi::tbb::flow::async_node<input_msg, output_msg>, input_msg, output_msg>(oneapi::tbb::flow::unlimited);
}

g.wait_for_all();
//! Test async_node inheritance relations
//! \brief \ref interface
TEST_CASE("async_node superclasses"){
conformance::test_inheritance<oneapi::tbb::flow::async_node<int, int>, int, int>();
conformance::test_inheritance<oneapi::tbb::flow::async_node<void*, float>, void*, float>();
conformance::test_inheritance<oneapi::tbb::flow::async_node<input_msg, output_msg>, input_msg, output_msg>();
}

//! Test discarding property
//! Test node broadcast messages to successors
//! \brief \ref requirement
TEST_CASE("async_node discarding") {
test_discarding();
TEST_CASE("async_node broadcast"){
conformance::counting_functor<int> fun(conformance::expected);
conformance::test_forwarding<oneapi::tbb::flow::async_node<input_msg, int>, input_msg, int>(1, oneapi::tbb::flow::unlimited, fun);
}

//! Test async_node has a user-settable concurrency limit. It can be set to one of predefined values.
//! The user can also provide a value of type std::size_t to limit concurrency.
//! Test that not more than limited threads works in parallel.
//! \brief \ref requirement
TEST_CASE("concurrency follows set limits"){
conformance::test_concurrency<oneapi::tbb::flow::async_node<int, int>>();
}

//! Test async_node priority interface
//! Test body copying and copy_body logic
//! Test the body object passed to a node is copied
//! \brief \ref interface
TEST_CASE("async_node priority interface"){
test_priority();
TEST_CASE("async_node body copying"){
conformance::test_copy_body_function<oneapi::tbb::flow::async_node<int, int>, conformance::copy_counting_object<int>>(oneapi::tbb::flow::unlimited);
}

//! Test async_node copy
//! Test node reject the incoming message if the concurrency limit achieved.
//! \brief \ref interface
TEST_CASE("async_node copy"){
test_copy();
TEST_CASE("async_node with rejecting policy"){
conformance::test_rejecting<oneapi::tbb::flow::async_node<int, int, oneapi::tbb::flow::rejecting>>();
}

//! Test calling async body
//! Test node Input class meet the DefaultConstructible and CopyConstructible requirements and Output class meet the CopyConstructible requirements.
//! \brief \ref interface \ref requirement
TEST_CASE("Test async_node body") {
test_async_body();
TEST_CASE("Test async_node Output and Input class") {
using Body = conformance::copy_counting_object<int>;
conformance::test_output_input_class<oneapi::tbb::flow::async_node<Body, Body>, Body>();
}

//! Test async_node inheritance relations
//! Test the body of assync_node typically submits the messages to an external activity for processing outside of the graph.
//! \brief \ref interface
TEST_CASE("async_node superclasses"){
test_inheritance<int, int>();
test_inheritance<void*, float>();
TEST_CASE("async_node with rejecting policy"){
using async_node_type = tbb::flow::async_node<int, int>;
using gateway_type = async_node_type::gateway_type;

oneapi::tbb::flow::graph g;
std::atomic<bool> flag{false};
std::thread thr;
async_node_type testing_node{
g, tbb::flow::unlimited,
[&](const int& input, gateway_type& gateway) {
gateway.reserve_wait();
thr = std::thread{[&]{
flag = true;
gateway.try_put(input);
gateway.release_wait();
}};
}
};

testing_node.try_put(1);
g.wait_for_all();
CHECK_MESSAGE((flag.load()), "The body of assync_node must submits the messages to an external activity for processing outside of the graph");
thr.join();
}
109 changes: 35 additions & 74 deletions test/conformance/conformance_broadcast_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,99 +18,60 @@
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/test.h"

#include "common/utils.h"
#include "common/graph_utils.h"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/global_control.h"

#include "conformance_flowgraph.h"

//! \file conformance_broadcast_node.cpp
//! \brief Test for [flow_graph.broadcast_node] specification

/*
TODO: implement missing conformance tests for broadcast_node:
- [ ] The copy constructor and copy assignment are called for the node's type template parameter.
- [ ] Improve test for constructors.
*/

template<typename T>
void test_inheritance(){
using namespace oneapi::tbb::flow;
using input_msg = conformance::message</*default_ctor*/false, /*copy_ctor*/true/*enable for queue_node successor*/, /*copy_assign*/true/*enable for queue_node successor*/>;

CHECK_MESSAGE( (std::is_base_of<graph_node, broadcast_node<T>>::value), "broadcast_node should be derived from graph_node");
CHECK_MESSAGE( (std::is_base_of<receiver<T>, broadcast_node<T>>::value), "broadcast_node should be derived from receiver<T>");
CHECK_MESSAGE( (std::is_base_of<sender<T>, broadcast_node<T>>::value), "broadcast_node should be derived from sender<T>");
//! Test function_node broadcast
//! \brief \ref requirement
TEST_CASE("broadcast_node broadcasts"){
conformance::test_forwarding<oneapi::tbb::flow::broadcast_node<int>, int>(1);
conformance::test_forwarding<oneapi::tbb::flow::broadcast_node<input_msg>, input_msg>(1);
}

void test_copies(){
using namespace oneapi::tbb::flow;

graph g;
broadcast_node<int> n(g);
broadcast_node<int> n2(n);
//! Test broadcast_node buffering
//! \brief \ref requirement
TEST_CASE("broadcast_node buffering"){
conformance::test_buffering<oneapi::tbb::flow::broadcast_node<int>, int>();
}

void test_buffering(){
oneapi::tbb::flow::graph g;

oneapi::tbb::flow::broadcast_node<int> node(g);
oneapi::tbb::flow::limiter_node<int> rejecter(g, 0);

oneapi::tbb::flow::make_edge(node, rejecter);

node.try_put(1);
g.wait_for_all();

int tmp = -1;
CHECK_MESSAGE( (node.try_get(tmp) == false), "try_get after rejection should not succeed");
CHECK_MESSAGE( (tmp == -1), "try_get after rejection should not set value");
//! Test inheritance relations
//! \brief \ref interface
TEST_CASE("broadcast_node superclasses"){
conformance::test_inheritance<oneapi::tbb::flow::broadcast_node<int>, int, int>();
conformance::test_inheritance<oneapi::tbb::flow::broadcast_node<float>, float, float>();
conformance::test_inheritance<oneapi::tbb::flow::broadcast_node<input_msg>, input_msg, input_msg>();
}

void test_forwarding(){
oneapi::tbb::flow::graph g;
//! The node that is constructed has a reference to the same graph object as src.
//! The predecessors and successors of src are not copied.
//! \brief \ref interface
TEST_CASE("broadcast_node copy constructor"){
using namespace oneapi::tbb::flow;
graph g;

oneapi::tbb::flow::broadcast_node<int> node1(g);
test_push_receiver<int> node2(g);
test_push_receiver<int> node3(g);
broadcast_node<int> node0(g);
broadcast_node<int> node1(g);
conformance::test_push_receiver<int> node2(g);
conformance::test_push_receiver<int> node3(g);

oneapi::tbb::flow::make_edge(node0, node1);
oneapi::tbb::flow::make_edge(node1, node2);
oneapi::tbb::flow::make_edge(node1, node3);

node1.try_put(1);
g.wait_for_all();
broadcast_node<int> node_copy(node1);

int c2 = get_count(node2), c3 = get_count(node3);
CHECK_MESSAGE( ( c2 == 1), "Descendant of the node must receive one message.");
CHECK_MESSAGE( ( c3 == 1), "Descendant of the node must receive one message.");
}
oneapi::tbb::flow::make_edge(node_copy, node3);

//! Test function_node broadcast
//! \brief \ref requirement
TEST_CASE("broadcast_node broadcasts"){
test_forwarding();
}
node_copy.try_put(1);
g.wait_for_all();

//! Test broadcast_node buffering
//! \brief \ref requirement
TEST_CASE("broadcast_node buffering"){
test_buffering();
}
CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 1), "Copied node doesn`t copy successor");

//! Test copy constructor
//! \brief \ref interface
TEST_CASE("broadcast_node copy constructor"){
test_copies();
}
node0.try_put(1);
g.wait_for_all();

//! Test inheritance relations
//! \brief \ref interface
TEST_CASE("broadcast_node superclasses"){
test_inheritance<int>();
test_inheritance<void*>();
CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor");
}

Loading

0 comments on commit de0109b

Please sign in to comment.