Add distributed training examples of PyTorch #4821
@@ -0,0 +1,34 @@
## How OpenPAI Deploys Distributed Jobs
### Task Role and Instance
When we execute distributed programs on PAI, we can add different task roles to our job. For single-server jobs, there is only one task role. For distributed jobs, there may be multiple task roles. For example, when TensorFlow is used to run distributed jobs, it has two roles: the parameter server and the worker. In a distributed job, each role may have one or more instances. For example, if a TensorFlow worker role has 8 instances, there will be 8 Docker containers for that role. Please visit [here](how-to-use-advanced-job-settings.html#multiple-task-roles) for specific operations.
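As a rough illustration, a job protocol with a parameter-server role (2 instances) and a worker role (8 instances) could declare its task roles as in the sketch below. The role names, Docker image tag, resource numbers, and `train.py` script are hypothetical, not taken from this PR:

```
# Illustrative sketch only; image tag, resources, and commands are made up.
protocolVersion: 2
name: tf-distributed-example
type: job
prerequisites:
  - type: dockerimage
    uri: 'openpai/standard:python_3.6-tensorflow_1.15.0-gpu'   # hypothetical image tag
    name: docker_image_0
taskRoles:
  ps:                      # parameter-server role: 2 instances -> 2 containers
    instances: 2
    dockerImage: docker_image_0
    resourcePerInstance:
      gpu: 0
      cpu: 4
      memoryMB: 8192
    commands:
      - python train.py --job_name=ps
  worker:                  # worker role: 8 instances -> 8 containers
    instances: 8
    dockerImage: docker_image_0
    resourcePerInstance:
      gpu: 1
      cpu: 8
      memoryMB: 16384
    commands:
      - python train.py --job_name=worker
```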
### Environmental Variables
In a distributed job, one task might communicate with others (when we say task, we mean a single instance of a task role), so a task needs to be aware of other tasks' runtime information such as IP, port, etc. The system exposes such runtime information as environment variables in each task's Docker container. For mutual communication, users can write code in the container to access those runtime environment variables. Please visit [here](how-to-use-advanced-job-settings.html#environmental-variables-and-port-reservation) for specific operations.
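For example, inside a container you can inspect the injected runtime variables directly. The sketch below assumes a task role named `worker` with a reserved port labelled `SynPort`, matching the examples added in this PR:

```
import os

# Print every PAI runtime variable injected into this container.
for key, value in sorted(os.environ.items()):
    if key.startswith('PAI_'):
        print(key, '=', value)

# Typical lookups for a task role named `worker` with port label `SynPort`
# (adjust the names to your own job configuration).
master_ip = os.environ['PAI_HOST_IP_worker_0']          # IP of worker instance 0
master_port = os.environ['PAI_worker_0_SynPort_PORT']   # reserved port of worker instance 0
my_index = int(os.environ['PAI_TASK_INDEX'])            # index of the current instance
print(master_ip, master_port, my_index)
```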
### Retry Policy and Completion Policy
If an unknown error happens, PAI will retry the job according to user settings. To set a retry policy and a completion policy for a job, PAI requires the user to switch to Advanced mode. Please visit [here](how-to-use-advanced-job-settings.html#job-exit-spec-retry-policy-and-completion-policy) for specific operations.
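In the job protocol, these policies appear as fields such as `jobRetryCount`, `taskRetryCount`, and `completion`, as in the YAML example included in this PR. A minimal sketch (the numbers are illustrative):

```
jobRetryCount: 1            # retry the whole job at most once on unknown errors
taskRoles:
  worker:
    instances: 2
    taskRetryCount: 0       # do not retry individual task instances
    completion:
      minFailedInstances: 1 # the job fails as soon as one instance fails
```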
### Run PyTorch Distributed Jobs in OpenPAI
Example Name | Multi-GPU | Multi-Node | Backend | Apex | Job protocol
---|---|---|---|---|---
Single-Node DataParallel CIFAR-10 | ✓ | x | - | - | [cifar10-single-node-gpus-cpu-DP.yaml](../../../examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.yaml)
cifar10-single-mul-DDP-gloo | ✓ | ✓ | gloo | - | [cifar10-single-mul-DDP-gloo.yaml](../../../examples/Distributed-example/cifar10-single-mul-DDP-gloo.yaml)
cifar10-single-mul-DDP-nccl | ✓ | ✓ | nccl | - | [cifar10-single-mul-DDP-nccl.yaml](../../../examples/Distributed-example/cifar10-single-mul-DDP-nccl.yaml)
cifar10-single-mul-DDP-gloo-Apex-mixed | ✓ | ✓ | gloo | ✓ | [cifar10-single-mul-DDP-gloo-Apex-mixed.yaml](../../../examples/Distributed-example/cifar10-single-mul-DDP-gloo-Apex-mixed.yaml)
cifar10-single-mul-DDP-nccl-Apex-mixed | ✓ | ✓ | nccl | ✓ | [cifar10-single-mul-DDP-gloo-Apex-mixed.yaml](../../../examples/Distributed-example/cifar10-single-mul-DDP-gloo-Apex-mixed.yaml)
imagenet-single-mul-DDP-gloo | ✓ | ✓ | gloo | - | [imagenet-single-mul-DDP-gloo.yaml](../../../examples/Distributed-example/Lite-imagenet-single-mul-DDP-gloo.yaml)

> Review comment on the job-protocol links: this should start with "github.com/....."
## DataParallel
The single-node program is simple: the program executed in PAI is exactly the same as the program on your own machine. Note that you request one worker task role in PAI, one instance within that worker, and as many GPUs as you need within the instance. We provide an [example](../../../examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.py) of DP; a minimal sketch of the idea is shown below.
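The following is a minimal, self-contained sketch of single-node DataParallel. It is not the linked example itself; the toy model and dummy batch are made up for illustration:

```
import torch
import torch.nn as nn

# A toy model; replace with your own network (e.g. a torchvision ResNet).
model = nn.Sequential(nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 10))

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if torch.cuda.device_count() > 1:
    # DataParallel splits each input batch across all visible GPUs
    # and gathers the outputs back on the default device.
    model = nn.DataParallel(model)
model = model.to(device)

inputs = torch.randn(256, 32).to(device)   # dummy batch
outputs = model(inputs)                    # forward pass runs on all GPUs
print(outputs.shape)
```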
## DistributedDataParallel
DDP requires users to set a master node IP and port for synchronization in PyTorch. For the port, you can simply pick a fixed port, such as `5000`, as your master port. However, this port may conflict with others. To prevent port conflicts, you can reserve a port in OpenPAI, as we mentioned [here](how-to-use-advanced-job-settings.html#environmental-variables-and-port-reservation). The reserved port is available in environment variables like `PAI_PORT_LIST_$taskRole_$taskIndex_$portLabel`, where `$taskIndex` means the instance index of that task role. For example, if your task role name is `worker` and the port label is `SynPort`, you can add the following code in your PyTorch DDP program:
```
import os

os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0']
os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT']
```
If you are using `gloo` as your DDP communication backend, please set the correct network interface, such as `export GLOO_SOCKET_IFNAME=eth0`.
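Putting the pieces together, process-group initialization in these examples looks roughly like the sketch below. It is a simplified excerpt: the helper name `init_distributed` is ours, and `args` is assumed to be an `argparse.Namespace` with `gpus`, `nodes`, and `dist_backend` fields:

```
import os
import torch.distributed as dist

def init_distributed(args, gpu):
    """Join the process group; MASTER_ADDR / MASTER_PORT were set as shown above."""
    # Global rank = PAI instance index * GPUs per instance + local GPU index.
    rank = int(os.environ['PAI_TASK_INDEX']) * args.gpus + gpu
    dist.init_process_group(
        backend=args.dist_backend,          # 'gloo' or 'nccl'
        init_method='env://',               # reads MASTER_ADDR / MASTER_PORT
        world_size=args.gpus * args.nodes,
        rank=rank)
    return rank

# In the examples, torch.multiprocessing.spawn(train, nprocs=args.gpus, args=(args,))
# launches one such process per local GPU on every instance.
```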
We provide examples with [gloo](../../../examples/Distributed-example/cifar10-single-mul-DDP-gloo.yaml) and [nccl](../../../examples/Distributed-example/cifar10-single-mul-DDP-nccl.yaml) as the backend.
@@ -0,0 +1,50 @@
protocolVersion: 2
name: imagenet-gloo_8ba8ed42_7606233c
type: job
jobRetryCount: 0
prerequisites:
  - type: dockerimage
    uri: 'openpai/standard:python_3.6-pytorch_1.2.0-gpu'
    name: docker_image_0
taskRoles:
  worker:
    instances: 2
    completion:
      minFailedInstances: 1
    taskRetryCount: 0
    dockerImage: docker_image_0
    resourcePerInstance:
      gpu: 4
      cpu: 16
      memoryMB: 32768
      ports:
        SynPort: 1
    commands:
      - export GLOO_SOCKET_IFNAME=eth0
      - 'git clone https://github.com/NVIDIA/apex'
      - cd apex
      - python setup.py install
      - cd ..
      - apt update
      - apt install -y nfs-common
      - mkdir -p /mnt/data
      - 'mount 10.151.40.32:/mnt/zhiyuhe /mnt/data'
      - >-
        wget
        https://raw.githubusercontent.com/vvfreesoul/pai/master/examples/Distributed-example/Lite-imagenet-single-mul-DDP-nccl-gloo.py
      - >-
        python Lite-imagenet-single-mul-DDP-nccl-gloo.py -n 2 -g 4
        --dist-backend gloo --epochs 2 /mnt/data/imagenet/unzipped
defaults:
  virtualCluster: default
extras:
  com.microsoft.pai.runtimeplugin:
    - plugin: ssh
      parameters:
        jobssh: true
        userssh: {}
  hivedScheduler:
    taskRoles:
      worker:
        skuNum: 1
        skuType: null
@@ -0,0 +1,117 @@
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
from apex.parallel import DistributedDataParallel as DDP
from apex import amp

import torchvision.datasets as datasets
import torchvision.models as models

model_names = sorted(name for name in models.__dict__
                     if name.islower() and not name.startswith("__")
                     and callable(models.__dict__[name]))


def main():
    print('run main')
    parser = argparse.ArgumentParser()
    parser.add_argument('data', metavar='DIR',
                        help='path to dataset')
    parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18',
                        choices=model_names,
                        help='model architecture: ' +
                             ' | '.join(model_names) +
                             ' (default: resnet18)')
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N',
                        help='number of nodes (default: 1)')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('-b', '--batch-size', default=256, type=int,
                        metavar='N',
                        help='mini-batch size (default: 256), this is the total '
                             'batch size of all GPUs on the current node when '
                             'using Data Parallel or Distributed Data Parallel')
    parser.add_argument('--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('--dist-backend', default='nccl', type=str,
                        help='distributed backend')
    args = parser.parse_args()
    args.world_size = args.gpus * args.nodes
    print('world_size:', args.world_size)
    # The master address and port come from the PAI runtime environment variables
    # of worker instance 0 and its reserved port labelled `SynPort`.
    os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0']
    os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT']
    print('master:', os.environ['MASTER_ADDR'], 'port:', os.environ['MASTER_PORT'])
    mp.spawn(train, nprocs=args.gpus, args=(args,))


def train(gpu, args):
    print("start train")
    # Global rank = PAI instance index * GPUs per node + local GPU index.
    rank = int(os.environ['PAI_TASK_INDEX']) * args.gpus + gpu
    dist.init_process_group(backend=args.dist_backend, init_method='env://',
                            world_size=args.world_size, rank=rank)
    torch.manual_seed(0)
    model = models.__dict__[args.arch]()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = args.batch_size
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Wrap the model
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
    # Data loading code
    traindir = os.path.join(args.data, 'train')
    valdir = os.path.join(args.data, 'val')
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]))
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.nodes, pin_memory=True, sampler=train_sampler)

    val_loader = torch.utils.data.DataLoader(
        datasets.ImageFolder(valdir, transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            normalize,
        ])),
        batch_size=args.batch_size, shuffle=False,
        num_workers=args.nodes, pin_memory=True)
    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # if (i + 1) % 100 == 0 and gpu == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                epoch + 1, args.epochs, i + 1, total_step, loss.item()))
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))


if __name__ == '__main__':
    main()
@@ -0,0 +1,122 @@
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
from apex.parallel import DistributedDataParallel as DDP
from apex import amp

import torchvision.datasets as datasets
import torchvision.models as models

model_names = sorted(name for name in models.__dict__
                     if name.islower() and not name.startswith("__")
                     and callable(models.__dict__[name]))


def main():
    print('run main')
    parser = argparse.ArgumentParser()
    parser.add_argument('data', metavar='DIR',
                        help='path to dataset')
    parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18',
                        choices=model_names,
                        help='model architecture: ' +
                             ' | '.join(model_names) +
                             ' (default: resnet18)')
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N',
                        help='number of nodes (default: 1)')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('-b', '--batch-size', default=256, type=int,
                        metavar='N',
                        help='mini-batch size (default: 256), this is the total '
                             'batch size of all GPUs on the current node when '
                             'using Data Parallel or Distributed Data Parallel')
    parser.add_argument('--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('--dist-backend', default='nccl', type=str,
                        help='distributed backend')
    args = parser.parse_args()
    args.world_size = args.gpus * args.nodes
    print('world_size:', args.world_size)
    # The master address and port come from the PAI runtime environment variables
    # of worker instance 0 and its reserved port labelled `SynPort`.
    os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0']
    os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT']
    print('master:', os.environ['MASTER_ADDR'], 'port:', os.environ['MASTER_PORT'])
    mp.spawn(train, nprocs=args.gpus, args=(args,))


def train(gpu, args):
    print("start train")
    # Global rank = PAI instance index * GPUs per node + local GPU index.
    rank = int(os.environ['PAI_TASK_INDEX']) * args.gpus + gpu
    dist.init_process_group(backend=args.dist_backend, init_method='env://',
                            world_size=args.world_size, rank=rank)
    torch.manual_seed(0)
    model = models.__dict__[args.arch]()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = args.batch_size
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Initialize Apex mixed precision, then wrap the model with Apex DDP
    model, optimizer = amp.initialize(model, optimizer, opt_level='O2')
    model = DDP(model)
    # Data loading code
    traindir = os.path.join(args.data, 'train')
    valdir = os.path.join(args.data, 'val')
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])

    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]))
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.nodes, pin_memory=True, sampler=train_sampler)

    val_loader = torch.utils.data.DataLoader(
        datasets.ImageFolder(valdir, transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            normalize,
        ])),
        batch_size=args.batch_size, shuffle=False,
        num_workers=args.nodes, pin_memory=True)
    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize (scale the loss for mixed precision)
            optimizer.zero_grad()
            with amp.scale_loss(loss, optimizer) as scaled_loss:
                scaled_loss.backward()
            optimizer.step()
            # if (i + 1) % 100 == 0 and gpu == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                epoch + 1, args.epochs, i + 1, total_step, loss.item()))
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))


if __name__ == '__main__':
    main()
> Review comment: `how-to-use-advanced-job-settings.html#multiple-task-roles` -> `how-to-use-advanced-job-settings.md#multiple-task-roles`