Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
x41lakazam committed Sep 3, 2024
1 parent 3f1cd76 commit e0ac473
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 302 deletions.
7 changes: 3 additions & 4 deletions src/components/cl/hier/barrier/barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ static ucc_status_t ucc_cl_hier_barrier_finalize(ucc_coll_task_t *task)
ucc_schedule_t *schedule = ucc_derived_of(task, ucc_schedule_t);
ucc_status_t status;

UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_barrier_finalize", 0);
UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_barrier_finalize",
0);
status = ucc_schedule_finalize(task);
ucc_cl_hier_put_schedule(schedule);
return status;
Expand All @@ -31,15 +32,13 @@ UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_barrier_init,
ucc_base_coll_args_t *coll_args, ucc_base_team_t *team,
ucc_coll_task_t **task)
{

ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t);
ucc_coll_task_t *tasks[MAX_BARRIER_TASKS] = {NULL};
ucc_schedule_t *schedule;
ucc_status_t status;
ucc_base_coll_args_t args;
int n_tasks, i;


schedule = &ucc_cl_hier_get_schedule(cl_team)->super.super;
if (ucc_unlikely(!schedule)) {
return UCC_ERR_NO_MEMORY;
Expand Down Expand Up @@ -72,6 +71,7 @@ UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_barrier_init,
n_tasks++;
}


if (SBGP_ENABLED(cl_team, NODE) &&
cl_team->top_sbgp != UCC_HIER_SBGP_NODE) {
args.args.coll_type = UCC_COLL_TYPE_FANOUT;
Expand All @@ -97,7 +97,6 @@ UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_barrier_init,
schedule->super.post = ucc_cl_hier_barrier_start;
schedule->super.finalize = ucc_cl_hier_barrier_finalize;
*task = &schedule->super;

return UCC_OK;

out:
Expand Down
6 changes: 3 additions & 3 deletions src/components/tl/ucp/allgather/allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ucc_base_coll_alg_info_t
[UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL] =
{.id = UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL,
.name = "knomial",
.desc = "recursive k-ing with arbitrary radix"},
.desc = "recursive k-ing with arbitrary radix "},
[UCC_TL_UCP_ALLGATHER_ALG_RING] =
{.id = UCC_TL_UCP_ALLGATHER_ALG_RING,
.name = "ring",
Expand All @@ -23,11 +23,11 @@ ucc_base_coll_alg_info_t
{.id = UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR,
.name = "neighbor",
.desc = "O(N) Neighbor Exchange N/2 steps"},
[UCC_TL_UCP_ALLGATHER_ALG_BRUCK] =
[UCC_TL_UCP_ALLGATHER_ALG_BRUCK] =
{.id = UCC_TL_UCP_ALLGATHER_ALG_BRUCK,
.name = "bruck",
.desc = "O(log(N)) Variation of Bruck algorithm"},
[UCC_TL_UCP_ALLGATHER_ALG_SPARBIT] =
[UCC_TL_UCP_ALLGATHER_ALG_SPARBIT] =
{.id = UCC_TL_UCP_ALLGATHER_ALG_SPARBIT,
.name = "sparbit",
.desc = "O(log(N)) SPARBIT algorithm"},
Expand Down
62 changes: 19 additions & 43 deletions src/components/tl/ucp/allgatherv/allgatherv_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "utils/ucc_math.h"
#include "utils/ucc_coll_utils.h"
#include "tl_ucp_sendrecv.h"
#include <stdio.h>

void ucc_tl_ucp_allgatherv_ring_progress(ucc_coll_task_t *coll_task)
{
Expand All @@ -34,22 +33,23 @@ void ucc_tl_ucp_allgatherv_ring_progress(ucc_coll_task_t *coll_task)
recvfrom = ucc_ep_map_eval(task->subset.map, (trank - 1 + tsize) % tsize);

while (task->tagged.send_posted < tsize) {
send_idx = ucc_ep_map_eval(
task->subset.map,
(trank - task->tagged.send_posted + 1 + tsize) % tsize);
send_idx =
ucc_ep_map_eval(task->subset.map, (trank -
task->tagged.send_posted + 1 +
tsize) % tsize);
data_displ = ucc_coll_args_get_displacement(
args, args->dst.info_v.displacements, send_idx) *
rdt_size;
data_size =

ucc_coll_args_get_count(args, args->dst.info_v.counts, send_idx) *
rdt_size;
UCPCHECK_GOTO(ucc_tl_ucp_send_nb((void *)(rbuf + data_displ), data_size,
rmem, sendto, team, task),
task, out);
recv_idx =
ucc_ep_map_eval(task->subset.map,
(trank - task->tagged.recv_posted + tsize) % tsize);
recv_idx =
ucc_ep_map_eval(task->subset.map, (trank -
task->tagged.recv_posted +
tsize) % tsize);
data_displ = ucc_coll_args_get_displacement(
args, args->dst.info_v.displacements, recv_idx) *
rdt_size;
Expand All @@ -63,18 +63,6 @@ void ucc_tl_ucp_allgatherv_ring_progress(ucc_coll_task_t *coll_task)
return;
}
}
// DEBUG
int _i, _total_counts;
_total_counts=0;
for (_i=0; _i < tsize; _i++){
_total_counts += ucc_coll_args_get_count(args, args->dst.info_v.counts, _i);
}
printf("[%d] end-allgatherv rbuf: [", trank);
for (_i=0; _i < _total_counts; _i++){
printf("%u, ", ((uint32_t *)rbuf)[_i]);
}
printf("]\n");
//--
ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task));
task->super.status = UCC_OK;
out:
Expand All @@ -86,8 +74,8 @@ ucc_status_t ucc_tl_ucp_allgatherv_ring_start(ucc_coll_task_t *coll_task)
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_coll_args_t *args = &TASK_ARGS(task);
void *sbuf = args->src.info.buffer;
void *rbuf = args->dst.info_v.buffer;
void * sbuf = args->src.info.buffer;
void * rbuf = args->dst.info_v.buffer;
ucc_memory_type_t smem = args->src.info.mem_type;
ucc_memory_type_t rmem = args->dst.info_v.mem_type;
ucc_rank_t grank = UCC_TL_TEAM_RANK(team);
Expand All @@ -98,34 +86,22 @@ ucc_status_t ucc_tl_ucp_allgatherv_ring_start(ucc_coll_task_t *coll_task)
if (!UCC_IS_INPLACE(*args)) {
/* TODO replace local sendrecv with memcpy? */
rdt_size = ucc_dt_size(args->dst.info_v.datatype);
data_displ = ucc_coll_args_get_displacement(
args, args->dst.info_v.displacements, grank) *
rdt_size;
data_size =
ucc_coll_args_get_count(args, args->dst.info_v.counts, grank) *
rdt_size;
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_displ),
data_size, rmem, grank, team, task),
data_displ = ucc_coll_args_get_displacement(args,
args->dst.info_v.displacements, grank) * rdt_size;
data_size = ucc_coll_args_get_count(args,
args->dst.info_v.counts, grank) * rdt_size;
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_displ), data_size,
rmem, grank, team, task),
task, error);
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, grank, team, task),
task, error);
UCPCHECK_GOTO(
ucc_tl_ucp_send_nb(sbuf, data_size, smem, grank, team, task), task,
error);
} else {
/* to simplify progress fucnction and make it identical for
in-place and non in-place */
task->tagged.send_posted = task->tagged.send_completed = 1;
task->tagged.recv_posted = task->tagged.recv_completed = 1;
}

// DEBUG
// int _i;
// printf("[%d] start-allgatherv sbuf: [", grank);
// for (_i=0; _i < 4; _i++){
// printf("%u, ", ((uint32_t *)sbuf)[_i]);
// }
// printf("]\n");
// -----

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
error:
return task->super.status;
Expand All @@ -134,7 +110,7 @@ ucc_status_t ucc_tl_ucp_allgatherv_ring_start(ucc_coll_task_t *coll_task)
ucc_status_t ucc_tl_ucp_allgatherv_ring_init_common(ucc_tl_ucp_task_t *task)
{
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_sbgp_t *sbgp;
ucc_sbgp_t *sbgp;

if (!ucc_coll_args_is_predefined_dt(&TASK_ARGS(task), UCC_RANK_INVALID)) {
tl_error(UCC_TASK_LIB(task), "user defined datatype is not supported");
Expand Down
17 changes: 0 additions & 17 deletions src/components/tl/ucp/bcast/bcast_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,6 @@ void ucc_tl_ucp_bcast_knomial_progress(ucc_coll_task_t *coll_task)
return;
}

// DEBUG
// int _i;
// printf("[%d] end-bcast buffer: [", task->subset.myrank);
// for (_i=0; _i < 8; _i++){
// printf("%u, ", ((uint32_t *)TASK_ARGS(task).src.info.buffer)[_i]);
// }
// printf("]\n");
// -----
ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task));
task->super.status = UCC_OK;
UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_bcast_kn_done", 0);
Expand All @@ -101,14 +93,5 @@ ucc_status_t ucc_tl_ucp_bcast_knomial_start(ucc_coll_task_t *coll_task)

CALC_KN_TREE_DIST(size, task->bcast_kn.radix, task->bcast_kn.dist);

// DEBUG
// int _i;
// printf("[%d] start-bcast buffer: [", task->subset.myrank);
// for (_i=0; _i < 8; _i++){
// printf("%u, ", ((uint32_t *)TASK_ARGS(task).src.info.buffer)[_i]);
// }
// printf("]\n");
// -----

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}
87 changes: 29 additions & 58 deletions src/components/tl/ucp/gatherv/gatherv_linear.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "config.h"
#include "tl_ucp.h"
#include "gatherv.h"
#include <stdio.h>
#include "core/ucc_progress_queue.h"
#include "tl_ucp_sendrecv.h"

Expand All @@ -22,7 +21,7 @@ void ucc_tl_ucp_gatherv_linear_progress(ucc_coll_task_t *coll_task)
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_coll_args_t *args = &TASK_ARGS(task);
void *rbuf = args->dst.info_v.buffer;
void* rbuf = args->dst.info_v.buffer;
ucc_memory_type_t rmem = args->dst.info_v.mem_type;
ucc_rank_t grank = UCC_TL_TEAM_RANK(team);
ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team);
Expand All @@ -40,17 +39,14 @@ void ucc_tl_ucp_gatherv_linear_progress(ucc_coll_task_t *coll_task)
while ((task->tagged.recv_posted < gsize) &&
((task->tagged.recv_posted - task->tagged.recv_completed) <
nreqs)) {
peer = get_peer(grank, gsize, task->tagged.recv_posted);
data_size = ucc_coll_args_get_count(
args, args->dst.info_v.counts, peer) *
dt_size;
data_displ = ucc_coll_args_get_displacement(
args, args->dst.info_v.displacements, peer) *
dt_size;
peer = get_peer(grank, gsize, task->tagged.recv_posted);
data_size = ucc_coll_args_get_count(args,
args->dst.info_v.counts, peer) * dt_size;
data_displ = ucc_coll_args_get_displacement(args,
args->dst.info_v.displacements, peer) * dt_size;
UCPCHECK_GOTO(ucc_tl_ucp_recv_nz(PTR_OFFSET(rbuf, data_displ),
data_size, rmem, peer, team,
task),
task, out);
data_size, rmem, peer, team, task),
task, out);
polls = 0;
}
}
Expand All @@ -62,19 +58,8 @@ void ucc_tl_ucp_gatherv_linear_progress(ucc_coll_task_t *coll_task)
task->super.status = ucc_tl_ucp_test(task);
out:
if (task->super.status != UCC_INPROGRESS) {
// DEBUG
int _i, _total_counts;
_total_counts=0;
for (_i=0; _i < gsize; _i++){
_total_counts += ucc_coll_args_get_count(args, args->dst.info_v.counts, _i);
}
printf("[%d] end-gatherv rbuf (pid=%d): [", grank, getpid());
for (_i=0; _i < 4; _i++){
printf("%u, ", ((uint32_t *)rbuf)[_i]);
}
printf("]\n");
// ----
UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_gatherv_linear_done", 0);
UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task,
"ucp_gatherv_linear_done", 0);
}
}

Expand All @@ -86,7 +71,7 @@ ucc_status_t ucc_tl_ucp_gatherv_linear_start(ucc_coll_task_t *coll_task)
ucc_rank_t grank = UCC_TL_TEAM_RANK(team);
ucc_memory_type_t smem = args->src.info.mem_type;
ucc_memory_type_t rmem = args->dst.info_v.mem_type;
void *sbuf = args->src.info.buffer;
void * sbuf = args->src.info.buffer;
void *rbuf;
size_t dt_size, data_displ, data_size;

Expand All @@ -95,52 +80,38 @@ ucc_status_t ucc_tl_ucp_gatherv_linear_start(ucc_coll_task_t *coll_task)

if (UCC_IS_ROOT(*args, grank)) {
if (!UCC_IS_INPLACE(*args)) {
dt_size = ucc_dt_size(args->dst.info_v.datatype);
data_size =
ucc_coll_args_get_count(args, args->dst.info_v.counts, grank) *
dt_size;
data_displ = ucc_coll_args_get_displacement(
args, args->dst.info_v.displacements, grank) *
dt_size;
rbuf = PTR_OFFSET(args->dst.info_v.buffer, data_displ);
dt_size = ucc_dt_size(args->dst.info_v.datatype);
data_size = ucc_coll_args_get_count(args,
args->dst.info_v.counts, grank) * dt_size;
data_displ = ucc_coll_args_get_displacement(args,
args->dst.info_v.displacements, grank) * dt_size;
rbuf = PTR_OFFSET(args->dst.info_v.buffer, data_displ);

UCPCHECK_GOTO(
ucc_tl_ucp_recv_nz(rbuf, data_size, rmem, grank, team, task),
task, error);
UCPCHECK_GOTO(
ucc_tl_ucp_send_nz(sbuf, data_size, smem, grank, team, task),
task, error);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nz(rbuf, data_size, rmem, grank, team,
task),
task, error);
UCPCHECK_GOTO(ucc_tl_ucp_send_nz(sbuf, data_size, smem, grank, team,
task),
task, error);
} else {
/* to simplify progress fucnction and make it identical for
in-place and non in-place */
task->tagged.send_posted = task->tagged.send_completed = 1;
task->tagged.recv_posted = task->tagged.recv_completed = 1;
}
} else {
dt_size = ucc_dt_size(args->src.info.datatype);
data_size = args->src.info.count * dt_size;

UCPCHECK_GOTO(
ucc_tl_ucp_send_nz(sbuf, data_size, smem, args->root, team, task),
task, error);
}
dt_size = ucc_dt_size(args->src.info.datatype);
data_size = args->src.info.count * dt_size;

// DEBUG
int _i, _total_counts;
_total_counts=0;
for (_i=0; _i < 4; _i++){
_total_counts += ucc_coll_args_get_count(args, args->dst.info_v.counts, _i);
}
printf("[%d] (%d) start-gatherv sbuf: [", grank, _total_counts);
for (_i=0; _i < 4; _i++){
printf("%u, ", ((uint32_t *)sbuf)[_i]);
UCPCHECK_GOTO(ucc_tl_ucp_send_nz(sbuf, data_size, smem, args->root,
team, task),
task, error);
}
printf("]\n");
// ----

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
error:
return task->super.status;

}

ucc_status_t ucc_tl_ucp_gatherv_linear_init(ucc_tl_ucp_task_t *task)
Expand Down
Loading

0 comments on commit e0ac473

Please sign in to comment.