Skip to content

Commit

Permalink
TL/MLX5: a2a part 1 -- coll init
Browse files Browse the repository at this point in the history
  • Loading branch information
samnordmann committed Jun 5, 2023
1 parent 447c786 commit f1881da
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 14 deletions.
11 changes: 9 additions & 2 deletions src/components/tl/mlx5/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

if TL_MLX5_ENABLED

alltoall = \
alltoall/alltoall.h \
alltoall/alltoall.c \
alltoall/alltoall_mkeys.h \
alltoall/alltoall_mkeys.c

sources = \
tl_mlx5.h \
tl_mlx5.c \
Expand All @@ -18,8 +24,9 @@ sources = \
tl_mlx5_pd.h \
tl_mlx5_pd.c \
tl_mlx5_rcache.c \
tl_mlx5_dm.c \
tl_mlx5_dm.h
tl_mlx5_dm.h \
tl_mlx5_dm.c \
$(alltoall)

module_LTLIBRARIES = libucc_tl_mlx5.la
libucc_tl_mlx5_la_SOURCES = $(sources)
Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/mlx5/tl_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ static ucc_config_field_t ucc_tl_mlx5_lib_config_table[] = {
{"", "", NULL, ucc_offsetof(ucc_tl_mlx5_lib_config_t, super),
UCC_CONFIG_TYPE_TABLE(ucc_tl_lib_config_table)},

{"ASR_BARRIER", "0", "Boolean - use service barrier or p2p sync of ASRs",
ucc_offsetof(ucc_tl_mlx5_lib_config_t, asr_barrier), UCC_CONFIG_TYPE_UINT},

{"DM_BUF_SIZE", "8k", "Size of the pre-allocated DeviceMemory buffer",
ucc_offsetof(ucc_tl_mlx5_lib_config_t, dm_buf_size),
UCC_CONFIG_TYPE_MEMUNITS},
Expand Down
21 changes: 20 additions & 1 deletion src/components/tl/mlx5/tl_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#define UCC_TL_MLX5_PROFILE_REQUEST_EVENT UCC_PROFILE_REQUEST_EVENT
#define UCC_TL_MLX5_PROFILE_REQUEST_FREE UCC_PROFILE_REQUEST_FREE

#define ATOMIC_IN_MEMIC 1
#define DC_KEY 1

typedef struct ucc_tl_mlx5_iface {
Expand All @@ -49,11 +50,12 @@ typedef struct ucc_tl_mlx5_ib_qp_conf {

typedef struct ucc_tl_mlx5_lib_config {
ucc_tl_lib_config_t super;
int asr_barrier;
int block_size;
int num_dci_qps;
int dc_threshold;
size_t dm_buf_size;
unsigned long dm_buf_num;
size_t dm_buf_num;
int dm_host;
ucc_tl_mlx5_ib_qp_conf_t qp_conf;
} ucc_tl_mlx5_lib_config_t;
Expand Down Expand Up @@ -84,6 +86,7 @@ typedef struct ucc_tl_mlx5_context {
UCC_CLASS_DECLARE(ucc_tl_mlx5_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);

typedef struct ucc_tl_mlx5_task ucc_tl_mlx5_task_t;
typedef struct ucc_tl_mlx5_schedule ucc_tl_mlx5_schedule_t;
typedef struct ucc_tl_mlx5_dm_chunk {
ptrdiff_t offset; /* 0 based offset from the beginning of
Expand All @@ -97,6 +100,8 @@ typedef enum
{
TL_MLX5_TEAM_STATE_INIT,
TL_MLX5_TEAM_STATE_POSTED,
TL_MLX5_TEAM_STATE_A2A_INIT,
TL_MLX5_TEAM_STATE_A2A_POSTED
} ucc_tl_mlx5_team_state_t;

typedef struct ucc_tl_mlx5_team {
Expand Down Expand Up @@ -145,4 +150,18 @@ typedef struct ucc_tl_mlx5_rcache_region {
#define IS_SERVICE_TEAM(_team) \
((_team)->super.super.params.scope == UCC_CL_LAST + 1)

#define SQUARED(_num) ((_num) * (_num))

ucc_status_t tl_mlx5_create_rcache(ucc_tl_mlx5_context_t *ctx);

ucc_status_t ucc_tl_mlx5_asr_socket_init(ucc_tl_mlx5_context_t *ctx,
ucc_rank_t group_size, int *socket,
const char *sock_path);

ucc_status_t ucc_tl_mlx5_dm_alloc_reg(struct ibv_context *ib_ctx,
struct ibv_pd *pd, int dm_host,
size_t buf_size, size_t *buf_num_p,
struct ibv_dm **ptr, struct ibv_mr **mr,
ucc_base_lib_t *lib);

#endif
34 changes: 28 additions & 6 deletions src/components/tl/mlx5/tl_mlx5_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "tl_mlx5.h"
#include "schedule/ucc_schedule.h"
#include "alltoall/alltoall.h"

typedef struct ucc_tl_mlx5_task {
ucc_coll_task_t super;
Expand All @@ -18,6 +19,21 @@ typedef struct ucc_tl_mlx5_schedule {
ucc_schedule_t super;
union {
struct {
int seq_num;
int seq_index;
int num_of_blocks_columns;
int block_size;
int started;
int send_blocks_enqueued;
int blocks_sent;
int blocks_completed;
ucc_tl_mlx5_a2a_op_t *op;
ucc_tl_mlx5_rcache_region_t *send_rcache_region_p;
ucc_tl_mlx5_rcache_region_t *recv_rcache_region_p;
size_t msg_size;
ucc_service_coll_req_t *barrier_req;
int barrier_scratch[2];
int wait_wc;
} alltoall;
};
} ucc_tl_mlx5_schedule_t;
Expand Down Expand Up @@ -54,16 +70,22 @@ static inline void ucc_tl_mlx5_put_task(ucc_tl_mlx5_task_t *task)
ucc_mpool_put(task);
}

static inline ucc_tl_mlx5_schedule_t *
ucc_tl_mlx5_get_schedule(ucc_tl_mlx5_team_t * team,
ucc_base_coll_args_t *coll_args)
static inline ucc_status_t
ucc_tl_mlx5_get_schedule(ucc_tl_mlx5_team_t * team,
ucc_base_coll_args_t * coll_args,
ucc_tl_mlx5_schedule_t **schedule)
{
ucc_tl_mlx5_context_t * ctx = UCC_TL_MLX5_TEAM_CTX(team);
ucc_tl_mlx5_schedule_t *schedule = ucc_mpool_get(&ctx->req_mp);

*schedule = ucc_mpool_get(&ctx->req_mp);

if (ucc_unlikely(!(*schedule))) {
return UCC_ERR_NO_MEMORY;
}
UCC_TL_MLX5_PROFILE_REQUEST_NEW(schedule, "tl_mlx5_sched", 0);
ucc_schedule_init(&schedule->super, coll_args, &team->super.super);
return schedule;

return ucc_schedule_init(&((*schedule)->super), coll_args,
&team->super.super);
}

static inline void ucc_tl_mlx5_put_schedule(ucc_tl_mlx5_schedule_t *schedule)
Expand Down
2 changes: 2 additions & 0 deletions src/components/tl/mlx5/tl_mlx5_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#include "tl_mlx5.h"
#include "utils/ucc_math.h"
#include "schedule/ucc_schedule.h"
#include "tl_mlx5_ib.h"
#include <limits.h>
#include "tl_mlx5_coll.h"
#include "tl_mlx5_pd.h"
#include "utils/arch/cpu.h"
#include "tl_mlx5_pd.h"
#include "tl_mlx5_ib.h"
Expand Down
2 changes: 0 additions & 2 deletions src/components/tl/mlx5/tl_mlx5_dm.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ void ucc_tl_mlx5_dm_cleanup(ucc_tl_mlx5_team_t *team)
return;
}

ucc_mpool_cleanup(&team->dm_pool, 1);

ibv_dereg_mr(team->dm_mr);
if (UCC_TL_MLX5_TEAM_LIB(team)->cfg.dm_host) {
ucc_free(team->dm_ptr);
Expand Down
1 change: 0 additions & 1 deletion src/components/tl/mlx5/tl_mlx5_ib.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

#include "tl_mlx5_ib.h"

#define DC_KEY 1
#define UCC_QP_PKEY_INDEX 0
#define UCC_QP_PSN 0x123
#define UCC_QP_ACCESS_FLAGS (IBV_ACCESS_LOCAL_WRITE | \
Expand Down
18 changes: 16 additions & 2 deletions src/components/tl/mlx5/tl_mlx5_team.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
#include "tl_mlx5.h"
#include "tl_mlx5_dm.h"
#include "coll_score/ucc_coll_score.h"
#include "alltoall/alltoall.h"
#include "core/ucc_team.h"
#include <sys/shm.h>

static ucc_status_t ucc_tl_mlx5_topo_init(ucc_tl_mlx5_team_t *team)
{
ucc_subset_t subset;
ucc_status_t status;

status = ucc_ep_map_create_nested(&UCC_TL_CORE_TEAM(team)->ctx_map,
&UCC_TL_TEAM_MAP(team), &team->ctx_map);
if (UCC_OK != status) {
Expand Down Expand Up @@ -79,6 +80,7 @@ UCC_CLASS_CLEANUP_FUNC(ucc_tl_mlx5_team_t)
{
tl_debug(self->super.super.context->lib, "finalizing tl team: %p", self);

ucc_tl_mlx5_a2a_cleanup(self);
ucc_tl_mlx5_dm_cleanup(self);
ucc_tl_mlx5_topo_cleanup(self);
}
Expand Down Expand Up @@ -132,9 +134,21 @@ ucc_status_t ucc_tl_mlx5_team_create_test(ucc_base_team_t *team)
ucc_tl_mlx5_team_destroy(team);
return tl_team->status[1];
}
tl_team->state = TL_MLX5_TEAM_STATE_A2A_INIT;
case TL_MLX5_TEAM_STATE_A2A_INIT:
status = ucc_tl_mlx5_a2a_init_start(tl_team);
if (status != UCC_OK) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"failed to init a2a: %s",
ucc_status_string(status));
return status;
}
tl_team->state = TL_MLX5_TEAM_STATE_A2A_POSTED;
case TL_MLX5_TEAM_STATE_A2A_POSTED:
status = ucc_tl_mlx5_a2a_init_progress(tl_team);
}

return UCC_OK;
return status;
}

ucc_status_t ucc_tl_mlx5_team_get_scores(ucc_base_team_t * tl_team,
Expand Down
1 change: 1 addition & 0 deletions src/core/ucc_service_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
typedef struct ucc_service_coll_req {
ucc_coll_task_t *task;
ucc_team_t *team;
void * data;
ucc_subset_t subset;
} ucc_service_coll_req_t;

Expand Down

0 comments on commit f1881da

Please sign in to comment.