From b5c71d144d3ba75c947ec4a5ffab5132146e49fe Mon Sep 17 00:00:00 2001 From: Sergey Lebedev Date: Wed, 15 Mar 2023 06:21:49 +0000 Subject: [PATCH] TL/CUDA: fix linear algorithms --- src/components/ec/base/ucc_ec_base.h | 2 +- src/components/ec/cuda/ec_cuda.c | 43 +++++- src/components/ec/cuda/ec_cuda.h | 13 +- .../ec/cuda/ec_cuda_executor_interruptible.c | 56 +++++++- .../tl/cuda/allgather/allgather_linear.c | 3 +- .../tl/cuda/allgatherv/allgatherv_linear.c | 135 +++++++++--------- .../reduce_scatter/reduce_scatter_linear.c | 3 +- .../reduce_scatterv/reduce_scatterv_linear.c | 31 ++-- src/components/tl/cuda/tl_cuda.h | 4 +- src/components/tl/cuda/tl_cuda_team_topo.c | 1 + src/components/tl/cuda/tl_cuda_team_topo.h | 17 +-- tools/perf/ucc_pt_benchmark.cc | 2 +- tools/perf/ucc_pt_coll.h | 3 +- tools/perf/ucc_pt_config.cc | 2 +- tools/perf/ucc_pt_config.h | 2 + tools/perf/ucc_pt_op_memcpy.cc | 48 ++++++- tools/perf/ucc_pt_op_reduce.cc | 4 + tools/perf/ucc_pt_op_reduce_strided.cc | 4 + 18 files changed, 258 insertions(+), 115 deletions(-) diff --git a/src/components/ec/base/ucc_ec_base.h b/src/components/ec/base/ucc_ec_base.h index db57f1da85..5e91c57865 100644 --- a/src/components/ec/base/ucc_ec_base.h +++ b/src/components/ec/base/ucc_ec_base.h @@ -79,7 +79,7 @@ typedef struct ucc_ee_executor_params { /* Maximum number of buffers for UCC_EE_EXECUTOR_TASK_REDUCE_MULTI_DST and UCC_EE_EXECUTOR_TASK_COPY_MULTI operations */ -#define UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS 6 +#define UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS 7 /* Reduces "n_srcs" buffers (each contains "count" elements of type "dt") into "dst" buffer. diff --git a/src/components/ec/cuda/ec_cuda.c b/src/components/ec/cuda/ec_cuda.c index b8fd0b1bfe..9b149e4f41 100644 --- a/src/components/ec/cuda/ec_cuda.c +++ b/src/components/ec/cuda/ec_cuda.c @@ -50,6 +50,11 @@ static ucc_config_field_t ucc_ec_cuda_config_table[] = { ucc_offsetof(ucc_ec_cuda_config_t, exec_num_streams), UCC_CONFIG_TYPE_ULUNITS}, + {"EXEC_COPY_LARGE_THRESH", "1M", + "Memcopy size to switch from kernel copy to cudaMemcpy", + ucc_offsetof(ucc_ec_cuda_config_t, exec_copy_thresh), + UCC_CONFIG_TYPE_MEMUNITS}, + {"REDUCE_NUM_BLOCKS", "auto", "Number of thread blocks to use for reduction in interruptible mode", ucc_offsetof(ucc_ec_cuda_config_t, reduce_num_blocks), @@ -146,6 +151,40 @@ static ucc_mpool_ops_t ucc_ec_cuda_event_mpool_ops = { .obj_cleanup = ucc_ec_cuda_event_cleanup, }; +static void ucc_ec_cuda_graph_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused +{ + ucc_ec_cuda_executor_interruptible_task_t *task = + (ucc_ec_cuda_executor_interruptible_task_t *) obj; + cudaGraphNode_t memcpy_node; + int i; + + CUDA_FUNC(cudaGraphCreate(&task->graph, 0)); + for (i = 0; i < UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; i++) { + CUDA_FUNC( + cudaGraphAddMemcpyNode1D(&memcpy_node, task->graph, NULL, 0, + (void*)1, (void*)1, 1, cudaMemcpyDefault)); + } + CUDA_FUNC( + cudaGraphInstantiate(&task->graph_exec, task->graph, NULL, + NULL, 0)); +} + +static void ucc_ec_cuda_graph_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused +{ + ucc_ec_cuda_executor_interruptible_task_t *task = + (ucc_ec_cuda_executor_interruptible_task_t *) obj; + + CUDA_FUNC(cudaGraphExecDestroy(task->graph_exec)); + CUDA_FUNC(cudaGraphDestroy(task->graph)); +} + +static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = { + .chunk_alloc = ucc_mpool_hugetlb_malloc, + .chunk_release = ucc_mpool_hugetlb_free, + .obj_init = ucc_ec_cuda_graph_init, + .obj_cleanup = ucc_ec_cuda_graph_cleanup, +}; + static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock) { if (*nt != UCC_ULUNITS_AUTO) { @@ -243,8 +282,8 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) status = ucc_mpool_init( &ucc_ec_cuda.executor_interruptible_tasks, 0, sizeof(ucc_ec_cuda_executor_interruptible_task_t), 0, UCC_CACHE_LINE_SIZE, - 16, UINT_MAX, NULL, UCC_THREAD_MULTIPLE, - "interruptible executor tasks"); + 16, UINT_MAX, &ucc_ec_cuda_interruptible_task_mpool_ops, + UCC_THREAD_MULTIPLE, "interruptible executor tasks"); if (status != UCC_OK) { ec_error(&ucc_ec_cuda.super, "failed to create interruptible tasks pool"); return status; diff --git a/src/components/ec/cuda/ec_cuda.h b/src/components/ec/cuda/ec_cuda.h index 677d14b9a7..012ba65a15 100644 --- a/src/components/ec/cuda/ec_cuda.h +++ b/src/components/ec/cuda/ec_cuda.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -48,6 +48,7 @@ typedef struct ucc_ec_cuda_config { unsigned long reduce_num_blocks; int reduce_num_threads; int use_cooperative_launch; + unsigned long exec_copy_thresh; } ucc_ec_cuda_config_t; typedef struct ucc_ec_cuda { @@ -75,12 +76,14 @@ typedef struct ucc_ec_cuda_stream_request { cudaStream_t stream; } ucc_ec_cuda_stream_request_t; +#define MAX_SUBTASKS 12 typedef struct ucc_ec_cuda_executor_interruptible_task { ucc_ee_executor_task_t super; void *event; + cudaGraph_t graph; + cudaGraphExec_t graph_exec; } ucc_ec_cuda_executor_interruptible_task_t; -#define MAX_SUBTASKS 12 typedef struct ucc_ec_cuda_executor_persistent_task { ucc_ee_executor_task_t super; int num_subtasks; @@ -133,9 +136,9 @@ extern ucc_ec_cuda_t ucc_ec_cuda; ucc_ec_cuda.stream_initialized = 1; \ } \ ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); \ - if (ucc_unlikely(cudaSuccess != cuda_st)) { \ - return cuda_error_to_ucc_status(cuda_st); \ - } \ + if (ucc_unlikely(cudaSuccess != cuda_st)) { \ + return cuda_error_to_ucc_status(cuda_st); \ + } \ } \ } while(0) diff --git a/src/components/ec/cuda/ec_cuda_executor_interruptible.c b/src/components/ec/cuda/ec_cuda_executor_interruptible.c index e4a027f4d3..b3d2e68b68 100644 --- a/src/components/ec/cuda/ec_cuda_executor_interruptible.c +++ b/src/components/ec/cuda/ec_cuda_executor_interruptible.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -54,7 +54,11 @@ ucc_cuda_executor_interruptible_task_post(ucc_ee_executor_t *executor, { cudaStream_t stream = NULL; ucc_ec_cuda_executor_interruptible_task_t *ee_task; - ucc_status_t status; + ucc_status_t status; + cudaGraphNode_t nodes[UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS]; + size_t num_nodes = UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; + int i; + status = ucc_cuda_executor_interruptible_get_stream(&stream); if (ucc_unlikely(status != UCC_OK)) { @@ -85,10 +89,46 @@ ucc_cuda_executor_interruptible_task_post(ucc_ee_executor_t *executor, } break; case UCC_EE_EXECUTOR_TASK_COPY_MULTI: - status = ucc_ec_cuda_copy_multi_kernel(task_args, stream); - if (ucc_unlikely(status != UCC_OK)) { - ec_error(&ucc_ec_cuda.super, "failed to start copy multi op"); - goto free_task; + if ((task_args->copy_multi.counts[0] > EC_CUDA_CONFIG->exec_copy_thresh) && + (task_args->copy_multi.num_vectors > 2)) { + cudaGraphGetNodes(ee_task->graph, nodes, &num_nodes); + for (i = 0; i < task_args->copy_multi.num_vectors; i++) { + status = CUDA_FUNC( + cudaGraphExecMemcpyNodeSetParams1D(ee_task->graph_exec, nodes[i], + task_args->copy_multi.dst[i], + task_args->copy_multi.src[i], + task_args->copy_multi.counts[i], + cudaMemcpyDefault)); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to instantiate graph"); + goto free_task; + } + + } + for (; i < UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; i++) { + status = CUDA_FUNC( + cudaGraphExecMemcpyNodeSetParams1D(ee_task->graph_exec, nodes[i], + task_args->copy_multi.dst[0], + task_args->copy_multi.src[0], + 1, cudaMemcpyDefault)); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to instantiate graph"); + goto free_task; + } + } + + status = CUDA_FUNC(cudaGraphLaunch(ee_task->graph_exec, stream)); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to instantiate graph"); + goto free_task; + } + + } else { + status = ucc_ec_cuda_copy_multi_kernel(task_args, stream); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to start copy multi op"); + goto free_task; + } } break; case UCC_EE_EXECUTOR_TASK_REDUCE: @@ -141,6 +181,10 @@ ucc_cuda_executor_interruptible_task_finalize(ucc_ee_executor_task_t *task) ucc_assert(task->status == UCC_OK); status = ucc_ec_cuda_event_destroy(ee_task->event); + // if (ee_task->graph) { + // cudaGraphExecDestroy(ee_task->graph_exec); + // cudaGraphDestroy(ee_task->graph); + // } ucc_mpool_put(task); return status; } diff --git a/src/components/tl/cuda/allgather/allgather_linear.c b/src/components/tl/cuda/allgather/allgather_linear.c index f50ac85894..572035c6a5 100644 --- a/src/components/tl/cuda/allgather/allgather_linear.c +++ b/src/components/tl/cuda/allgather/allgather_linear.c @@ -15,7 +15,8 @@ ucc_status_t ucc_tl_cuda_allgather_linear_init(ucc_base_coll_args_t *coll_args, ucc_tl_cuda_task_t *task; ucc_status_t status; - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo))) { + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/allgatherv/allgatherv_linear.c b/src/components/tl/cuda/allgatherv/allgatherv_linear.c index 9b3c67b344..c1600541a1 100644 --- a/src/components/tl/cuda/allgatherv/allgatherv_linear.c +++ b/src/components/tl/cuda/allgatherv/allgatherv_linear.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -157,8 +157,10 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) ucc_status_t st; int step, i; void * sbuf, *dbuf; + ucc_rank_t peer;//, nv; size_t send_size, frag_size, frag_offset, local_offset, remote_offset, scratch_offset, rank_offset; + ucc_ee_executor_task_args_t eargs; st = ucc_coll_task_get_executor(&task->super, &exec); if (ucc_unlikely(st != UCC_OK)) { @@ -167,16 +169,15 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) step = get_rank_step(task, trank, 0); while (step < num_steps) { - if (task->allgatherv_linear.num_posted > - task->allgatherv_linear.num_completed) { - for (i = 0; i < tsize * 2; i++) { + if ((task->allgatherv_linear.exec_task[0] != NULL) || + (task->allgatherv_linear.exec_task[1] != NULL)) { + for (i = 0; i < 2; i++) { etask = task->allgatherv_linear.exec_task[i]; if (etask != NULL) { st = ucc_ee_executor_task_test(etask); if (st == UCC_OK) { ucc_ee_executor_task_finalize(etask); task->allgatherv_linear.exec_task[i] = NULL; - task->allgatherv_linear.num_completed++; } else { if (ucc_likely(st > 0)) { return UCC_INPROGRESS; @@ -190,6 +191,7 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) continue; } + for (i = 0; i < tsize; i++) { if (get_rank_step(task, i, 0) < step) { return UCC_INPROGRESS; @@ -218,19 +220,21 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) } else { sbuf = PTR_OFFSET(task->allgatherv_linear.sbuf, frag_offset); } - for (i = 0; i < tsize; i++) { - if (i == trank) { - continue; - } + eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY_MULTI; + for (i = 0; i < tsize - 1; i++) { + peer = (trank + i + 1) % UCC_TL_TEAM_SIZE(team); scratch_offset = get_scratch_offset(team, dt, trank); - dbuf = PTR_OFFSET(TASK_SCRATCH(task, i), + dbuf = PTR_OFFSET(TASK_SCRATCH(task, peer), remote_offset + scratch_offset); - - st = ecopy(dbuf, sbuf, frag_size, exec, - &task->allgatherv_linear.exec_task[i]); - if (ucc_unlikely(st != UCC_OK)) { - return st; - } + eargs.copy_multi.src[i] = sbuf; + eargs.copy_multi.dst[i] = dbuf; + eargs.copy_multi.counts[i] = frag_size; + } + eargs.copy_multi.num_vectors = tsize - 1; + st = ucc_ee_executor_task_post(exec, &eargs, + &task->allgatherv_linear.exec_task[0]); + if (ucc_unlikely(st != UCC_OK)) { + return st; } if (!UCC_IS_INPLACE(*args)) { rank_offset = @@ -240,22 +244,19 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) st = ecopy(dbuf, sbuf, task->allgatherv_linear.get_count(task, trank) * dt_size, - exec, &task->allgatherv_linear.exec_task[tsize]); + exec, &task->allgatherv_linear.exec_task[1]); if (ucc_unlikely(st != UCC_OK)) { return st; } - task->allgatherv_linear.num_posted++; } - task->allgatherv_linear.num_posted += tsize - 1; } else if (step == (num_steps - 1)) { - for (i = 0; i < tsize; i++) { - if (i == trank) { - continue; - } - scratch_offset = get_scratch_offset(team, dt, i); + eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY_MULTI; + for (i = 0; i < tsize - 1; i++) { + peer = (trank + i + 1) % UCC_TL_TEAM_SIZE(team); + scratch_offset = get_scratch_offset(team, dt, peer); rank_offset = - task->allgatherv_linear.get_offset(task, i) * dt_size; - send_size = task->allgatherv_linear.get_count(task, i); + task->allgatherv_linear.get_offset(task, peer) * dt_size; + send_size = task->allgatherv_linear.get_count(task, peer); frag_offset = ucc_buffer_block_offset(send_size, nfrags, step - 1) * dt_size; @@ -266,14 +267,16 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) local_offset + scratch_offset); dbuf = PTR_OFFSET(task->allgatherv_linear.rbuf, rank_offset + frag_offset); - - st = ecopy(dbuf, sbuf, frag_size, exec, - &task->allgatherv_linear.exec_task[i]); - if (ucc_unlikely(st != UCC_OK)) { - return st; - } + eargs.copy_multi.src[i] = sbuf; + eargs.copy_multi.dst[i] = dbuf; + eargs.copy_multi.counts[i] = frag_size; + } + eargs.copy_multi.num_vectors = tsize - 1; + st = ucc_ee_executor_task_post(exec, &eargs, + &task->allgatherv_linear.exec_task[0]); + if (ucc_unlikely(st != UCC_OK)) { + return st; } - task->allgatherv_linear.num_posted += tsize - 1; } else { send_size = task->allgatherv_linear.get_count(task, trank); frag_size = @@ -285,28 +288,29 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) sbuf = PTR_OFFSET(task->allgatherv_linear.rbuf, rank_offset + frag_offset); scratch_offset = get_scratch_offset(team, dt, trank); - for (i = 0; i < tsize; i++) { - if (i == trank) { - continue; - } - dbuf = PTR_OFFSET(TASK_SCRATCH(task, i), + eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY_MULTI; + for (i = 0; i < tsize - 1; i++) { + peer = (trank + i + 1) % UCC_TL_TEAM_SIZE(team); + dbuf = PTR_OFFSET(TASK_SCRATCH(task, peer), remote_offset + scratch_offset); - - st = ecopy(dbuf, sbuf, frag_size, exec, - &task->allgatherv_linear.exec_task[i]); - if (ucc_unlikely(st != UCC_OK)) { - return st; - } + eargs.copy_multi.src[i] = sbuf; + eargs.copy_multi.dst[i] = dbuf; + eargs.copy_multi.counts[i] = frag_size; + } + eargs.copy_multi.num_vectors = tsize - 1; + st = ucc_ee_executor_task_post(exec, &eargs, + &task->allgatherv_linear.exec_task[0]); + if (ucc_unlikely(st != UCC_OK)) { + return st; } - for (i = 0; i < tsize; i++) { - if (i == trank) { - continue; - } - scratch_offset = get_scratch_offset(team, dt, i); + eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY_MULTI; + for (i = 0; i < tsize - 1; i++) { + peer = (trank + i + 1) % UCC_TL_TEAM_SIZE(team); + scratch_offset = get_scratch_offset(team, dt, peer); rank_offset = - task->allgatherv_linear.get_offset(task, i) * dt_size; - send_size = task->allgatherv_linear.get_count(task, i); + task->allgatherv_linear.get_offset(task, peer) * dt_size; + send_size = task->allgatherv_linear.get_count(task, peer); frag_offset = ucc_buffer_block_offset(send_size, nfrags, step - 1) * dt_size; @@ -317,14 +321,16 @@ ucc_tl_cuda_allgatherv_linear_progress_frag(ucc_tl_cuda_task_t *task) local_offset + scratch_offset); dbuf = PTR_OFFSET(task->allgatherv_linear.rbuf, rank_offset + frag_offset); - - st = ecopy(dbuf, sbuf, frag_size, exec, - &task->allgatherv_linear.exec_task[tsize + i]); - if (ucc_unlikely(st != UCC_OK)) { - return st; - } + eargs.copy_multi.src[i] = sbuf; + eargs.copy_multi.dst[i] = dbuf; + eargs.copy_multi.counts[i] = frag_size; + } + eargs.copy_multi.num_vectors = tsize - 1; + st = ucc_ee_executor_task_post(exec, &eargs, + &task->allgatherv_linear.exec_task[1]); + if (ucc_unlikely(st != UCC_OK)) { + return st; } - task->allgatherv_linear.num_posted += 2 * (tsize - 1); } } @@ -338,7 +344,7 @@ void ucc_tl_cuda_allgatherv_linear_progress(ucc_coll_task_t *coll_task) ucc_status_t st; task->super.status = UCC_INPROGRESS; - switch (task->allgatherv_ring.stage) { + switch (task->allgatherv_linear.stage) { case STAGE_SYNC: if (ucc_tl_cuda_get_sync(task) != UCC_OK) { task->super.status = UCC_INPROGRESS; @@ -387,14 +393,12 @@ ucc_status_t ucc_tl_cuda_allgatherv_linear_start(ucc_coll_task_t *coll_task) ucc_tl_cuda_team_t *team = TASK_TEAM(task); ucc_coll_args_t * args = &TASK_ARGS(task); ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); - ucc_datatype_t dt = task->allgatherv_ring.dt; + ucc_datatype_t dt = task->allgatherv_linear.dt; ucc_rank_t i; size_t send_size, frag_size, ssize; task->allgatherv_linear.stage = STAGE_SYNC; task->allgatherv_linear.sbuf = args->src.info.buffer; - task->allgatherv_linear.num_posted = 0; - task->allgatherv_linear.num_completed = 0; if (args->coll_type == UCC_COLL_TYPE_ALLGATHERV) { task->allgatherv_linear.rbuf = args->dst.info_v.buffer; } else { @@ -415,9 +419,9 @@ ucc_status_t ucc_tl_cuda_allgatherv_linear_start(ucc_coll_task_t *coll_task) ssize = get_scratch_size(team, dt); frag_size = ucc_min(ssize / 2 / ucc_dt_size(dt) / tsize, send_size); task->allgatherv_linear.num_frags = ucc_div_round_up(send_size, frag_size); + task->allgatherv_linear.exec_task[0] = NULL; + task->allgatherv_linear.exec_task[1] = NULL; - memset(task->allgatherv_linear.exec_task, 0, - 2 * tsize * sizeof(ucc_ee_executor_task_t *)); return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); } @@ -429,7 +433,8 @@ ucc_status_t ucc_tl_cuda_allgatherv_linear_init(ucc_base_coll_args_t *coll_args, ucc_tl_cuda_task_t *task; ucc_status_t status; - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo))) { + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c b/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c index 0b67479670..8635aa3760 100644 --- a/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c +++ b/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c @@ -19,7 +19,8 @@ ucc_status_t ucc_tl_cuda_reduce_scatter_linear_init(ucc_base_coll_args_t *coll_a return UCC_ERR_NOT_SUPPORTED; } - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo))) { + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c b/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c index ee2e92f410..ea420f5d85 100644 --- a/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c +++ b/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -162,31 +162,32 @@ ucc_tl_cuda_reduce_scatterv_linear_copy(ucc_tl_cuda_task_t *task, size_t scratch_offset, scratch_stride, send_size, frag_size, frag_offset, rank_offset; ucc_ee_executor_task_args_t eargs; - ucc_rank_t i, nv; + ucc_rank_t i, nv, peer; scratch_offset = get_scratch_offset(team, dt, trank); scratch_stride = get_scratch_stride(team, dt); eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY_MULTI; eargs.flags = 0; for (i = 0, nv = 0; i < tsize; i++) { - if (i == trank) { + peer = (trank + i) % UCC_TL_TEAM_SIZE(team); + if (peer == trank) { continue; } - send_size = task->reduce_scatterv_linear.get_count(task, i); + send_size = task->reduce_scatterv_linear.get_count(task, peer); frag_size = ucc_buffer_block_count(send_size, nfrags, step); frag_offset = ucc_buffer_block_offset(send_size, nfrags, step); - rank_offset = task->reduce_scatterv_linear.get_offset(task, i); + rank_offset = task->reduce_scatterv_linear.get_offset(task, peer); if (frag_size == 0) { continue; } eargs.copy_multi.src[nv] = PTR_OFFSET(sbuf, (rank_offset + frag_offset) * dt_size); eargs.copy_multi.counts[nv] = frag_size * dt_size; - if (trank < i) { - eargs.copy_multi.dst[nv] = PTR_OFFSET(TASK_SCRATCH(task, i), + if (trank < peer) { + eargs.copy_multi.dst[nv] = PTR_OFFSET(TASK_SCRATCH(task, peer), remote_offset + scratch_offset); } else { - eargs.copy_multi.dst[nv] = PTR_OFFSET(TASK_SCRATCH(task, i), + eargs.copy_multi.dst[nv] = PTR_OFFSET(TASK_SCRATCH(task, peer), remote_offset + scratch_offset - scratch_stride); } nv++; @@ -312,17 +313,20 @@ ucc_tl_cuda_reduce_scatterv_linear_progress_frag(ucc_tl_cuda_task_t *task) } if (step == 0) { - ucc_tl_cuda_reduce_scatterv_linear_copy(task, exec, sbuf, step, + st = ucc_tl_cuda_reduce_scatterv_linear_copy(task, exec, sbuf, step, remote_offset, &task->reduce_scatterv_linear.exec_task[0]); } else if (step == (num_steps - 1)) { - ucc_tl_cuda_reduce_scatterv_linear_reduce(task, exec, sbuf, rbuf, + st = ucc_tl_cuda_reduce_scatterv_linear_reduce(task, exec, sbuf, rbuf, step - 1, local_offset, &task->reduce_scatterv_linear.exec_task[1]); } else { - ucc_tl_cuda_reduce_scatterv_linear_copy(task, exec, sbuf, step, + st = ucc_tl_cuda_reduce_scatterv_linear_copy(task, exec, sbuf, step, remote_offset, &task->reduce_scatterv_linear.exec_task[0]); - ucc_tl_cuda_reduce_scatterv_linear_reduce(task, exec, sbuf, rbuf, + st = ucc_tl_cuda_reduce_scatterv_linear_reduce(task, exec, sbuf, rbuf, step - 1, local_offset, &task->reduce_scatterv_linear.exec_task[1]); } + if (ucc_unlikely(st != UCC_OK)) { + return st; + } if ((task->reduce_scatterv_linear.exec_task[0] == NULL) && (task->reduce_scatterv_linear.exec_task[1] == NULL)) { @@ -432,7 +436,8 @@ ucc_tl_cuda_reduce_scatterv_linear_init(ucc_base_coll_args_t *coll_args, return UCC_ERR_NOT_SUPPORTED; } - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo))) { + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/tl_cuda.h b/src/components/tl/cuda/tl_cuda.h index b4c0685c7b..792100c80c 100644 --- a/src/components/tl/cuda/tl_cuda.h +++ b/src/components/tl/cuda/tl_cuda.h @@ -215,12 +215,10 @@ struct ucc_tl_cuda_task { struct { int stage; int num_frags; - int num_posted; - int num_completed; ucc_datatype_t dt; void * sbuf; void * rbuf; - ucc_ee_executor_task_t *exec_task[2 * UCC_TL_CUDA_MAX_PEERS]; + ucc_ee_executor_task_t *exec_task[2]; size_t (*get_count)(const ucc_tl_cuda_task_t *task, ucc_rank_t block); size_t (*get_offset)(const ucc_tl_cuda_task_t *task, diff --git a/src/components/tl/cuda/tl_cuda_team_topo.c b/src/components/tl/cuda/tl_cuda_team_topo.c index e1f3011ae2..b1161ede6a 100644 --- a/src/components/tl/cuda/tl_cuda_team_topo.c +++ b/src/components/tl/cuda/tl_cuda_team_topo.c @@ -247,6 +247,7 @@ ucc_tl_cuda_team_topo_init_proxies(const ucc_tl_cuda_team_t *team, } topo->num_proxies = num_proxies; + topo->is_fully_connected = (num_proxies == 0) ? 1 : 0; if (num_proxies == 0) { return UCC_OK; } diff --git a/src/components/tl/cuda/tl_cuda_team_topo.h b/src/components/tl/cuda/tl_cuda_team_topo.h index 0b4b840837..96b6d63a5b 100644 --- a/src/components/tl/cuda/tl_cuda_team_topo.h +++ b/src/components/tl/cuda/tl_cuda_team_topo.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -22,12 +22,13 @@ typedef struct ucc_tl_cuda_ring { } ucc_tl_cuda_ring_t; typedef struct ucc_tl_cuda_team_topo { - ucc_rank_t *matrix; /* nvlink adjacency matrix */ - int proxy_needed; /* is proxy needed for current rank */ - int num_proxies; /* number of entries in proxies list */ - ucc_tl_cuda_proxy_t *proxies; /* list of pairs where current rank is proxy */ - int num_rings; /* number of entries in rings list */ - ucc_tl_cuda_ring_t *rings; /* list of rings for ring algorithms */ + ucc_rank_t *matrix; /* nvlink adjacency matrix */ + int proxy_needed; /* is proxy needed for current rank */ + int num_proxies; /* number of entries in proxies list */ + ucc_tl_cuda_proxy_t *proxies; /* list of pairs where current rank is proxy */ + int num_rings; /* number of entries in rings list */ + ucc_tl_cuda_ring_t *rings; /* list of rings for ring algorithms */ + int is_fully_connected; /* no proxies in team topo */ } ucc_tl_cuda_team_topo_t; ucc_status_t ucc_tl_cuda_team_topo_create(const ucc_tl_team_t *team, @@ -52,7 +53,7 @@ ucc_tl_cuda_team_topo_is_direct(const ucc_tl_team_t *team, static inline int ucc_tl_cuda_team_topo_is_fully_conntected(const ucc_tl_cuda_team_topo_t *topo) { - return topo->num_proxies == 0; + return topo->is_fully_connected; } #endif diff --git a/tools/perf/ucc_pt_benchmark.cc b/tools/perf/ucc_pt_benchmark.cc index 7be5e5271f..be04ff5088 100644 --- a/tools/perf/ucc_pt_benchmark.cc +++ b/tools/perf/ucc_pt_benchmark.cc @@ -58,7 +58,7 @@ ucc_pt_benchmark::ucc_pt_benchmark(ucc_pt_benchmark_config cfg, coll = new ucc_pt_coll_scatterv(cfg.dt, cfg.mt, cfg.inplace, comm); break; case UCC_PT_OP_TYPE_MEMCPY: - coll = new ucc_pt_op_memcpy(cfg.dt, cfg.mt, comm); + coll = new ucc_pt_op_memcpy(cfg.dt, cfg.mt, cfg.n_bufs, comm); break; case UCC_PT_OP_TYPE_REDUCEDT: coll = new ucc_pt_op_reduce(cfg.dt, cfg.mt, cfg.op, cfg.n_bufs, comm); diff --git a/tools/perf/ucc_pt_coll.h b/tools/perf/ucc_pt_coll.h index cec5c7e18b..0628cc242a 100644 --- a/tools/perf/ucc_pt_coll.h +++ b/tools/perf/ucc_pt_coll.h @@ -180,8 +180,9 @@ class ucc_pt_coll_scatterv: public ucc_pt_coll { class ucc_pt_op_memcpy: public ucc_pt_coll { ucc_memory_type_t mem_type; ucc_datatype_t data_type; + int num_bufs; public: - ucc_pt_op_memcpy(ucc_datatype_t dt, ucc_memory_type mt, + ucc_pt_op_memcpy(ucc_datatype_t dt, ucc_memory_type mt, int nbufs, ucc_pt_comm *communicator); ucc_status_t init_args(size_t count, ucc_pt_test_args_t &args) override; void free_args(ucc_pt_test_args_t &args) override; diff --git a/tools/perf/ucc_pt_config.cc b/tools/perf/ucc_pt_config.cc index fff4759a8f..409ef2be94 100644 --- a/tools/perf/ucc_pt_config.cc +++ b/tools/perf/ucc_pt_config.cc @@ -25,7 +25,7 @@ ucc_pt_config::ucc_pt_config() { bench.n_warmup_large = 20; bench.large_thresh = 64 * 1024; bench.full_print = false; - bench.n_bufs = 2; + bench.n_bufs = UCC_PT_DEFAULT_N_BUFS; bench.root = 0; bench.root_shift = 0; comm.mt = bench.mt; diff --git a/tools/perf/ucc_pt_config.h b/tools/perf/ucc_pt_config.h index b7a5f1a267..80543d7491 100644 --- a/tools/perf/ucc_pt_config.h +++ b/tools/perf/ucc_pt_config.h @@ -15,6 +15,8 @@ #include #include "utils/ucc_log.h" +#define UCC_PT_DEFAULT_N_BUFS 0 + enum ucc_pt_bootstrap_type_t { UCC_PT_BOOTSTRAP_MPI, UCC_PT_BOOTSTRAP_UCX diff --git a/tools/perf/ucc_pt_op_memcpy.cc b/tools/perf/ucc_pt_op_memcpy.cc index fd2084f31e..585e8f0e64 100644 --- a/tools/perf/ucc_pt_op_memcpy.cc +++ b/tools/perf/ucc_pt_op_memcpy.cc @@ -11,6 +11,7 @@ #include ucc_pt_op_memcpy::ucc_pt_op_memcpy(ucc_datatype_t dt, ucc_memory_type mt, + int nbufs, ucc_pt_comm *communicator) : ucc_pt_coll(communicator) { @@ -19,8 +20,18 @@ ucc_pt_op_memcpy::ucc_pt_op_memcpy(ucc_datatype_t dt, ucc_memory_type mt, has_range_ = true; has_bw_ = true; + if (nbufs == UCC_PT_DEFAULT_N_BUFS) { + nbufs = 1; + } + + if (nbufs > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS) { + throw std::runtime_error("max supported number of copy buffer is " + STR(UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)); + } + data_type = dt; mem_type = mt; + num_bufs = nbufs; } ucc_status_t ucc_pt_op_memcpy::init_args(size_t count, @@ -29,14 +40,27 @@ ucc_status_t ucc_pt_op_memcpy::init_args(size_t count, ucc_ee_executor_task_args_t &args = test_args.executor_args; size_t size = count * ucc_dt_size(data_type); ucc_status_t st; + int i; - UCCCHECK_GOTO(ucc_pt_alloc(&dst_header, size, mem_type), exit, st); - UCCCHECK_GOTO(ucc_pt_alloc(&src_header, size, mem_type), free_dst, st); + UCCCHECK_GOTO(ucc_pt_alloc(&dst_header, num_bufs * size, mem_type), + exit, st); + UCCCHECK_GOTO(ucc_pt_alloc(&src_header, num_bufs * size, mem_type), + free_dst, st); - args.task_type = UCC_EE_EXECUTOR_TASK_COPY; - args.copy.dst = dst_header->addr; - args.copy.src = src_header->addr; - args.copy.len = size; + if (num_bufs == 1) { + args.task_type = UCC_EE_EXECUTOR_TASK_COPY; + args.copy.dst = dst_header->addr; + args.copy.src = src_header->addr; + args.copy.len = size; + } else { + args.task_type = UCC_EE_EXECUTOR_TASK_COPY_MULTI; + args.copy_multi.num_vectors = num_bufs; + for (i = 0; i < num_bufs; i++) { + args.copy_multi.src[i] = PTR_OFFSET(src_header->addr, size * i); + args.copy_multi.dst[i] = PTR_OFFSET(dst_header->addr, size * i); + args.copy_multi.counts[i] = size; + } + } return UCC_OK; free_dst: @@ -49,7 +73,17 @@ float ucc_pt_op_memcpy::get_bw(float time_ms, int grsize, ucc_pt_test_args_t test_args) { ucc_ee_executor_task_args_t &args = test_args.executor_args; - float S = args.copy.len; + float S; + int i; + + if (args.task_type == UCC_EE_EXECUTOR_TASK_COPY) { + S = args.copy.len; + } else { + S = 0; + for (i = 0; i < args.copy_multi.num_vectors; i++) { + S += args.copy_multi.counts[i]; + } + } return 2 * (S / time_ms) / 1000.0; } diff --git a/tools/perf/ucc_pt_op_reduce.cc b/tools/perf/ucc_pt_op_reduce.cc index 9e5225428e..d9683c0d50 100644 --- a/tools/perf/ucc_pt_op_reduce.cc +++ b/tools/perf/ucc_pt_op_reduce.cc @@ -20,6 +20,10 @@ ucc_pt_op_reduce::ucc_pt_op_reduce(ucc_datatype_t dt, ucc_memory_type mt, has_range_ = true; has_bw_ = true; + if (nbufs == UCC_PT_DEFAULT_N_BUFS) { + nbufs = 2; + } + if (nbufs < 2) { throw std::runtime_error("dt reduce op requires at least 2 bufs"); } diff --git a/tools/perf/ucc_pt_op_reduce_strided.cc b/tools/perf/ucc_pt_op_reduce_strided.cc index f3e9f66171..290d98745d 100644 --- a/tools/perf/ucc_pt_op_reduce_strided.cc +++ b/tools/perf/ucc_pt_op_reduce_strided.cc @@ -22,6 +22,10 @@ ucc_pt_op_reduce_strided::ucc_pt_op_reduce_strided(ucc_datatype_t dt, has_range_ = true; has_bw_ = true; + if (nbufs == UCC_PT_DEFAULT_N_BUFS) { + nbufs = 2; + } + if (nbufs < 2) { throw std::runtime_error("dt reduce op requires at least 2 bufs"); }