torch: refactor crossing barrier (#155)
yhpeng authored and ymjiang committed Nov 19, 2019
1 parent e4bbd24 commit 98b242f
Showing 9 changed files with 113 additions and 119 deletions.
54 changes: 0 additions & 54 deletions byteps/bytescheduler/README.md

This file was deleted.

18 changes: 0 additions & 18 deletions byteps/bytescheduler/__init__.py

This file was deleted.

Empty file.
@@ -25,23 +25,27 @@
broadcast_optimizer_state = bps.broadcast_optimizer_state


class _ScheduledOptimizer(_DistributedOptimizer):
class _CrossBarrier(_DistributedOptimizer):
"""An optimizer that wraps a _DistributedOptimizer, intercepting push-pull operations.
This class enables overlapping gradient push-pull with both backward and forward propagation while maintaining
correct dependencies. It can achieve even higher training performance than current BytePS.
correct dependencies. It can achieve even higher training performance than the default BytePS with proper system
parameters. To understand the principles behind barrier crossing, check the paper
https://dl.acm.org/citation.cfm?id=3359642
"""
def __init__(self, model, byteps_opt, num_steps=10**6):
"""Construct a new ScheduledOptimizer, which uses byteps optimizer under the hood for averaging gradients
across all workers.
Args:
model: The training model. ByteScheduler uses the model object to register hooks.
model: The training model. BytePS uses the model object to register hooks.
byteps_opt: Optimizer to use for averaging gradients and applying updates.
num_steps: The maximum number of training steps. ByteScheduler needs to know when to stop cross-iteration
num_steps: The maximum number of training steps. BytePS needs to know when to stop cross-iteration
scheduling.
"""
self._model = model
self._opt = byteps_opt
self._logger = logging.getLogger("ByteScheduler")
self._logger = logging.getLogger("CrossBarrier")

self._logger.info("CrossBarrier is enabled.")
self._logger.debug("byteps size {}, rank {}".format(size(), rank()))
self._desc = "rank {}".format(rank())

@@ -59,19 +63,14 @@ def __init__(self, model, byteps_opt, num_steps=10**6):
self._register_forward_hooks()
self._register_hooks()

# Poll whether the tensor push-pull is finished.
self._event_queue = queue.Queue()
self._poller = threading.Thread(target=self._poll, args=())
self._poller.start()

def __getattr__(self, item):
return getattr(self._opt, item)

def __del__(self):
"""Clean up"""
self._event_queue.put((None, None, None))
self._poller.join()

def step(self, closure=None):
"""Override the default step function."""
self._logger.debug("{} calls step() {}".format(self._desc, self._step))
@@ -84,7 +83,9 @@ def step(self, closure=None):
self._logger.debug("final step {}, waiting for push-pull completion.".format(self._final_step))
while not self._event_queue.empty():
time.sleep(0.001)
self._logger.debug("training finished!")
self._event_queue.put((None, None, None))
self._poller.join()
self._logger.info("training finished!")
loss = None
if closure is not None:
loss = closure()
@@ -105,6 +106,13 @@ def zero_grad(self):
else:
self._opt.zero_grad()

def _get_parameter_name(self, p):
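"""Return the name registered for parameter p (looked up by p.__hash__() when parameters are torch.Tensor instances)."""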
if self._is_tensor_instance:
name = self._parameter_names.get(p.__hash__())
else:
name = self._parameter_names.get(p)
return name

def _register_hooks(self):
for param_group in self.param_groups:
for p in param_group['params']:
@@ -137,13 +145,13 @@ def _push_pull_grad_async(self, p):
Returns:
an push-pull handle and context
"""
name = self._parameter_names.get(id(p))
name = self._get_parameter_name(p)
tensor = p.grad
tensor_compressed, ctx = self._compression.compress(tensor)

self._locks[p].acquire()
handle = byteps_push_pull(tensor_compressed, average=True, name="Gradient."+name)
self._logger.debug("{} calls byteps_push_pull for {}".format(self._desc, self._parameter_names[id(p)]))
self._logger.debug("{} calls byteps_push_pull for {}".format(self._desc, self._get_parameter_name(p)))
# Add to queue to poll completion
self._event_queue.put((p, handle, ctx))
return handle, ctx
@@ -159,17 +167,17 @@ def _poll(self):
if handle is not None and poll(handle):
output = synchronize(handle)
p.grad.set_(self._compression.decompress(output, ctx))
self._logger.debug("{} {} finished push-pull".format(self._desc, self._parameter_names[id(p)]))
self._logger.debug("{} {} finished push-pull".format(self._desc, self._get_parameter_name(p)))
self._push_pull_delay[p] = self.backward_passes_per_step
# So far ByteScheduler only supports SGD, Adam and RMSprop optimizers in torch
# So far, only SGD, Adam and RMSprop optimizers are supported in torch
if isinstance(self._opt, torch.optim.SGD):
self._sgd(p)
elif isinstance(self._opt, torch.optim.Adam):
self._adam(p)
elif isinstance(self._opt, torch.optim.RMSprop):
self._rmsprop(p)
else:
raise ValueError("Invalid optimizer! ByteScheduler only supports SGD, Adam and RMSprop.")
raise ValueError("Invalid optimizer! Only support SGD, Adam and RMSprop.")
self._zero_one_grad(p)
# notify update completion and parameter is ready for forward propagation
if p in self._locks:
@@ -200,7 +208,7 @@ def pre_forward_hook(mod, input):
if p not in self._locks:
continue
with self._locks[p]:
self._logger.debug("{} {} is ready.".format(self._desc, self._parameter_names[id(p)]))
self._logger.debug("{} {} is ready.".format(self._desc, self._get_parameter_name(p)))

self._logger.debug("{} starts forward {}.".format(self._desc, mod))

@@ -237,9 +245,9 @@ def _sgd(self, p):
nesterov = group['nesterov']

for gp in group['params']:
if self._parameter_names[id(p)] != self._parameter_names[id(gp)] or gp.shape != p.shape:
if self._get_parameter_name(p) != self._get_parameter_name(gp) or gp.shape != p.shape:
continue
self._logger.debug("{} is updating {}".format(self._desc, self._parameter_names[id(p)]))
self._logger.debug("{} is updating {}".format(self._desc, self._get_parameter_name(p)))
if p.grad is None:
continue
d_p = p.grad.data
@@ -267,9 +275,9 @@ def _adam(self, p):
"""
for group in self.param_groups:
for gp in group['params']:
if self._parameter_names[id(p)] != self._parameter_names[id(gp)] or gp.shape != p.shape:
if self._get_parameter_name(p) != self._get_parameter_name(gp) or gp.shape != p.shape:
continue
self._logger.debug("{} is updating {}".format(self._desc, self._parameter_names[id(p)]))
self._logger.debug("{} is updating {}".format(self._desc, self._get_parameter_name(p)))
if p.grad is None:
continue
grad = p.grad.data
@@ -328,9 +336,9 @@ def _rmsprop(self, p):
"""
for group in self.param_groups:
for gp in group['params']:
if self._parameter_names[id(p)] != self._parameter_names[id(gp)] or gp.shape != p.shape:
if self._get_parameter_name(p) != self._get_parameter_name(gp) or gp.shape != p.shape:
continue
self._logger.debug("{} is updating {}".format(self._desc, self._parameter_names[id(p)]))
self._logger.debug("{} is updating {}".format(self._desc, self._get_parameter_name(p)))
if p.grad is None:
continue
grad = p.grad.data
@@ -388,15 +396,30 @@ def wrapped_func(*args, **kwargs):
hijack(_DistributedOptimizer, '_register_hooks')


def DistributedOptimizer(model,
optimizer,
named_parameters=None,
compression=Compression.none,
backward_passes_per_step=1,
num_steps=10**6):
"""Wrap Torch optimizer using BytePS DistributedOptimizer and ByteScheduler _ScheduledOptimizer."""
def _init_logger():
logger = logging.getLogger("CrossBarrier")
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(filename)s:%(lineno)s %(levelname)s: %(message)s',
'%H:%M:%S')
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)
fh = logging.FileHandler('cross_barrier.log', 'w')
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.propagate = False
logger.setLevel(logging.INFO)


def CrossBarrier(model,
optimizer,
named_parameters=None,
compression=Compression.none,
backward_passes_per_step=1,
num_steps=10**6):
"""Wrap Torch optimizer using BytePS DistributedOptimizer and _CrossBarrier."""
bps_opt = _bps_DistributedOptimizer(optimizer, named_parameters, compression, backward_passes_per_step)
return _ScheduledOptimizer(model, bps_opt, num_steps)
return _CrossBarrier(model, bps_opt, num_steps)


_init_bsc()
_init_logger()
33 changes: 33 additions & 0 deletions docs/cross-barrier.md
@@ -0,0 +1,33 @@
# Cross Global Barrier

This eliminates the global barrier between training iterations in distributed training frameworks (e.g.,
PyTorch), so that the priority-based communication scheduling in BytePS can be effective.

## Why Cross the Barrier?

Existing distributed training frameworks (PyTorch, TensorFlow, etc.) do not fully exploit the potential of overlapping
computation and communication to speed up neural network training: they only overlap communication with
backward propagation. But thanks to the layer-wise dependencies in DNN training, we can schedule gradient
synchronization based on when each gradient is consumed in the next iteration, and hence overlap communication with
the forward propagation of the next iteration! Read the paper https://dl.acm.org/citation.cfm?id=3359642 for more
details on communication scheduling.

To make this idea work, the first step is to remove the global barrier between two iterations and build layer-wise
dependencies instead, so that the forward computation of the next step can start without waiting for the
synchronization of all parameters to complete.

Fig. 1 shows the dependency graph with a global barrier. Machine learning frameworks such as PyTorch and TensorFlow have
similar dependencies when using BytePS for push and pull.

![](images/dag_barrier.png)

*Fig.1: Dependency Graph With Global Barrier*

Fig. 2 shows the dependency graph after removing the global barrier. What we do is change the dependency
graph of Fig. 1 into that of Fig. 2: remove the barrier and build layer-wise dependencies in its place, while guaranteeing computation correctness.

![](images/dag_without_barrier.png)
*Fig.2: Dependency Graph After Removing Global Barrier*
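
In user code, this is a one-line change on top of the usual BytePS setup. Below is a minimal sketch (assuming the `byteps.torch.cross_barrier` module added by this commit, with BytePS launch and initialization handled as usual); `CrossBarrier` additionally takes the model, so that it can register the hooks that build the layer-wise dependencies, and the total number of steps:

```python
import torch
import torch.nn as nn
import torch.optim as optim
import byteps.torch.cross_barrier as bps

# BytePS initialization and parameter broadcast (done by the usual BytePS
# launcher and setup code) are omitted from this sketch.
model = nn.Sequential(nn.Linear(784, 256), nn.ReLU(), nn.Linear(256, 10)).cuda()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Wrap the Torch optimizer. CrossBarrier needs the model (to register forward
# and backward hooks) and num_steps (to know when to stop cross-iteration
# scheduling).
num_steps = 1000
optimizer = bps.CrossBarrier(model,
                             optimizer,
                             named_parameters=model.named_parameters(),
                             compression=bps.Compression.none,
                             num_steps=num_steps)

for step in range(num_steps):
    optimizer.zero_grad()
    data = torch.randn(32, 784).cuda()
    target = torch.randint(0, 10, (32,)).cuda()
    loss = nn.functional.cross_entropy(model(data), target)
    loss.backward()   # push-pull starts per-gradient as backward hooks fire
    optimizer.step()  # parameter updates overlap with the next forward pass
```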



File renamed without changes
File renamed without changes
@@ -9,26 +9,32 @@
import timeit
import numpy as np
import os
import byteps.bytescheduler.torch.optimizer as bps
import byteps.torch.cross_barrier as bps

"""
This example shows how to enable ByteScheduler on top of BytePS in PyTorch. Note that you can use BytePS without
ByteScheduler at all, so the limitations of ByteScheduler do not apply to BytePS.
ByteScheduler enables overlapping gradient push-pull with both backward computation and forward computation, while
This example shows how to enable barrier crossing on top of BytePS in PyTorch. Note that you can use BytePS without
crossing the barrier at all.
Crossing the barrier enables overlapping gradient push-pull with both backward computation and forward computation, while
maintaining correct dependencies, e.g., the forward computation of a layer will not start until the parameter of this
layer is updated. Hence it can further improves training performance beyond BytePS.
layer is updated. Hence it can further improve training performance beyond BytePS. See the paper
https://dl.acm.org/citation.cfm?id=3359642 for more details.
To use it, just change the import statement and add two more arguments (i.e., model, num_steps) when wrapping the Torch
optimizer, as shown below:
```
import byteps.bytescheduler.torch.optimizer as bps
optimizer = bps.DistributedOptimizer(model, optimizer, named_parameters, compressionm, backward_passes_per_step, num_steps)
import byteps.torch.cross_barrier as bps
optimizer = bps.CrossBarrier(model, optimizer, named_parameters, compression, backward_passes_per_step, num_steps)
```
So far ByteScheduler supports SGD, Adam and RMSprop optimizers. Please submit a ticket if you need support for
So far we support SGD, Adam and RMSprop optimizers. Please submit a ticket if you need support for
any other optimizers.
To see performance gains, the system parameters should be set properly, including BYTEPS_PARTITION_BYTES and
BYTEPS_SCHEDULING_CREDIT (see the example settings right after this docstring).
"""

# Benchmark settings
parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark',
parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark Without Barrier',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
@@ -73,17 +79,18 @@
# Move model to GPU.
model.cuda()

# You may try one of the following optimizers
optimizer = optim.SGD(model.parameters(), lr=0.01)
# optimizer = optim.Adam(model.parameters(), lr=0.01)
# optimizer = optim.RMSprop(model.parameters(), lr=0.01)

# BytePS: (optional) compression algorithm.
compression = bps.Compression.fp16 if args.fp16_allreduce else bps.Compression.none

# ByteScheduler: wrap Torch optimizer with DistributedOptimizer.
# Wrap Torch optimizer with CrossBarrier.
# You need to specify two additional args, i.e., model and num_steps.
# Note that ByteScheduler only supports SGD, Adam and RMSProp optimizers so far.
optimizer = bps.DistributedOptimizer(model,
# Note that we only support SGD, Adam and RMSProp optimizers so far.
optimizer = bps.CrossBarrier(model,
optimizer,
named_parameters=model.named_parameters(),
compression=compression,
3 changes: 3 additions & 0 deletions example/pytorch/start_pytorch_byteps.sh
@@ -11,6 +11,9 @@ elif [ "$EVAL_TYPE" == "imagenet" ]; then
elif [ "$EVAL_TYPE" == "benchmark" ]; then
echo "running benchmark..."
python $path/benchmark_byteps.py $@
elif [ "$EVAL_TYPE" == "nobarrierbenchmark" ]; then
echo "running benchmark without global barrier..."
python $path/benchmark_cross_barrier_byteps.py $@
elif [ "$EVAL_TYPE" == "microbenchmark" ]; then
echo "running microbenchmark"
python $path/microbenchmark-byteps.py $@
