Skip to content

Commit

Permalink
Add loopback in start collective function. Add flag for choosing cuda…
Browse files Browse the repository at this point in the history
…/loopback
  • Loading branch information
yaeliyac committed Oct 2, 2024
1 parent 9362d20 commit 7e6c55d
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 114 deletions.
20 changes: 0 additions & 20 deletions src/components/tl/ucp/allgather/allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,6 @@

#define ALLGATHER_MAX_PATTERN_SIZE (sizeof(UCC_TL_UCP_ALLGATHER_DEFAULT_ALG_SELECT_STR))


/**
 * Local "self copy" via loopback: posts a non-blocking UCP send and a
 * matching non-blocking receive addressed to this rank itself, moving
 * @len bytes from @src to @dst (possibly across memory types) without
 * calling a memcpy primitive.
 *
 * @param dst      Destination buffer.
 * @param src      Source buffer.
 * @param len      Number of bytes to copy.
 * @param dst_mem  Memory type of the destination buffer.
 * @param src_mem  Memory type of the source buffer.
 * @param rank     Rank to loop back through (the caller's own rank).
 * @param team     UCP team on which the send/recv are posted.
 * @param task     Collective task tracking the posted operations.
 *
 * @return UCC_OK on success; otherwise the failing post's status, which is
 *         also recorded in task->super.status.
 *
 * NOTE(review): the posted operations are non-blocking — completion is
 * observed later through the task's progress/test path, not here.
 */
ucc_status_t new_ucp_tl_self_copy_nb(void *dst, void *src, size_t len,
                                     ucc_memory_type_t dst_mem,
                                     ucc_memory_type_t src_mem,
                                     ucc_rank_t rank,
                                     ucc_tl_ucp_team_t *team,
                                     ucc_tl_ucp_task_t *task)
{
    ucc_status_t status;

    status = ucc_tl_ucp_send_nb(src, len, src_mem, rank, team, task);
    if (ucc_unlikely(UCC_OK != status)) {
        task->super.status = status;
        return status;
    }
    status = ucc_tl_ucp_recv_nb(dst, len, dst_mem, rank, team, task);
    if (ucc_unlikely(UCC_OK != status)) {
        task->super.status = status;
        return status;
    }
    return UCC_OK;
}

/* Allgather algorithm descriptor table. */
ucc_base_coll_alg_info_t
ucc_tl_ucp_allgather_algs[UCC_TL_UCP_ALLGATHER_ALG_LAST + 1] = {
[UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL] =
Expand Down
7 changes: 0 additions & 7 deletions src/components/tl/ucp/allgather/allgather.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,6 @@
#include "tl_ucp_sendrecv.h"



/* Copy @len bytes from @src to @dst: through ucc_mc_memcpy when @use_cuda
 * is non-zero, otherwise through the loopback send/recv helper posted on
 * @task. Every argument is parenthesized and expanded exactly once, so the
 * macro is safe with expression arguments. Both branches return a
 * ucc_status_t. */
#define NEW_MEMCPY(use_cuda, dst, src, len, dst_mem_type, src_mem_type,     \
                   rank, team, task)                                        \
    ((use_cuda) ?                                                           \
         ucc_mc_memcpy((dst), (src), (len), (dst_mem_type),                 \
                       (src_mem_type)) :                                    \
         new_ucp_tl_self_copy_nb((dst), (src), (len), (dst_mem_type),       \
                                 (src_mem_type), (rank), (team), (task)))



enum {
UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL,
UCC_TL_UCP_ALLGATHER_ALG_RING,
Expand Down
35 changes: 21 additions & 14 deletions src/components/tl/ucp/allgather/allgather_bruck.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,28 +240,35 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task)
/* initial step: copy data on non root ranks to the beginning of buffer */

uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
// not inplace: copy chunk from source buff to beginning of receive
/*
status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
*/
status = NEW_MEMCPY(USE_CUDA, rbuf, sbuf, data_size, rmem, smem, trank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error bruck line 254\n");
return status;
if(USE_CUDA){
status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size, rmem, trank, team, task),task, enqueue);
}

} else if (trank != 0) {
printf(" inplace\n");
// inplace: copy chunk to the begin
status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank),
if(USE_CUDA){
status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank),
data_size, rmem, rmem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size, rmem, trank, team, task),task, enqueue);
}
}

enqueue:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}
76 changes: 40 additions & 36 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include "utils/ucc_coll_utils.h"
#include "allgather.h"



#define SAVE_STATE(_phase) \
do { \
task->allgather_kn.phase = _phase; \
Expand Down Expand Up @@ -57,11 +59,19 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
ucc_rank_t peer, peer_dist;
ucc_kn_radix_t loop_step;
size_t peer_seg_count, local_seg_count;
//ucc_status_t status;
ucc_status_t status;
size_t extra_count;

//EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask);
//task->allgather_kn.etask = NULL;
uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if(!USE_CUDA){
if (UCC_INPROGRESS == ucc_tl_ucp_test(task)){
return;
}
}
if(USE_CUDA) EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask);
task->allgather_kn.etask = NULL;

UCC_KN_GOTO_PHASE(task->allgather_kn.phase);
if (KN_NODE_EXTRA == node_type) {
peer = ucc_knomial_pattern_get_proxy(p, rank);
Expand Down Expand Up @@ -176,6 +186,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)

ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
{

ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task,
ucc_tl_ucp_task_t);
ucc_coll_args_t *args = &TASK_ARGS(task);
Expand Down Expand Up @@ -205,39 +216,33 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
offset = ucc_buffer_block_offset(args->dst.info.count, size, rank) *
ucc_dt_size(args->dst.info.datatype);

if (!UCC_IS_INPLACE(*args) && USE_CUDA) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
}
if (!UCC_IS_INPLACE(*args) && !USE_CUDA){
status = new_ucp_tl_self_copy_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.buffer,
args->src.info.count * ucc_dt_size(args->src.info.datatype), args->dst.info.mem_type, args->src.info.mem_type,
rank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error knomial line 231\n");
return status;
if (!UCC_IS_INPLACE(*args)){
if(USE_CUDA){
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(args->src.info.buffer, args->src.info.count * ucc_dt_size(args->src.info.datatype),
args->src.info.mem_type, rank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.count * ucc_dt_size(args->src.info.datatype),
args->dst.info.mem_type, rank, team, task),task, enqueue);
}
}







} else {
ucc_kn_agx_pattern_init(size, rank, radix, args->dst.info.count,
&task->allgather_kn.p);
Expand All @@ -249,8 +254,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_knomial_pattern_loop_rank(p, rank),
p->radix, 0);
}
enqueue:
task->allgather_kn.sbuf = PTR_OFFSET(args->dst.info.buffer, offset);

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

Expand Down Expand Up @@ -284,7 +289,6 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_rank_t size = UCC_TL_TEAM_SIZE(tl_team);
ucc_kn_radix_t radix;

radix = ucc_min(UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allgather_kn_radix, size);
return ucc_tl_ucp_allgather_knomial_init_r(coll_args, team, task_h, radix);
}
24 changes: 12 additions & 12 deletions src/components/tl/ucp/allgather/allgather_neighbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,27 +145,27 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task)
ucc_rank_t neighbor;
void *tmprecv, *tmpsend;


UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_neighbor_start",
0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
/*
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
if(!USE_CUDA){
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
*/
status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem, trank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error neighbor line 162\n");
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, tmp);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, tmp);
}
}

tmp:
if (trank % 2) {
neighbor = (trank - 1 + tsize) % tsize;
} else {
Expand Down
24 changes: 11 additions & 13 deletions src/components/tl/ucp/allgather/allgather_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,20 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task)
uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize,
0);
/*
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize, 0);
if(USE_CUDA){
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
*/
status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * block), sbuf, data_size, rmem, smem, trank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error ring line 110\n");
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * block), data_size, rmem, trank, team, task),task, enqueue);
}
}

enqueue:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

Expand Down
30 changes: 18 additions & 12 deletions src/components/tl/ucp/allgather/allgather_sparbit.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,27 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task)
task->allgather_sparbit.data_expected = 1;

uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if(trank == 0){
printf("\nin sparbit using: ");
if(USE_CUDA){
printf("cuda\n");
} else {
printf("loop\n");
}
}
if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
/*
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
if(USE_CUDA){
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
*/
status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem, trank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error bruck line 254\n");
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, enqueue);
}
}

enqueue:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

0 comments on commit 7e6c55d

Please sign in to comment.