Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add collective async wait op #31463

Merged

Conversation

wangxicoding
Copy link
Contributor

@wangxicoding wangxicoding commented Mar 7, 2021

PR types

New features

PR changes

Ops

Describe

Add collective async wait_comm and wait_compute op.
Performance test:

import numpy as np
import paddle
import paddle.fluid.unique_name as unique_name
from paddle.fluid import core
import os
import sys
import time


def runtime_config():
    args = {}
    rank = int(os.getenv("PADDLE_TRAINER_ID", 0))
    endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS', '127.0.0.0:0')
    endpoints = endpoints.split(',')
    return rank, endpoints


def init_comm(program, rank, endpoints):
    current_endpoint = endpoints[rank]
    nranks = len(endpoints)

    other_endpoints = endpoints[:]
    other_endpoints.remove(current_endpoint)

    block = program.global_block()
    nccl_id_var = block.create_var(
         name=unique_name.generate('nccl_id'),
         persistable=True,
         type=core.VarDesc.VarType.RAW)

    block.append_op(
        type='c_gen_nccl_id',
        inputs={},
        outputs={'Out': nccl_id_var},
        attrs={
            'rank': rank,
            'endpoint': current_endpoint,
            'other_endpoints': other_endpoints,
        })

    block.append_op(
        type='c_comm_init',
        inputs={'X': nccl_id_var},
        outputs={},
        attrs={
            'nranks': nranks,
            'rank': rank,
            'ring_id': 0,
        })


def get_model(main_prog, startup_prog, sync=False):
    ring_id = 0
    with paddle.static.program_guard(main_prog, startup_prog):
        a = paddle.static.data(
            name="a", shape=[1000, 10000], dtype='float32')
        b = paddle.assign(a)

        c = a + b
        d = a + b

        if not sync:
            main_prog.global_block().append_op(
                type='c_wait_compute',
                inputs={'X': c},
                outputs={'Out': c},
                attrs={'ring_id': ring_id})

        for i in range(64):
            main_prog.global_block().append_op(
                type="c_allreduce_sum",
                inputs={'X': c},
                outputs={'Out': c},
                attrs={'ring_id': ring_id,
                      'use_calc_stream': True if sync else False,
                      })
            for j in range(10):
                e = 2 * d
            paddle.assign(e, d)

        if not sync:
            main_prog.global_block().append_op(
                type="c_wait_comm",
                inputs={'X': c},
                outputs={'Out': c},
                attrs={'ring_id': ring_id})

        out = c - d
        return out

def run_trainer(sync):
    train_prog = paddle.static.Program()
    startup_prog = paddle.static.Program()

    rank, endpoints = runtime_config()
    init_comm(startup_prog, rank, endpoints)

    result = get_model(train_prog, startup_prog, sync)

    print(train_prog)

    place = paddle.set_device('gpu')
    exe = paddle.static.Executor(place)
    exe.run(startup_prog)

    np.random.seed(0)
    a = np.random.random((1000, 10000)).astype('float32')

    for i in range(10):
        if i == 2:
            t0 = time.time()
        out = exe.run(train_prog,
                      feed={'a': a},
                      fetch_list=[result.name])
    cost_time = time.time() - t0
    print(out)
    print('time={}'.format(cost_time))


if __name__ == "__main__":
    sync = False
    if len(sys.argv) == 2:
        sync = sys.argv[1] == 'True'
    print('run sync mode={}'.format(sync))
    paddle.enable_static()
    run_trainer(sync)
  • allreduce with calc stream sync run
    fleetrun --gpus 0,1 test.py True
  • allreduce with comm stream async run
    fleetrun --gpus 0,1 test.py False

Test Time

Sync Async
1.627s 1.493s

@paddle-bot-old
Copy link

paddle-bot-old bot commented Mar 7, 2021

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

Copy link
Contributor

@qili93 qili93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM for HIP platform, compile pass

Copy link

@sandyhouse sandyhouse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@gongweibao gongweibao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@luotao1 luotao1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM for framework.py

@wangxicoding wangxicoding merged commit 83a2fb1 into PaddlePaddle:develop Mar 10, 2021
@wangxicoding wangxicoding deleted the add_collective_async_wait_op branch April 15, 2021 02:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants