"""Micro-benchmark: time a batch of no-op tasks on a Dask cluster.

The script connects to an already-running scheduler (located through its
scheduler JSON file), submits ``--ntasks`` trivial tasks with
``Client.map``, waits for all of them to complete and prints the elapsed
time. The submit/wait cycle is repeated ``--repetitions`` times.
"""
import time
import argparse
import numpy as np
from dask_mpi import initialize
from distributed import Client
from distributed import wait


def dummy(arg):
    """Return ``arg`` unchanged.

    This is the task body submitted to the cluster; it performs no work of
    its own.
    """
    return arg


def _parse_args(argv=None):
    """Parse the command line (``argv=None`` means ``sys.argv[1:]``)."""
    parser = argparse.ArgumentParser()
    # Required options take no ``default``: argparse exits before a default
    # could ever be used.
    parser.add_argument('-T', '--ntasks', type=int, required=True,
                        help='# tasks')
    parser.add_argument('-r', '--repetitions', type=int, default=1,
                        help='# repetitions')
    parser.add_argument('-j', '--json', type=str, required=True,
                        help='dask scheduler json file')
    return parser.parse_args(argv)


def main(argv=None):
    """Run the benchmark and print one timing line per repetition.

    Args:
        argv: Optional list of command-line arguments; when ``None`` the
            process arguments are used.
    """
    args = _parse_args(argv)

    # The context manager closes the client when the block exits, including
    # when a repetition raises.
    with Client(scheduler_file=args.json) as client:
        print(f"ntasks {args.ntasks}")

        # Every task receives the same placeholder payload.
        item = None
        data = [item] * args.ntasks
        # One explicit key per task: "0", "1", ..., "ntasks-1".
        labels = [str(index) for index in range(args.ntasks)]

        for _ in range(args.repetitions):
            # perf_counter is monotonic, so the measured interval is not
            # affected by system clock adjustments.
            tic = time.perf_counter()
            futures = client.map(dummy, data, key=labels)
            wait(futures, return_when='ALL_COMPLETED')
            print('time in seconds', time.perf_counter() - tic)
            # Drop our references before the next repetition submits tasks
            # under the same keys.
            # NOTE(review): presumably this is what forces the tasks to be
            # re-run rather than reused - confirm against distributed docs.
            del futures


if __name__ == '__main__':
    main()