Skip to content

Commit

Permalink
TL/UCP: opt radix alg (#766)
Browse files Browse the repository at this point in the history
* TL/UCP: opt radix alg

* TL/UCP: add opt radix to colls and calc socket nb

* TL/UCP: socket size from topo, remove service ar

* REVIEW: code review fixes

* REVIEW: second review fixes

* REVIEW: third review fixes
  • Loading branch information
shimmybalsam authored Jun 23, 2023
1 parent 6c2a4a8 commit 1280664
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 30 deletions.
29 changes: 29 additions & 0 deletions src/coll_patterns/recursive_knomial.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define RECURSIVE_KNOMIAL_H_

#define UCC_KN_PEER_NULL ((ucc_rank_t)-1)
#define UCC_KN_MIN_RADIX 2
typedef uint16_t ucc_kn_radix_t;

enum {
Expand Down Expand Up @@ -230,6 +231,34 @@ ucc_knomial_calc_recv_dist(ucc_rank_t team_size, ucc_rank_t rank,
return dist;
}

/* Calculates (sub) opt radix for Allreduce SRA and Bcast SAG,
by minimizing n_extra ranks */
static inline ucc_rank_t ucc_kn_get_opt_radix(ucc_rank_t team_size,
ucc_kn_radix_t max_radix)
{
ucc_rank_t n_extra = 0, min_val = team_size;
ucc_kn_radix_t min_i = UCC_KN_MIN_RADIX;
ucc_kn_radix_t max_r = ucc_max(max_radix, UCC_KN_MIN_RADIX);
ucc_kn_radix_t r, fs;

for (r = UCC_KN_MIN_RADIX; r <= max_r; r++) {
fs = r;
while (fs < team_size) {
fs = fs * r;
}
fs = (fs == team_size) ? fs : fs / r;
n_extra = team_size - (team_size / fs) * fs;
if (n_extra == 0) {
return r;
}
if (n_extra < min_val) {
min_val = n_extra;
min_i = r;
}
}
return min_i;
}

/* A set of convenience macros used to implement sw based progress
of the algorithms that use kn pattern */
enum {
Expand Down
6 changes: 3 additions & 3 deletions src/components/tl/ucp/allreduce/allreduce_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *coll_task)
task->allreduce_kn.phase = UCC_KN_PHASE_INIT;
ucc_assert(UCC_IS_INPLACE(TASK_ARGS(task)) ||
(TASK_ARGS(task).src.info.mem_type == mem_type));
cfg_radix = ucc_tl_ucp_get_radix_from_range(team, data_size,
mem_type, p);
cfg_radix = ucc_tl_ucp_get_radix_from_range(team, data_size, mem_type, p,
UCC_UUNITS_AUTO_RADIX);
ucc_knomial_pattern_init(size, rank, ucc_min(cfg_radix, size),
&task->allreduce_kn.p);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
Expand Down Expand Up @@ -228,7 +228,7 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task)
task->super.progress = ucc_tl_ucp_allreduce_knomial_progress;
task->super.finalize = ucc_tl_ucp_allreduce_knomial_finalize;
cfg_radix = ucc_tl_ucp_get_radix_from_range(team, data_size,
mem_type, p);
mem_type, p, UCC_UUNITS_AUTO_RADIX);
radix = ucc_min(cfg_radix, size);
status = ucc_mc_alloc(&task->allreduce_kn.scratch_mc_header,
(radix - 1) * data_size,
Expand Down
4 changes: 2 additions & 2 deletions src/components/tl/ucp/allreduce/allreduce_sra_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ static ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_frag_init(

cfg_radix = ucc_tl_ucp_get_radix_from_range(tl_team,
count * ucc_dt_size(dtype),
mem_type, p);
mem_type, p,
tl_team->opt_radix);
radix = ucc_knomial_pattern_get_min_radix(cfg_radix,
UCC_TL_TEAM_SIZE(tl_team),
count);

/* 1st step of allreduce: knomial reduce_scatter */
UCC_CHECK_GOTO(
ucc_tl_ucp_reduce_scatter_knomial_init_r(&args, team, &task, radix),
Expand Down
17 changes: 12 additions & 5 deletions src/components/tl/ucp/bcast/bcast_sag_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ ucc_tl_ucp_bcast_sag_knomial_init(ucc_base_coll_args_t *coll_args,
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
size_t count = coll_args->args.src.info.count;
ucc_datatype_t dtype = coll_args->args.src.info.datatype;
ucc_memory_type_t mem_type = coll_args->args.src.info.mem_type;
ucc_base_coll_args_t args = *coll_args;
ucc_mrange_uint_t *p = &tl_team->cfg.bcast_sag_kn_radix;
ucc_schedule_t *schedule;
ucc_coll_task_t *task, *rs_task;
ucc_status_t status;
Expand All @@ -86,11 +89,15 @@ ucc_tl_ucp_bcast_sag_knomial_init(ucc_base_coll_args_t *coll_args,
return ucc_tl_ucp_bcast_knomial_init(coll_args, team, task_h);
}

cfg_radix = UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.bcast_sag_kn_radix;
radix = ucc_knomial_pattern_get_min_radix(cfg_radix,
UCC_TL_TEAM_SIZE(tl_team), count);
status = ucc_tl_ucp_get_schedule(tl_team, coll_args,
(ucc_tl_ucp_schedule_t **)&schedule);
cfg_radix = ucc_tl_ucp_get_radix_from_range(tl_team,
count * ucc_dt_size(dtype),
mem_type, p,
tl_team->opt_radix);
radix = ucc_knomial_pattern_get_min_radix(cfg_radix,
UCC_TL_TEAM_SIZE(tl_team),
count);
status = ucc_tl_ucp_get_schedule(tl_team, coll_args,
(ucc_tl_ucp_schedule_t **)&schedule);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
Expand Down
4 changes: 2 additions & 2 deletions src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
ucc_offsetof(ucc_tl_ucp_lib_config_t, bcast_kn_radix),
UCC_CONFIG_TYPE_UINT},

{"BCAST_SAG_KN_RADIX", "4",
{"BCAST_SAG_KN_RADIX", "auto",
"Radix of the scatter-allgather (SAG) knomial bcast algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, bcast_sag_kn_radix),
UCC_CONFIG_TYPE_UINT},
UCC_CONFIG_TYPE_UINT_RANGED},

{"REDUCE_KN_RADIX", "4", "Radix of the knomial tree reduce algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, reduce_kn_radix),
Expand Down
4 changes: 3 additions & 1 deletion src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "schedule/ucc_schedule_pipelined.h"
#include <ucp/api/ucp.h>
#include <ucs/memory/memory_type.h>
#include "core/ucc_service_coll.h"

#ifndef UCC_TL_UCP_DEFAULT_SCORE
#define UCC_TL_UCP_DEFAULT_SCORE 10
Expand Down Expand Up @@ -52,7 +53,7 @@ typedef struct ucc_tl_ucp_lib_config {
uint32_t reduce_scatter_kn_radix;
uint32_t allgather_kn_radix;
uint32_t bcast_kn_radix;
uint32_t bcast_sag_kn_radix;
ucc_mrange_uint_t bcast_sag_kn_radix;
uint32_t reduce_kn_radix;
uint32_t gather_kn_radix;
uint32_t gatherv_linear_num_posts;
Expand Down Expand Up @@ -141,6 +142,7 @@ typedef struct ucc_tl_ucp_team {
const char * tuning_str;
ucc_topo_t *topo;
ucc_ep_map_t ctx_map;
ucc_rank_t opt_radix;
} ucc_tl_ucp_team_t;
UCC_CLASS_DECLARE(ucc_tl_ucp_team_t, ucc_base_context_t *,
const ucc_base_team_params_t *);
Expand Down
6 changes: 3 additions & 3 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,15 +409,15 @@ static inline unsigned
ucc_tl_ucp_get_radix_from_range(ucc_tl_ucp_team_t *team,
size_t msgsize,
ucc_memory_type_t mem_type,
ucc_mrange_uint_t *p)
ucc_mrange_uint_t *p,
ucc_rank_t default_value)
{
unsigned radix;

radix = ucc_mrange_uint_get(p, msgsize, mem_type);

if (UCC_UUNITS_AUTO == radix) {
/* auto selection based on team configuration */
return UCC_UUNITS_AUTO_RADIX;
return default_value;
}
return radix;
}
Expand Down
1 change: 0 additions & 1 deletion src/components/tl/ucp/tl_ucp_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_lib_t, const ucc_base_lib_params_t *params,
self->cfg.reduce_scatter_kn_radix = tl_ucp_config->kn_radix;
self->cfg.allgather_kn_radix = tl_ucp_config->kn_radix;
self->cfg.bcast_kn_radix = tl_ucp_config->kn_radix;
self->cfg.bcast_sag_kn_radix = tl_ucp_config->kn_radix;
self->cfg.reduce_kn_radix = tl_ucp_config->kn_radix;
self->cfg.scatter_kn_radix = tl_ucp_config->kn_radix;
self->cfg.gather_kn_radix = tl_ucp_config->kn_radix;
Expand Down
20 changes: 17 additions & 3 deletions src/components/tl/ucp/tl_ucp_team.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_team_t, ucc_base_context_t *tl_context,
{
ucc_tl_ucp_context_t *ctx =
ucc_derived_of(tl_context, ucc_tl_ucp_context_t);
ucc_status_t status;
ucc_kn_radix_t max_radix;
ucc_status_t status;

UCC_CLASS_CALL_SUPER_INIT(ucc_tl_team_t, &ctx->super, params);
/* TODO: init based on ctx settings and on params: need to check
Expand All @@ -56,12 +57,14 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_team_t, ucc_base_context_t *tl_context,
self->status = UCC_INPROGRESS;
self->tuning_str = "";
self->topo = NULL;
self->opt_radix = UCC_UUNITS_AUTO_RADIX;

status = ucc_config_clone_table(&UCC_TL_UCP_TEAM_LIB(self)->cfg, &self->cfg,
ucc_tl_ucp_lib_config_table);
if (UCC_OK != status) {
return status;
}

if (ctx->topo_required) {
status = ucc_tl_ucp_get_topo(self);
if (UCC_OK != status) {
Expand All @@ -86,6 +89,15 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_team_t, ucc_base_context_t *tl_context,
"topo is not available, disabling ranks reordering");
self->cfg.use_reordering = 0;
}

if (self->topo && !IS_SERVICE_TEAM(self) && self->topo->topo->sock_bound) {
max_radix = ucc_min(UCC_TL_TEAM_SIZE(self),
ucc_topo_min_socket_size(self->topo));

self->opt_radix = ucc_kn_get_opt_radix(UCC_TL_TEAM_SIZE(self),
max_radix);
}

tl_debug(tl_context->lib, "posted tl team: %p", self);
return UCC_OK;
}
Expand All @@ -103,7 +115,7 @@ ucc_status_t ucc_tl_ucp_team_destroy(ucc_base_team_t *tl_team)
{
ucc_tl_ucp_team_t *team = ucc_derived_of(tl_team, ucc_tl_ucp_team_t);

if (UCC_TL_UCP_TEAM_CTX(team)->topo_required) {
if (team->topo) {
ucc_ep_map_destroy_nested(&team->ctx_map);
ucc_topo_cleanup(team->topo);
}
Expand Down Expand Up @@ -156,6 +168,7 @@ ucc_status_t ucc_tl_ucp_team_create_test(ucc_base_team_t *tl_team)
{
ucc_tl_ucp_team_t * team = ucc_derived_of(tl_team, ucc_tl_ucp_team_t);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
int i;
ucc_status_t status;

if (USE_SERVICE_WORKER(team)) {
Expand All @@ -167,6 +180,7 @@ ucc_status_t ucc_tl_ucp_team_create_test(ucc_base_team_t *tl_team)
if (team->status == UCC_OK) {
return UCC_OK;
}

if (UCC_TL_TEAM_SIZE(team) <= ctx->cfg.preconnect) {
status = ucc_tl_ucp_team_preconnect(team);
if (UCC_INPROGRESS == status) {
Expand All @@ -177,7 +191,7 @@ ucc_status_t ucc_tl_ucp_team_create_test(ucc_base_team_t *tl_team)
}

if (ctx->remote_info) {
for (int i = 0; i < ctx->n_rinfo_segs; i++) {
for (i = 0; i < ctx->n_rinfo_segs; i++) {
team->va_base[i] = ctx->remote_info[i].va_base;
team->base_length[i] = ctx->remote_info[i].len;
}
Expand Down
72 changes: 63 additions & 9 deletions src/components/topo/ucc_sbgp.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,24 @@ static inline ucc_status_t sbgp_create_node(ucc_topo_t *topo, ucc_sbgp_t *sbgp)
static ucc_status_t sbgp_create_node_leaders(ucc_topo_t *topo, ucc_sbgp_t *sbgp,
int ctx_nlr)
{
ucc_subset_t *set = &topo->set;
ucc_rank_t comm_size = ucc_subset_size(set);
ucc_rank_t comm_rank = set->myrank;
int i_am_node_leader = 0;
ucc_rank_t nnodes = topo->topo->nnodes;
ucc_rank_t n_node_leaders;
ucc_subset_t *set = &topo->set;
ucc_rank_t comm_size = ucc_subset_size(set);
ucc_rank_t comm_rank = set->myrank;
ucc_rank_t min_sbgp_size = UCC_RANK_MAX;
ucc_rank_t max_sbgp_size = 0;
ucc_rank_t max_ctx_sbgp_size = 0;
ucc_rank_t *nl_array_3 = NULL;
int i_am_node_leader = 0;
int socket_bound = topo->topo->sock_bound;
int numa_bound = topo->topo->numa_bound;
int bound = socket_bound || numa_bound;
ucc_rank_t nnodes = topo->topo->nnodes;
ucc_rank_t n_node_leaders, ctx_rank, i;
ucc_rank_t *nl_array_1, *nl_array_2;
int i;
ucc_host_id_t host_id;
uint8_t sbgp_id;

ucc_assert(comm_size != 0 && nnodes != 0);

if (topo->min_ppn != UCC_RANK_MAX && ctx_nlr >= topo->min_ppn) {
sbgp->status = UCC_SBGP_NOT_EXISTS;
Expand All @@ -203,19 +213,42 @@ static ucc_status_t sbgp_create_node_leaders(ucc_topo_t *topo, ucc_sbgp_t *sbgp,
return UCC_ERR_NO_MEMORY;
}

if (bound) {
max_ctx_sbgp_size = socket_bound ? topo->topo->max_n_sockets :
topo->topo->max_n_numas;
nl_array_3 = ucc_malloc(max_ctx_sbgp_size * nnodes *
sizeof(ucc_rank_t), "nl_array_3");
if (!nl_array_3) {
ucc_error("failed to allocate %zd bytes for nl_array_3",
max_ctx_sbgp_size * nnodes * sizeof(ucc_rank_t));
ucc_free(nl_array_1);
ucc_free(nl_array_2);
return UCC_ERR_NO_MEMORY;
}

memset(nl_array_3, 0, max_ctx_sbgp_size * nnodes * sizeof(ucc_rank_t));
}

for (i = 0; i < nnodes; i++) {
nl_array_1[i] = 0;
nl_array_2[i] = UCC_RANK_MAX;
}

for (i = 0; i < comm_size; i++) {
ucc_rank_t ctx_rank = ucc_ep_map_eval(set->map, i);
ucc_host_id_t host_id = topo->topo->procs[ctx_rank].host_id;
ctx_rank = ucc_ep_map_eval(set->map, i);
host_id = topo->topo->procs[ctx_rank].host_id;
if (bound) {
sbgp_id = socket_bound ? topo->topo->procs[ctx_rank].socket_id :
topo->topo->procs[ctx_rank].numa_id;
nl_array_3[sbgp_id + host_id * max_ctx_sbgp_size]++;
}

if (nl_array_1[host_id] == 0 || nl_array_1[host_id] == ctx_nlr) {
nl_array_2[host_id] = i;
}
nl_array_1[host_id]++;
}

for (i = 0; i < nnodes; i++) {
if (nl_array_1[i] > topo->max_ppn) {
topo->max_ppn = nl_array_1[i];
Expand All @@ -225,6 +258,24 @@ static ucc_status_t sbgp_create_node_leaders(ucc_topo_t *topo, ucc_sbgp_t *sbgp,
}
}

if (bound) {
for (i = 0; i < nnodes * max_ctx_sbgp_size; i++) {
if (nl_array_3[i] == 0) {
continue;
}
min_sbgp_size = ucc_min(min_sbgp_size, nl_array_3[i]);
max_sbgp_size = ucc_max(max_sbgp_size, nl_array_3[i]);
}
if (socket_bound) {
topo->min_socket_size = min_sbgp_size;
topo->max_socket_size = max_sbgp_size;
} else {
topo->min_numa_size = min_sbgp_size;
topo->max_numa_size = max_sbgp_size;
}
ucc_free(nl_array_3);
}

n_node_leaders = 0;
if (ctx_nlr >= topo->min_ppn) {
/* at least one node has less number of local ranks than
Expand Down Expand Up @@ -504,6 +555,9 @@ ucc_status_t ucc_sbgp_create(ucc_topo_t *topo, ucc_sbgp_type_t type)
sbgp->map = ucc_ep_map_from_array(&sbgp->rank_map, sbgp->group_size,
ucc_subset_size(&topo->set), 1);
}
if (sbgp->rank_map && sbgp->status == UCC_SBGP_NOT_EXISTS) {
ucc_free(sbgp->rank_map);
}
return status;
}

Expand Down
2 changes: 1 addition & 1 deletion src/components/topo/ucc_sbgp.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ typedef enum ucc_sbgp_type_t
but EXISTS for procs with local_socket_rank != 0 */
UCC_SBGP_NUMA_LEADERS, /* Same as SOCKET_LEADERS but for NUMA grouping */
UCC_SBGP_FULL, /* Group contains ALL the ranks of the team */
UCC_SBGP_FULL_HOST_ORDERED, /* Group contains All the ranks of the ream ordered
UCC_SBGP_FULL_HOST_ORDERED, /* Group contains ALL the ranks of the team ordered
by host, socket, numa */
UCC_SBGP_LAST
} ucc_sbgp_type_t;
Expand Down
4 changes: 4 additions & 0 deletions src/components/topo/ucc_topo.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ ucc_status_t ucc_topo_init(ucc_subset_t set, ucc_context_topo_t *ctx_topo,
topo->set = set;
topo->min_ppn = UCC_RANK_MAX;
topo->max_ppn = 0;
topo->min_socket_size = UCC_RANK_MAX;
topo->max_socket_size = 0;
topo->min_numa_size = UCC_RANK_MAX;
topo->max_numa_size = 0;
topo->all_sockets = NULL;
topo->all_numas = NULL;

Expand Down
Loading

0 comments on commit 1280664

Please sign in to comment.