diff --git a/src/components/tl/ucp/alltoall/alltoall_onesided.c b/src/components/tl/ucp/alltoall/alltoall_onesided.c index 3e9f543a8e..028a85f31e 100644 --- a/src/components/tl/ucp/alltoall/alltoall_onesided.c +++ b/src/components/tl/ucp/alltoall/alltoall_onesided.c @@ -62,12 +62,12 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask) ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team); long * pSync = TASK_ARGS(task).global_work_buffer; - if (*pSync < gsize - 1 || task->send_completed < task->send_posted) { + if (*pSync < gsize || task->send_completed < task->send_posted) { ucp_worker_progress(UCC_TL_UCP_TEAM_CTX(team)->ucp_worker); return UCC_INPROGRESS; } - *pSync = -1; + pSync[0] = 0; task->super.super.status = UCC_OK; ucc_task_complete(ctask); return task->super.super.status; diff --git a/src/ucc/api/ucc.h b/src/ucc/api/ucc.h index 704e6c4594..e27b30a8df 100644 --- a/src/ucc/api/ucc.h +++ b/src/ucc/api/ucc.h @@ -1848,15 +1848,16 @@ typedef struct ucc_coll_args { collectives */ ucc_error_type_t error_type; /*!< Error type */ ucc_coll_id_t tag; /*!< Used for ordering collectives */ - void *global_work_buffer; /*!< User allocated scratchpad - buffer for one-sided - collectives. The buffer - provided should be at least - the size returned by @ref - ucc_context_get_attr with - the field mask - - UCC_CONTEXT_ATTR_FIELD_WORK_BUFFER_SIZE - set to 1. */ + void *global_work_buffer; /*!< User allocated scratchpad + buffer for one-sided + collectives. The buffer + provided should be at least + the size returned by @ref + ucc_context_get_attr with + the field mask - + UCC_CONTEXT_ATTR_FIELD_WORK_BUFFER_SIZE + set to 1. The buffer must be initialized + to 0. */ ucc_coll_callback_t cb; double timeout; /*!< Timeout in seconds */ } ucc_coll_args_t; diff --git a/test/gtest/common/test_ucc.cc b/test/gtest/common/test_ucc.cc index 7caa7cad3c..b3036f5a9b 100644 --- a/test/gtest/common/test_ucc.cc +++ b/test/gtest/common/test_ucc.cc @@ -41,6 +41,11 @@ UccProcess::~UccProcess() { EXPECT_EQ(UCC_OK, ucc_context_destroy(ctx_h)); EXPECT_EQ(UCC_OK, ucc_finalize(lib_h)); + if (ctx_params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS) { + for (auto i = 0; i < UCC_TEST_N_MEM_SEGMENTS; i++) { + ucc_free(onesided_buf[i]); + } + } } ucc_status_t UccTeam::allgather(void *src_buf, void *recv_buf, size_t size, @@ -157,7 +162,8 @@ uint64_t rank_map_cb(uint64_t ep, void *cb_ctx) { return (uint64_t)team->procs[(int)ep].p.get()->job_rank; } -void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range) +void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range, + bool is_onesided) { ucc_team_params_t team_params; std::vector cis; @@ -189,6 +195,10 @@ void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range) team_params.oob.oob_ep = i; team_params.mask |= UCC_TEAM_PARAM_FIELD_OOB; } + if (is_onesided) { + team_params.mask |= UCC_TEAM_PARAM_FIELD_FLAGS; + team_params.flags = UCC_TEAM_FLAG_COLL_WORK_BUFFER; + } EXPECT_EQ(UCC_OK, ucc_team_create_post(&(procs[i].p.get()->ctx_h), 1, &team_params, &(procs[i].team))); @@ -211,7 +221,6 @@ void UccTeam::init_team(bool use_team_ep_map, bool use_ep_range) } } - void UccTeam::destroy_team() { ucc_status_t status; @@ -241,7 +250,7 @@ void UccTeam::progress() } UccTeam::UccTeam(std::vector &_procs, bool use_team_ep_map, - bool use_ep_range) + bool use_ep_range, bool is_onesided) { n_procs = _procs.size(); ag.resize(n_procs); @@ -252,7 +261,7 @@ UccTeam::UccTeam(std::vector &_procs, bool use_team_ep_map, a.phase = AG_INIT; } copy_complete_count = 0; - init_team(use_team_ep_map, use_ep_range); + init_team(use_team_ep_map, use_ep_range, is_onesided); // test_allgather(128); } @@ -293,7 +302,6 @@ UccJob::UccJob(int _n_procs, ucc_job_ctx_mode_t _ctx_mode, ucc_job_env_t vars) : /*restore original env */ setenv(v.first.c_str(), v.second.c_str(), 1); } - } void thread_allgather(void *src_buf, void *recv_buf, size_t size, @@ -391,13 +399,61 @@ void proc_context_create(UccProcess_h proc, int id, ThreadAllgather *ta, bool is throw std::runtime_error(err_msg.str()); } +void proc_context_create_mem_params(UccProcess_h proc, int id, + ThreadAllgather *ta) +{ + ucc_status_t status; + ucc_context_config_h ctx_config; + std::stringstream err_msg; + ucc_mem_map_t map[UCC_TEST_N_MEM_SEGMENTS]; + + status = ucc_context_config_read(proc->lib_h, NULL, &ctx_config); + if (status != UCC_OK) { + err_msg << "ucc_context_config_read failed"; + goto exit_err; + } + for (auto i = 0; i < UCC_TEST_N_MEM_SEGMENTS; i++) { + proc->onesided_buf[i] = + ucc_calloc(UCC_TEST_MEM_SEGMENT_SIZE, 1, "onesided_buffer"); + EXPECT_NE(proc->onesided_buf[i], nullptr); + map[i].address = proc->onesided_buf[i]; + map[i].len = UCC_TEST_MEM_SEGMENT_SIZE; + } + proc->ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; + proc->ctx_params.mask |= UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS; + proc->ctx_params.oob.allgather = thread_allgather_start; + proc->ctx_params.oob.req_test = thread_allgather_req_test; + proc->ctx_params.oob.req_free = thread_allgather_req_free; + proc->ctx_params.oob.coll_info = (void *)&ta->reqs[id]; + proc->ctx_params.oob.n_oob_eps = ta->n_procs; + proc->ctx_params.oob.oob_ep = id; + proc->ctx_params.mem_params.segments = map; + proc->ctx_params.mem_params.n_segments = UCC_TEST_N_MEM_SEGMENTS; + status = ucc_context_create(proc->lib_h, &proc->ctx_params, ctx_config, + &proc->ctx_h); + ucc_context_config_release(ctx_config); + if (status != UCC_OK) { + err_msg << "ucc_context_create for one-sided context failed"; + goto exit_err; + } + return; + +exit_err: + err_msg << ": " << ucc_status_string(status) << " (" << status << ")"; + throw std::runtime_error(err_msg.str()); +} void UccJob::create_context() { std::vector workers; for (auto i = 0; i < procs.size(); i++) { - workers.push_back(std::thread(proc_context_create, procs[i], i, &ta, - ctx_mode == UCC_JOB_CTX_GLOBAL)); + if (ctx_mode == UCC_JOB_CTX_GLOBAL_ONESIDED) { + workers.push_back( + std::thread(proc_context_create_mem_params, procs[i], i, &ta)); + } else { + workers.push_back(std::thread(proc_context_create, procs[i], i, &ta, + ctx_mode == UCC_JOB_CTX_GLOBAL)); + } } for (auto i = 0; i < procs.size(); i++) { workers[i].join(); @@ -464,28 +520,29 @@ void UccJob::cleanup() } UccTeam_h UccJob::create_team(int _n_procs, bool use_team_ep_map, - bool use_ep_range) + bool use_ep_range, bool is_onesided) { EXPECT_GE(n_procs, _n_procs); std::vector team_procs; - for (int i=0; i<_n_procs; i++) { + for (int i = 0; i < _n_procs; i++) { team_procs.push_back(procs[i]); } - return std::make_shared(team_procs, use_team_ep_map, use_ep_range); + return std::make_shared(team_procs, use_team_ep_map, use_ep_range, + is_onesided); } UccTeam_h UccJob::create_team(std::vector &ranks, bool use_team_ep_map, - bool use_ep_range) + bool use_ep_range, bool is_onesided) { EXPECT_GE(n_procs, ranks.size()); std::vector team_procs; - for (int i=0; i(team_procs, use_team_ep_map, use_ep_range); + return std::make_shared(team_procs, use_team_ep_map, use_ep_range, + is_onesided); } - UccReq::UccReq(UccTeam_h _team, ucc_coll_args_t *args) : team(_team) { diff --git a/test/gtest/common/test_ucc.h b/test/gtest/common/test_ucc.h index 881c11af74..959420acfc 100644 --- a/test/gtest/common/test_ucc.h +++ b/test/gtest/common/test_ucc.h @@ -129,6 +129,7 @@ class UccProcess { }; ucc_lib_h lib_h; ucc_context_h ctx_h; + void * onesided_buf[3]; int job_rank; UccProcess(int _job_rank, const ucc_lib_params_t &lp = default_lib_params, @@ -163,7 +164,7 @@ class UccTeam { UccTeam *self; } allgather_coll_info_t; std::vector ag; - void init_team(bool use_team_ep_map, bool use_ep_range); + void init_team(bool use_team_ep_map, bool use_ep_range, bool is_onesided); void destroy_team(); void test_allgather(size_t msglen); static ucc_status_t allgather(void *src_buf, void *recv_buf, size_t size, @@ -176,7 +177,7 @@ class UccTeam { void progress(); std::vector procs; UccTeam(std::vector &_procs, bool use_team_ep_map = false, - bool use_ep_range = true); + bool use_ep_range = true, bool is_onesided = false); ~UccTeam(); }; typedef std::shared_ptr UccTeam_h; @@ -191,7 +192,8 @@ class UccJob { public: typedef enum { UCC_JOB_CTX_LOCAL, - UCC_JOB_CTX_GLOBAL /*< ucc ctx create with OOB */ + UCC_JOB_CTX_GLOBAL, /*< ucc ctx create with OOB */ + UCC_JOB_CTX_GLOBAL_ONESIDED } ucc_job_ctx_mode_t; static const int nStaticTeams = 3; static const int staticUccJobSize = 16; @@ -205,9 +207,9 @@ class UccJob { ~UccJob(); std::vector procs; UccTeam_h create_team(int n_procs, bool use_team_ep_map = false, - bool use_ep_range = true); + bool use_ep_range = true, bool is_onesided = false); UccTeam_h create_team(std::vector &ranks, bool use_team_ep_map = false, - bool use_ep_range = true); + bool use_ep_range = true, bool is_onesided = false); void create_context(); ucc_job_ctx_mode_t ctx_mode; }; @@ -241,4 +243,7 @@ void clear_buffer(void *_buf, size_t size, ucc_memory_type_t mt, uint8_t value); UCC_DT_UINT8, UCC_DT_UINT16, UCC_DT_UINT32, UCC_DT_UINT64, UCC_DT_UINT128,\ UCC_DT_FLOAT16, UCC_DT_FLOAT32, UCC_DT_FLOAT64) +#define UCC_TEST_N_MEM_SEGMENTS 3 +#define UCC_TEST_MEM_SEGMENT_SIZE (1 << 20) + #endif diff --git a/test/gtest/core/test_alltoall.cc b/test/gtest/core/test_alltoall.cc index 67186692d9..65a6c83295 100644 --- a/test/gtest/core/test_alltoall.cc +++ b/test/gtest/core/test_alltoall.cc @@ -12,19 +12,26 @@ using Param_1 = std::tupleargs = coll; - coll->mask = 0; - coll->coll_type = UCC_COLL_TYPE_ALLTOALL; + coll->mask = 0; + coll->coll_type = UCC_COLL_TYPE_ALLTOALL; coll->src.info.mem_type = mem_type; coll->src.info.count = (ucc_count_t)single_rank_count * nprocs; coll->src.info.datatype = dtype; @@ -35,35 +42,55 @@ class test_alltoall : public UccCollArgs, public ucc::test ctxs[i]->init_buf = ucc_malloc( ucc_dt_size(dtype) * single_rank_count * nprocs, "init buf"); EXPECT_NE(ctxs[i]->init_buf, nullptr); - for (int r = 0; r < nprocs; r++) { + for (int r = 0; r < nprocs; r++) { size_t rank_size = ucc_dt_size(dtype) * single_rank_count; alltoallx_init_buf(r, i, - (uint8_t*)ctxs[i]->init_buf + r * rank_size, + (uint8_t *)ctxs[i]->init_buf + r * rank_size, rank_size); } - - UCC_CHECK(ucc_mc_alloc( - &ctxs[i]->dst_mc_header, - ucc_dt_size(dtype) * single_rank_count * nprocs, mem_type)); - coll->dst.info.buffer = ctxs[i]->dst_mc_header->addr; - if (TEST_INPLACE == inplace) { - coll->mask |= UCC_COLL_ARGS_FIELD_FLAGS; - coll->flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; - UCC_CHECK(ucc_mc_memcpy(coll->dst.info.buffer, ctxs[i]->init_buf, - ucc_dt_size(dtype) * single_rank_count * nprocs, - mem_type, UCC_MEMORY_TYPE_HOST)); + if (is_onesided) { + sbuf = team->procs[i].p->onesided_buf[0]; + rbuf = team->procs[i].p->onesided_buf[1]; + work_buf = (long *)team->procs[i].p->onesided_buf[2]; + coll->mask = UCC_COLL_ARGS_FIELD_FLAGS | + UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER; + coll->src.info.buffer = sbuf; + coll->dst.info.buffer = rbuf; + coll->flags = UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS; + coll->global_work_buffer = work_buf; } else { UCC_CHECK(ucc_mc_alloc( - &ctxs[i]->src_mc_header, + &ctxs[i]->dst_mc_header, ucc_dt_size(dtype) * single_rank_count * nprocs, mem_type)); - coll->src.info.buffer = ctxs[i]->src_mc_header->addr; - UCC_CHECK( - ucc_mc_memcpy(coll->src.info.buffer, ctxs[i]->init_buf, - ucc_dt_size(dtype) * single_rank_count * nprocs, - mem_type, UCC_MEMORY_TYPE_HOST)); + coll->dst.info.buffer = ctxs[i]->dst_mc_header->addr; + } + if (TEST_INPLACE == inplace) { + coll->mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll->flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; + UCC_CHECK(ucc_mc_memcpy( + coll->dst.info.buffer, ctxs[i]->init_buf, + ucc_dt_size(dtype) * single_rank_count * nprocs, mem_type, + UCC_MEMORY_TYPE_HOST)); + } else { + if (!is_onesided) { + UCC_CHECK(ucc_mc_alloc(&ctxs[i]->src_mc_header, + ucc_dt_size(dtype) * + single_rank_count * nprocs, + mem_type)); + coll->src.info.buffer = ctxs[i]->src_mc_header->addr; + } + UCC_CHECK(ucc_mc_memcpy( + coll->src.info.buffer, ctxs[i]->init_buf, + ucc_dt_size(dtype) * single_rank_count * nprocs, mem_type, + UCC_MEMORY_TYPE_HOST)); } } } + void data_init(int nprocs, ucc_datatype_t dtype, size_t single_rank_count, + UccCollCtxVec &ctxs) + { + data_init(nprocs, dtype, single_rank_count, ctxs, NULL); + } void reset(UccCollCtxVec ctxs) { for (auto r = 0; r < ctxs.size(); r++) { @@ -80,7 +107,6 @@ class test_alltoall : public UccCollArgs, public ucc::test } } } - void data_fini(UccCollCtxVec ctxs) { for (gtest_ucc_coll_ctx_t* ctx : ctxs) { @@ -95,6 +121,16 @@ class test_alltoall : public UccCollArgs, public ucc::test } ctxs.clear(); } + void data_fini_onesided(UccCollCtxVec ctxs) + { + for (gtest_ucc_coll_ctx_t *ctx : ctxs) { + ucc_coll_args_t *coll = ctx->args; + ucc_free(ctx->init_buf); + free(coll); + free(ctx); + } + ctxs.clear(); + } bool data_validate(UccCollCtxVec ctxs) { bool ret = true; @@ -162,6 +198,42 @@ UCC_TEST_P(test_alltoall_0, single) data_fini(ctxs); } +UCC_TEST_P(test_alltoall_0, single_onesided) +{ + const int team_id = std::get<0>(GetParam()); + const ucc_datatype_t dtype = std::get<1>(GetParam()); + ucc_memory_type_t mem_type = std::get<2>(GetParam()); + gtest_ucc_inplace_t inplace = std::get<3>(GetParam()); + const int count = std::get<4>(GetParam()); + UccTeam_h reference_team = UccJob::getStaticTeams()[team_id]; + int size = reference_team->procs.size(); + ucc_job_env_t env = {{"UCC_TL_UCP_TUNE", "alltoall:0-inf:@1"}}; + bool is_contig = true; + UccJob job(size, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED, env); + UccTeam_h team; + std::vector reference_ranks; + UccCollCtxVec ctxs; + + for (auto i = 0; i < reference_team->n_procs; i++) { + int rank = reference_team->procs[i].p->job_rank; + reference_ranks.push_back(rank); + if (is_contig && i > 0 && + (rank - reference_ranks[i - 1] > 1 || + reference_ranks[i - 1] - rank > 1)) { + is_contig = false; + } + } + team = job.create_team(reference_ranks, true, is_contig, true); + this->set_inplace(inplace); + this->set_mem_type(mem_type); + data_init(size, dtype, count, ctxs, team); + UccReq req(team, ctxs); + req.start(); + req.wait(); + EXPECT_EQ(true, data_validate(ctxs)); + data_fini_onesided(ctxs); +} + UCC_TEST_P(test_alltoall_0, single_persistent) { const int team_id = std::get<0>(GetParam()); diff --git a/test/gtest/core/test_context.cc b/test/gtest/core/test_context.cc index f4cc52ed94..981a986b27 100644 --- a/test/gtest/core/test_context.cc +++ b/test/gtest/core/test_context.cc @@ -76,6 +76,13 @@ UCC_TEST_F(test_context_get_attr, addr) EXPECT_EQ(true, ((attr.ctx_addr_len == 0) || (NULL != attr.ctx_addr))); } +UCC_TEST_F(test_context_get_attr, work_buffer_size) +{ + ucc_context_attr_t attr; + attr.mask = UCC_CONTEXT_ATTR_FIELD_WORK_BUFFER_SIZE; + EXPECT_EQ(UCC_OK, ucc_context_get_attr(ctx_h, &attr)); + EXPECT_EQ(5, attr.global_work_buffer_size); +} UCC_TEST_F(test_context, global) {