From 3f1cd76ac6e7ea0ed1874be56c7102c150d882a1 Mon Sep 17 00:00:00 2001 From: Eyal Chocron Date: Mon, 2 Sep 2024 10:03:41 +0300 Subject: [PATCH] Add support for non symetric topology --- .../cl/hier/allgatherv/allgatherv.c | 117 ++++++++++++------ src/components/cl/hier/barrier/barrier.c | 7 -- src/components/topo/ucc_sbgp.c | 2 +- 3 files changed, 79 insertions(+), 47 deletions(-) diff --git a/src/components/cl/hier/allgatherv/allgatherv.c b/src/components/cl/hier/allgatherv/allgatherv.c index e16ab5e665..2282969f97 100755 --- a/src/components/cl/hier/allgatherv/allgatherv.c +++ b/src/components/cl/hier/allgatherv/allgatherv.c @@ -153,15 +153,16 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule( int n_tasks = 0; void *gv_rc, *gv_displ; // Gatherv args buffers void *agv_rc, *agv_displ; // Allgatherv args buffers - int i, c64, d64, rank, nrank; + int i, c64, d64, rank, nrank, host_id; ucc_rank_t full_size, node_size, leaders_size; size_t elem_size; ucc_rank_t node_root = 0; rank = cl_team->sbgps[UCC_HIER_SBGP_FULL].sbgp->group_rank; nrank = cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp->group_rank; + host_id = ucc_team_rank_host_id(rank, coll_args->team); - printf("[%d] pid=%d\n", rank, getpid()); + printf("[%d] pid=%d, host_id=%d\n", rank, getpid(), host_id); //if (cl_team->sbgps[UCC_HIER_SBGP_FULL].sbgp->group_rank == 0){ // printf("Waiting, pid=%d\n", getpid()); @@ -203,6 +204,10 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule( leaders_size = cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp->group_size; elem_size = c64 ? 8 : 4; + if (!cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp->is_contig){ + printf("Non contiguous node ranking is not supported") + } + // DEBUG int _i; // END OF DEBUG @@ -242,7 +247,12 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule( _displ = 0; - for (_i = 0; is_root && _i < (*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).group_size; _i++) { + /* + For every rank in the node, + add his count as is and the displacement + to be the running sum of the counts + */ + for (_i = 0; is_root && _i < node_size; _i++) { ucc_rank_t r = ucc_ep_map_eval((*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).map, _i); _scount = ((uint32_t *)coll_args->args.dst.info_v.counts)[r]; ((uint32_t *)gv_rc)[_i] = _scount; @@ -254,19 +264,19 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule( // DEBUG - do { - printf("[%d] gatherv src count: %lu\n", rank, args.args.src.info.count); - printf("[%d] gatherv recv_counts: [", rank); - for (_i = 0; _i < node_size; _i++) { - printf("%u,", ((uint32_t *) args.args.dst.info_v.counts)[_i]); - } - printf("]\n"); - printf("[%d] gatherv displ: [", rank); - for (_i = 0; _i < node_size; _i++) { - printf("%u,", ((uint32_t *) args.args.dst.info_v.displacements)[_i]); - } - printf("]\n"); - } while (0); + // do { + // printf("[%d] gatherv src count: %lu\n", rank, args.args.src.info.count); + // printf("[%d] gatherv recv_counts: [", rank); + // for (_i = 0; _i < node_size; _i++) { + // printf("%u,", ((uint32_t *) args.args.dst.info_v.counts)[_i]); + // } + // printf("]\n"); + // printf("[%d] gatherv displ: [", rank); + // for (_i = 0; _i < node_size; _i++) { + // printf("%u,", ((uint32_t *) args.args.dst.info_v.displacements)[_i]); + // } + // printf("]\n"); + // } while (0); //----- args.args.coll_type = UCC_COLL_TYPE_GATHERV; @@ -305,22 +315,51 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule( // agv_args.args.dst.info_v.displacements, full_size); //} - for (_i = 0; _i < (*cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp).group_size; _i++) { + // NEW + for (_i = 0; _i < leaders_size; _i++) { ((uint32_t *) agv_rc)[_i] = 0; - ((uint32_t *) agv_displ)[_i] = ((uint32_t *)coll_args->args.dst.info_v.displacements)[_i*node_size]; // TODO FIX NODE_SIZE } - args.args.src.info.count = 0; - for (_i = 0; _i < full_size; _i++) { - int _is_local = ucc_rank_on_local_node(_i, (team)->params.team->topo); - - if (_is_local) { - args.args.src.info.count += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i]; - } - // TODO how can i know on which node this rank is ?? This would work only if all nodes are same size - int _node_ix = _i / (*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).group_size; - ((uint32_t *)agv_rc)[_node_ix] += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i]; + int _hid, _count, _displ; + /* + For every rank in the communicator, + - find his host id (which is also the index of the leader -- Sergey ?) + - Add his count to the leader count + */ + for (_i = 0; _i < full_size; _i++){ + _hid = ucc_team_rank_host_id(_i, coll_args->team); + _count = ((uint32_t *)coll_args->args.dst.info_v.counts)[_i]; + // Add the count of this rank to the count of the leader + ((uint32_t *) agv_rc)[_hid] += _count; } + /* + For every leader, add the sum of the previous counts to his displacements + */ + _displ = 0; + for (_i = 0; _i < leaders_size; _i++){ + ((uint32_t *) agv_displ)[_i] = _displ; + _displ += ((uint32_t *) agv_rc)[_i]; + } + // END OF NEW + + // OLD + //for (_i = 0; _i < (*cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp).group_size; _i++) { + // ((uint32_t *) agv_rc)[_i] = 0; + // ((uint32_t *) agv_displ)[_i] = ((uint32_t *)coll_args->args.dst.info_v.displacements)[_i*node_size]; // TODO FIX NODE_SIZE + //} + + //args.args.src.info.count = 0; + //for (_i = 0; _i < full_size; _i++) { + // int _is_local = ucc_rank_on_local_node(_i, (team)->params.team->topo); + // + // if (_is_local) { + // args.args.src.info.count += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i]; + // } + // // TODO how can i know on which node this rank is ?? This would work only if all nodes are same size + // int _node_ix = _i / (*cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp).group_size; + // ((uint32_t *)agv_rc)[_node_ix] += ((uint32_t *)coll_args->args.dst.info_v.counts)[_i]; + //} + // END OF OLD args.args.coll_type = UCC_COLL_TYPE_ALLGATHERV; @@ -328,17 +367,17 @@ static ucc_status_t ucc_cl_hier_allgatherv_node_split_init_schedule( args.args.dst.info_v.displacements = (ucc_aint_t *)agv_displ; // DEBUG do { - // printf("[%d] allgatherv src count: %lu\n", rank, args.args.src.info.count); - // printf("[%d] allgatherv recv counts: [", rank); - // for (_i = 0; _i < node_size; _i++) { - // printf("%u, ", ((uint32_t *) args.args.dst.info_v.counts)[_i]); - // } - // printf("]\n"); - // printf("[%d] allgatherv displacements: [", rank); - // for (_i = 0; _i < node_size; _i++) { - // printf("%u, ", ((uint32_t *) args.args.dst.info_v.displacements)[_i]); - // } - // printf("]\n"); + printf("[%d] allgatherv src count: %lu\n", rank, args.args.src.info.count); + printf("[%d] allgatherv recv counts: [", rank); + for (_i = 0; _i < node_size; _i++) { + printf("%u, ", ((uint32_t *) args.args.dst.info_v.counts)[_i]); + } + printf("]\n"); + printf("[%d] allgatherv displacements: [", rank); + for (_i = 0; _i < node_size; _i++) { + printf("%u, ", ((uint32_t *) args.args.dst.info_v.displacements)[_i]); + } + printf("]\n"); } while (0); // diff --git a/src/components/cl/hier/barrier/barrier.c b/src/components/cl/hier/barrier/barrier.c index 30c895bffe..3e9f673c14 100644 --- a/src/components/cl/hier/barrier/barrier.c +++ b/src/components/cl/hier/barrier/barrier.c @@ -39,13 +39,6 @@ UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_barrier_init, ucc_base_coll_args_t args; int n_tasks, i; - if (cl_team->sbgps[UCC_HIER_SBGP_FULL].sbgp->group_rank == 0) { - printf("Waiting, pid=%d\n", getpid()); - int wait = 1; - while (wait) { - sleep(1); - } - } schedule = &ucc_cl_hier_get_schedule(cl_team)->super.super; if (ucc_unlikely(!schedule)) { diff --git a/src/components/topo/ucc_sbgp.c b/src/components/topo/ucc_sbgp.c index 38d7101043..724468d136 100644 --- a/src/components/topo/ucc_sbgp.c +++ b/src/components/topo/ucc_sbgp.c @@ -143,7 +143,7 @@ static inline ucc_status_t sbgp_create_node(ucc_topo_t *topo, ucc_sbgp_t *sbgp) ucc_free(local_ranks); return UCC_ERR_NO_MESSAGE; } - sbgp->is_contig = local_ranks[0] == ctx_nlr + sbgp->is_contig = local_ranks[0] == ctx_nlr; sbgp->group_size = node_size; sbgp->group_rank = node_rank; sbgp->rank_map = local_ranks;