Skip to content

Commit

Permalink
TL/MLX5: a2a part2 -- full collective (openucx#802)
Browse files Browse the repository at this point in the history
* TL/MLX5: a2a part 2 -- full collective

* REVIEW: minor comments

* REVIEW: minor comments

* REVIEW: fix linter and align

* TL/MLX5: minor changes

* TL/MLX5: handle msgsize=0

* TL/MLX5: fix ib_ctx cleanup

* TL/MLX5: fix team's dm alloc status

* TEST: minor change in gtest

* CODESTYLE: clang-format

* TL/MLX5: fix multiple outstanding ops

* TL/MLX5: cleanup

* CODESTYLE: clang-format

* TL/MLX5: fix msgsize computation
  • Loading branch information
samnordmann authored and janjust committed Jan 31, 2024
1 parent 2d20efb commit c11b02e
Show file tree
Hide file tree
Showing 12 changed files with 1,284 additions and 118 deletions.
12 changes: 7 additions & 5 deletions src/components/tl/mlx5/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

if TL_MLX5_ENABLED

alltoall = \
alltoall/alltoall.h \
alltoall/alltoall.c \
alltoall/alltoall_mkeys.h \
alltoall/alltoall_mkeys.c
alltoall = \
alltoall/alltoall.h \
alltoall/alltoall.c \
alltoall/alltoall_mkeys.h \
alltoall/alltoall_mkeys.c \
alltoall/alltoall_inline.h \
alltoall/alltoall_coll.c

mcast = \
mcast/tl_mlx5_mcast_context.c \
Expand Down
27 changes: 15 additions & 12 deletions src/components/tl/mlx5/alltoall/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static ucc_status_t build_rank_map(ucc_tl_mlx5_alltoall_t *a2a,
return UCC_OK;
}

ucc_status_t ucc_tl_mlx5_alltoall_init_start(ucc_tl_mlx5_team_t *team)
ucc_status_t ucc_tl_mlx5_team_alltoall_init_start(ucc_tl_mlx5_team_t *team)
{
ucc_tl_mlx5_context_t *ctx = UCC_TL_MLX5_TEAM_CTX(team);
ucc_tl_mlx5_alltoall_t *a2a = NULL;
Expand Down Expand Up @@ -204,7 +204,7 @@ static ucc_status_t ucc_tl_mlx5_alltoall_atomic_alloc(ucc_tl_mlx5_team_t *team)
#endif

if (!a2a->net.atomic.counters) {
tl_error(UCC_TL_TEAM_LIB(team),
tl_debug(UCC_TL_TEAM_LIB(team),
"failed to allocate %zd bytes for atomic counters array",
size);
return UCC_ERR_NO_MEMORY;
Expand Down Expand Up @@ -270,7 +270,8 @@ static ucc_status_t ucc_tl_mlx5_alltoall_barrier_alloc(ucc_tl_mlx5_team_t *team)
return UCC_OK;
}

ucc_status_t ucc_tl_mlx5_alltoall_init_progress(ucc_tl_mlx5_team_t *tl_team)
ucc_status_t
ucc_tl_mlx5_team_alltoall_init_progress(ucc_tl_mlx5_team_t *tl_team)
{
ucc_tl_mlx5_team_t *team = ucc_derived_of(tl_team,
ucc_tl_mlx5_team_t);
Expand Down Expand Up @@ -358,6 +359,7 @@ ucc_status_t ucc_tl_mlx5_alltoall_init_progress(ucc_tl_mlx5_team_t *tl_team)
}

for (i = 0; i < MAX_OUTSTANDING_OPS; i++) {
op = &a2a->node.ops[i];
op->blocks_sent = PTR_OFFSET(a2a->net.blocks_sent,
sizeof(*a2a->net.blocks_sent) *
a2a->net.net_size * i);
Expand Down Expand Up @@ -597,14 +599,15 @@ ucc_status_t ucc_tl_mlx5_alltoall_init_progress(ucc_tl_mlx5_team_t *tl_team)
a2a->net.rkeys[i] = remote_data->recv_mkey_rkey;
}

a2a->scratch_bf_mr =
ibv_reg_mr(ctx->shared_pd, (void *)&a2a->dummy_atomic_buff,
sizeof(a2a->dummy_atomic_buff),
a2a->atomic_scratch_bf_mr =
ibv_reg_mr(ctx->shared_pd, (void *)&a2a->atomic_scratch_bf,
sizeof(a2a->atomic_scratch_bf),
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if (!a2a->scratch_bf_mr) {
tl_error(lib, "failed to register dummy buff (errno=%d)", errno);
if (!a2a->atomic_scratch_bf_mr) {
tl_error(lib, "failed to register atomic scratch buff (errno=%d)",
errno);
status = UCC_ERR_NO_MESSAGE;
goto err_scratch_bf_mr;
goto err_atomic_atomic_scratch_bf_mr;
}

/* allocate buffer for noninline UMR registration, has to be 2KB aligned */
Expand All @@ -631,8 +634,8 @@ ucc_status_t ucc_tl_mlx5_alltoall_init_progress(ucc_tl_mlx5_team_t *tl_team)
return UCC_OK;

err_umr_entries_mr:
ibv_dereg_mr(a2a->scratch_bf_mr);
err_scratch_bf_mr:
ibv_dereg_mr(a2a->atomic_scratch_bf_mr);
err_atomic_atomic_scratch_bf_mr:
if (a2a->is_dc) {
err_create_ah:
for (j = 0; j < i ; j++) {
Expand Down Expand Up @@ -743,7 +746,7 @@ void ucc_tl_mlx5_alltoall_cleanup(ucc_tl_mlx5_team_t *team)
tl_error(lib, "failed to destroy Mkeys");
}
ucc_free(a2a->net.rkeys);
ibv_dereg_mr(a2a->scratch_bf_mr);
ibv_dereg_mr(a2a->atomic_scratch_bf_mr);
ucc_free(a2a->net.rank_map);
ibv_dereg_mr(a2a->node.umr_entries_mr);
ucc_free(a2a->node.umr_entries_buf);
Expand Down
13 changes: 7 additions & 6 deletions src/components/tl/mlx5/alltoall/alltoall.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,29 @@ typedef struct ucc_tl_mlx5_alltoall {
struct ibv_context *ctx;
int ib_port;
int state;
int transpose;
uint64_t max_msg_size;
ucc_tl_mlx5_alltoall_node_t node;
ucc_tl_mlx5_alltoall_net_t net;
void *service_bcast_tmp_buf;
int sequence_number;
int op_busy[MAX_OUTSTANDING_OPS];
int num_dci_qps;
uint8_t is_dc;
int previous_msg_size[MAX_OUTSTANDING_OPS];
void *previous_send_address[MAX_OUTSTANDING_OPS];
void *previous_recv_address[MAX_OUTSTANDING_OPS];
uint64_t dummy_atomic_buff;
uint64_t atomic_scratch_bf;
int requested_block_size;
int max_num_of_columns;
struct ibv_mr *scratch_bf_mr;
struct ibv_mr *atomic_scratch_bf_mr;
ucc_rank_t node_size;
ucc_tl_mlx5_a2a_bcast_data_t bcast_data;
} ucc_tl_mlx5_alltoall_t;

ucc_status_t ucc_tl_mlx5_alltoall_init_start(ucc_tl_mlx5_team_t *team);
ucc_status_t ucc_tl_mlx5_alltoall_init_progress(ucc_tl_mlx5_team_t *team);
ucc_status_t ucc_tl_mlx5_team_alltoall_init_start(ucc_tl_mlx5_team_t *team);
ucc_status_t ucc_tl_mlx5_team_alltoall_init_progress(ucc_tl_mlx5_team_t *team);
ucc_status_t ucc_tl_mlx5_alltoall_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);
void ucc_tl_mlx5_alltoall_cleanup(ucc_tl_mlx5_team_t *team);

static inline ucc_tl_mlx5_alltoall_ctrl_t*
Expand Down
Loading

0 comments on commit c11b02e

Please sign in to comment.