Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fluid benchmark support recordio reader #11121

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/fluid/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ ADD *.whl /
RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s

ENV LD_LIBRARY_PATH=/usr/local/lib
ADD fluid_benchmark.py dataset.py models/ /workspace/
ADD fluid_benchmark.py recordio_converter.py models/ /workspace/
10 changes: 10 additions & 0 deletions benchmark/fluid/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ Currently supported `--model` argument include:
PADDLE_PSERVER_PORT=7164 PADDLE_TRAINER_IPS=192.168.0.2,192.168.0.3 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method nccl2
```

## Prepare the RecordIO file to Achieve Better Performance

Run the following command will generate RecordIO files like "mnist.recordio" under the path
and batch_size you choose, you can use batch_size=1 so that later reader can change the batch_size
at any time using `fluid.batch`.

```bash
python -c 'from recordio_converter import *; prepare_mnist("data", 1)'
```

## Run Distributed Benchmark on Kubernetes Cluster

You may need to build a Docker image before submitting a cluster job onto Kubernetes, or you will
Expand Down
115 changes: 87 additions & 28 deletions benchmark/fluid/fluid_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,23 @@ def parse_args():
default='resnet',
help='The model to run benchmark with.')
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size.')
'--batch_size',
type=int,
default=32,
help='The batch size on each gpu.')
parser.add_argument(
'--learning_rate', type=float, default=0.001, help='The learning rate.')
# TODO(wuyi): add "--use_fake_data" option back.
parser.add_argument(
'--skip_batch_num',
type=int,
default=5,
help='The first num of minibatch num to skip, for better performance test'
)
parser.add_argument(
'--iterations', type=int, default=80, help='The number of minibatches.')
'--iterations',
type=int,
default=80,
help='The number of minibatches, set to -1 to run all batches.')
parser.add_argument(
'--pass_num', type=int, default=100, help='The number of passes.')
parser.add_argument(
Expand All @@ -69,6 +74,7 @@ def parse_args():
type=int,
default=1,
help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
# this option is available only for vgg and resnet.
parser.add_argument(
'--cpus',
type=int,
Expand All @@ -78,7 +84,7 @@ def parse_args():
'--data_set',
type=str,
default='flowers',
choices=['cifar10', 'flowers'],
choices=['cifar10', 'flowers', 'imagenet'],
help='Optional dataset for benchmark.')
parser.add_argument(
'--infer_only', action='store_true', help='If set, run forward only.')
Expand Down Expand Up @@ -108,6 +114,16 @@ def parse_args():
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--use_reader_op',
action='store_true',
help='Whether to use reader op, and must specify the data path if set this to true.'
)
parser.add_argument(
'--data_path',
type=str,
default="",
help='Directory that contains all the training recordio files.')
args = parser.parse_args()
return args

Expand Down Expand Up @@ -210,26 +226,50 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(startup_prog)
feed_var_list = [
var for var in train_prog.global_block().vars.itervalues()
if var.is_data
]
feeder = fluid.DataFeeder(feed_var_list, place)

if not args.use_reader_op:
feed_var_list = [
var for var in train_prog.global_block().vars.itervalues()
if var.is_data
]
feeder = fluid.DataFeeder(feed_var_list, place)

iters, num_samples, start_time = 0, 0, time.time()
for pass_id in range(args.pass_num):
train_losses = []
for batch_id, data in enumerate(train_reader()):
if not args.use_reader_op:
reader_generator = train_reader()
batch_id = 0
data = None
while True:
if not args.use_reader_op:
data = next(reader_generator, None)
if data == None:
break
if iters == args.iterations:
break
if iters == args.skip_batch_num:
start_time = time.time()
num_samples = 0
if iters == args.iterations:
break
loss = exe.run(train_prog,
feed=feeder.feed(data),
fetch_list=[avg_loss])

if args.use_reader_op:
try:
loss = exe.run(train_prog, fetch_list=[avg_loss])
except fluid.core.EnforceNotMet as ex:
break
else:
loss = exe.run(train_prog,
feed=feeder.feed(data),
fetch_list=[avg_loss])
iters += 1
num_samples += len(data)
batch_id += 1
# FIXME(wuyi): For use_reader_op, if the current
# pass is not the last, the last batch of this pass
# is also equal to args.batch_size.
if args.use_reader_op:
num_samples += args.batch_size * args.gpus
else:
num_samples += len(data)
train_losses.append(loss)
print("Pass: %d, Iter: %d, Loss: %f\n" %
(pass_id, iters, np.mean(train_losses)))
Expand All @@ -250,10 +290,14 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
batch_acc, args, train_prog, startup_prog, nccl_id_var,
num_trainers, trainer_id):
feed_var_list = [
var for var in train_prog.global_block().vars.itervalues()
if var.is_data
]
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
if not args.use_reader_op:
feed_var_list = [
var for var in train_prog.global_block().vars.itervalues()
if var.is_data
]
feeder = fluid.DataFeeder(feed_var_list, place)

# generate fake:
if args.use_fake_data:
for var in feed_var_list:
Expand All @@ -270,7 +314,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
"value": 1.0,
"dtype": var.dtype})

place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
if nccl_id_var and trainer_id == 0:
#FIXME(wuyi): wait other trainer to start listening
time.sleep(30)
Expand All @@ -287,12 +330,21 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
num_trainers=num_trainers,
trainer_id=trainer_id)

feeder = fluid.DataFeeder(feed_var_list, place)
for pass_id in range(args.pass_num):
num_samples = 0
iters = 0
start_time = time.time()
for batch_id, data in enumerate(train_reader()):
if not args.use_reader_op:
reader_generator = train_reader()
batch_id = 0
data = None
while True:
if not args.use_reader_op:
data = next(reader_generator, None)
if data == None:
break
if iters == args.iterations:
break
if args.profile and pass_id == 0 and batch_id == 5:
profiler.start_profiler("All")
elif args.profile and pass_id == 0 and batch_id == 10:
Expand All @@ -301,19 +353,26 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
if iters == args.skip_batch_num:
start_time = time.time()
num_samples = 0
if iters == args.iterations:
break
if args.use_fake_data:
loss, = exe.run([avg_loss.name])
if args.use_fake_data or args.use_reader_op:
try:
loss, = exe.run([avg_loss.name])
except fluid.core.EnforceNotMet as ex:
break
else:
loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
if args.update_method == "pserver":
exe.bcast_params()
num_samples += len(data)
if args.use_reader_op:
num_samples += args.batch_size * args.gpus
else:
num_samples += len(data)
iters += 1
if batch_id % 1 == 0:
print("Pass %d, batch %d, loss %s" %
(pass_id, batch_id, np.array(loss)))
batch_id += 1
if args.use_reader_op:
num_samples = num_samples * args.gpus
print_train_time(start_time, time.time(), num_samples)
if not args.no_test and batch_acc:
test_acc = test(startup_exe, infer_prog, test_reader, feeder,
Expand Down
4 changes: 3 additions & 1 deletion benchmark/fluid/models/machine_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ def lodtensor_to_ndarray(lod_tensor):


def get_model(args):
if args.use_reader_op:
raise Exception("machine_translation do not support reader op for now.")
embedding_dim = 512
encoder_size = 512
decoder_size = 512
Expand All @@ -221,7 +223,7 @@ def get_model(args):
train_batch_generator = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.wmt14.train(dict_size), buf_size=1000),
batch_size=args.batch_size)
batch_size=args.batch_size * args.gpus)

test_batch_generator = paddle.batch(
paddle.reader.shuffle(
Expand Down
24 changes: 20 additions & 4 deletions benchmark/fluid/models/mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import argparse
import time
import cProfile
import os

import paddle
import paddle.fluid as fluid
Expand Down Expand Up @@ -65,9 +66,24 @@ def cnn_model(data):


def get_model(args):
# Input data
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if args.use_reader_op:
filelist = [
os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use glob to specify the files.

]
data_file = fluid.layers.open_files(
filenames=filelist,
shapes=[[-1, 1, 28, 28], (-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int64"],
thread_num=args.gpus,
pass_num=args.pass_num)
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
else:
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

if args.device == 'CPU' and args.cpus > 1:
places = fluid.layers.get_places(args.cpus)
Expand Down Expand Up @@ -103,7 +119,7 @@ def get_model(args):

# Reader
train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=args.batch_size)
paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=args.batch_size)
return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc
59 changes: 44 additions & 15 deletions benchmark/fluid/models/resnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import functools
import numpy as np
import time
import os

import cProfile, pstats, StringIO

import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
from recordio_converter import imagenet_train, imagenet_test


def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'):
Expand Down Expand Up @@ -122,16 +124,48 @@ def get_model(args):
else:
dshape = [32, 32, 3]
model = resnet_cifar10
else:
train_reader = paddle.dataset.cifar.train10()
test_reader = paddle.dataset.cifar.test10()
elif args.data_set == "flowers":
class_dim = 102
if args.data_format == 'NCHW':
dshape = [3, 224, 224]
else:
dshape = [224, 224, 3]
model = resnet_imagenet

input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
train_reader = paddle.dataset.flowers.train()
test_reader = paddle.dataset.flowers.test()
elif args.data_set == "imagenet":
class_dim = 1000
if args.data_format == 'NCHW':
dshape = [3, 224, 224]
else:
dshape = [224, 224, 3]
model = resnet_imagenet
if not args.data_path:
raise Exception(
"Must specify --data_path when training with imagenet")
train_reader = imagenet_train(args.data_path)
test_reader = imagenet_test(args.data_path)

if args.use_reader_op:
filelist = [
os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
]
data_file = fluid.layers.open_files(
filenames=filelist,
shapes=[[-1] + dshape, (-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int64"],
thread_num=args.gpus,
pass_num=args.pass_num)
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
input, label = fluid.layers.read_file(data_file)
else:
input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

if args.device == 'CPU' and args.cpus > 1:
places = fluid.layers.get_places(args.cpus)
Expand Down Expand Up @@ -162,15 +196,10 @@ def get_model(args):

optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)

train_reader = paddle.batch(
batched_train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.cifar.train10()
if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
buf_size=5120),
batch_size=args.batch_size)
test_reader = paddle.batch(
paddle.dataset.cifar.test10()
if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
batch_size=args.batch_size)

return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
train_reader, buf_size=5120),
batch_size=args.batch_size * args.gpus)
batched_test_reader = paddle.batch(train_reader, batch_size=args.batch_size)

return avg_cost, inference_program, optimizer, batched_train_reader, batched_test_reader, batch_acc
Loading