Skip to content

Commit

Permalink
Add support for non-symmetric topology
Browse files Browse the repository at this point in the history
  • Loading branch information
x41lakazam committed Sep 2, 2024
1 parent a9ec2b0 commit 3f1cd76
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 47 deletions.
117 changes: 78 additions & 39 deletions src/components/cl/hier/allgatherv/allgatherv.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,16 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule(
int n_tasks = 0;
void *gv_rc, *gv_displ; // Gatherv args buffers
void *agv_rc, *agv_displ; // Allgatherv args buffers
int i, c64, d64, rank, nrank;
int i, c64, d64, rank, nrank, host_id;
ucc_rank_t full_size, node_size, leaders_size;
size_t elem_size;
ucc_rank_t node_root = 0;

rank = cl_team->sbgps[UCC_HIER_SBGP_FULL].sbgp->group_rank;
nrank = cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp->group_rank;
host_id = ucc_team_rank_host_id(rank, coll_args->team);

printf("[%d] pid=%d\n", rank, getpid());
printf("[%d] pid=%d, host_id=%d\n", rank, getpid(), host_id);

//if (cl_team->sbgps[UCC_HIER_SBGP_FULL].sbgp->group_rank == 0){
// printf("Waiting, pid=%d\n", getpid());
Expand Down Expand Up @@ -203,6 +204,10 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule(
leaders_size = cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp->group_size;
elem_size = c64 ? 8 : 4;

if (!cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp->is_contig){
printf("Non contiguous node ranking is not supported")
}

// DEBUG
int _i;
// END OF DEBUG
Expand Down Expand Up @@ -242,7 +247,12 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule(

_displ = 0;

for (_i = 0; is_root && _i < (*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).group_size; _i++) {
/*
For every rank in the node, copy its receive count unchanged
and set its displacement to the running sum of the counts
of the ranks that precede it in the node.
*/
for (_i = 0; is_root && _i < node_size; _i++) {
ucc_rank_t r = ucc_ep_map_eval((*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).map, _i);
_scount = ((uint32_t *)coll_args->args.dst.info_v.counts)[r];
((uint32_t *)gv_rc)[_i] = _scount;
Expand All @@ -254,19 +264,19 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule(


// DEBUG
do {
printf("[%d] gatherv src count: %lu\n", rank, args.args.src.info.count);
printf("[%d] gatherv recv_counts: [", rank);
for (_i = 0; _i < node_size; _i++) {
printf("%u,", ((uint32_t *) args.args.dst.info_v.counts)[_i]);
}
printf("]\n");
printf("[%d] gatherv displ: [", rank);
for (_i = 0; _i < node_size; _i++) {
printf("%u,", ((uint32_t *) args.args.dst.info_v.displacements)[_i]);
}
printf("]\n");
} while (0);
// do {
// printf("[%d] gatherv src count: %lu\n", rank, args.args.src.info.count);
// printf("[%d] gatherv recv_counts: [", rank);
// for (_i = 0; _i < node_size; _i++) {
// printf("%u,", ((uint32_t *) args.args.dst.info_v.counts)[_i]);
// }
// printf("]\n");
// printf("[%d] gatherv displ: [", rank);
// for (_i = 0; _i < node_size; _i++) {
// printf("%u,", ((uint32_t *) args.args.dst.info_v.displacements)[_i]);
// }
// printf("]\n");
// } while (0);
//-----

args.args.coll_type = UCC_COLL_TYPE_GATHERV;
Expand Down Expand Up @@ -305,40 +315,69 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule(
// agv_args.args.dst.info_v.displacements, full_size);
//}

for (_i = 0; _i < (*cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp).group_size; _i++) {
// NEW
for (_i = 0; _i < leaders_size; _i++) {
((uint32_t *) agv_rc)[_i] = 0;
((uint32_t *) agv_displ)[_i] = ((uint32_t *)coll_args->args.dst.info_v.displacements)[_i*node_size]; // TODO FIX NODE_SIZE
}

args.args.src.info.count = 0;
for (_i = 0; _i < full_size; _i++) {
int _is_local = ucc_rank_on_local_node(_i, (team)->params.team->topo);

if (_is_local) {
args.args.src.info.count += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i];
}
// TODO how can i know on which node this rank is ?? This would work only if all nodes are same size
int _node_ix = _i / (*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).group_size;
((uint32_t *)agv_rc)[_node_ix] += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i];
int _hid, _count, _displ;
/*
For every rank in the communicator:
- find its host id (assumed to also be the index of its node leader -- TODO: confirm with Sergey)
- add its count to that leader's count
*/
for (_i = 0; _i < full_size; _i++){
_hid = ucc_team_rank_host_id(_i, coll_args->team);
_count = ((uint32_t *)coll_args->args.dst.info_v.counts)[_i];
// Add the count of this rank to the count of the leader
((uint32_t *) agv_rc)[_hid] += _count;
}
/*
For every leader, set its displacement to the sum of the counts of the preceding leaders
*/
_displ = 0;
for (_i = 0; _i < leaders_size; _i++){
((uint32_t *) agv_displ)[_i] = _displ;
_displ += ((uint32_t *) agv_rc)[_i];
}
// END OF NEW

// OLD
//for (_i = 0; _i < (*cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp).group_size; _i++) {
// ((uint32_t *) agv_rc)[_i] = 0;
// ((uint32_t *) agv_displ)[_i] = ((uint32_t *)coll_args->args.dst.info_v.displacements)[_i*node_size]; // TODO FIX NODE_SIZE
//}

//args.args.src.info.count = 0;
//for (_i = 0; _i < full_size; _i++) {
// int _is_local = ucc_rank_on_local_node(_i, (team)->params.team->topo);
//
// if (_is_local) {
// args.args.src.info.count += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i];
// }
// // TODO how can i know on which node this rank is ?? This would work only if all nodes are same size
// int _node_ix = _i / (*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).group_size;
// ((uint32_t *)agv_rc)[_node_ix] += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i];
//}
// END OF OLD


args.args.coll_type = UCC_COLL_TYPE_ALLGATHERV;
args.args.dst.info_v.counts = (ucc_count_t *)agv_rc;
args.args.dst.info_v.displacements = (ucc_aint_t *)agv_displ;
// DEBUG
do {
// printf("[%d] allgatherv src count: %lu\n", rank, args.args.src.info.count);
// printf("[%d] allgatherv recv counts: [", rank);
// for (_i = 0; _i < node_size; _i++) {
// printf("%u, ", ((uint32_t *) args.args.dst.info_v.counts)[_i]);
// }
// printf("]\n");
// printf("[%d] allgatherv displacements: [", rank);
// for (_i = 0; _i < node_size; _i++) {
// printf("%u, ", ((uint32_t *) args.args.dst.info_v.displacements)[_i]);
// }
// printf("]\n");
printf("[%d] allgatherv src count: %lu\n", rank, args.args.src.info.count);
printf("[%d] allgatherv recv counts: [", rank);
for (_i = 0; _i < node_size; _i++) {
printf("%u, ", ((uint32_t *) args.args.dst.info_v.counts)[_i]);
}
printf("]\n");
printf("[%d] allgatherv displacements: [", rank);
for (_i = 0; _i < node_size; _i++) {
printf("%u, ", ((uint32_t *) args.args.dst.info_v.displacements)[_i]);
}
printf("]\n");
} while (0);
//

Expand Down
7 changes: 0 additions & 7 deletions src/components/cl/hier/barrier/barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@ UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_barrier_init,
ucc_base_coll_args_t args;
int n_tasks, i;

if (cl_team->sbgps[UCC_HIER_SBGP_FULL].sbgp->group_rank == 0) {
printf("Waiting, pid=%d\n", getpid());
int wait = 1;
while (wait) {
sleep(1);
}
}

schedule = &ucc_cl_hier_get_schedule(cl_team)->super.super;
if (ucc_unlikely(!schedule)) {
Expand Down
2 changes: 1 addition & 1 deletion src/components/topo/ucc_sbgp.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ static inline ucc_status_t sbgp_create_node(ucc_topo_t *topo, ucc_sbgp_t *sbgp)
ucc_free(local_ranks);
return UCC_ERR_NO_MESSAGE;
}
sbgp->is_contig = local_ranks[0] == ctx_nlr
sbgp->is_contig = local_ranks[0] == ctx_nlr;
sbgp->group_size = node_size;
sbgp->group_rank = node_rank;
sbgp->rank_map = local_ranks;
Expand Down

0 comments on commit 3f1cd76

Please sign in to comment.