diff --git a/benchmark/fluid/Dockerfile b/benchmark/fluid/Dockerfile index 8298fcf95a507..b9eaca5ee6b48 100644 --- a/benchmark/fluid/Dockerfile +++ b/benchmark/fluid/Dockerfile @@ -19,4 +19,4 @@ ADD *.whl / RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s ENV LD_LIBRARY_PATH=/usr/local/lib -ADD fluid_benchmark.py dataset.py models/ /workspace/ +ADD fluid_benchmark.py recordio_converter.py models/ /workspace/ diff --git a/benchmark/fluid/README.md b/benchmark/fluid/README.md index 1b0c7dce8bd6f..48280a5621b68 100644 --- a/benchmark/fluid/README.md +++ b/benchmark/fluid/README.md @@ -42,6 +42,16 @@ Currently supported `--model` argument include: PADDLE_PSERVER_PORT=7164 PADDLE_TRAINER_IPS=192.168.0.2,192.168.0.3 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method nccl2 ``` +## Prepare the RecordIO file to Achieve Better Performance + +Run the following command will generate RecordIO files like "mnist.recordio" under the path +and batch_size you choose, you can use batch_size=1 so that later reader can change the batch_size +at any time using `fluid.batch`. + +```bash +python -c 'from recordio_converter import *; prepare_mnist("data", 1)' +``` + ## Run Distributed Benchmark on Kubernetes Cluster You may need to build a Docker image before submitting a cluster job onto Kubernetes, or you will diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py index 49f26255f315c..bd0243aa609bb 100644 --- a/benchmark/fluid/fluid_benchmark.py +++ b/benchmark/fluid/fluid_benchmark.py @@ -38,10 +38,12 @@ def parse_args(): default='resnet', help='The model to run benchmark with.') parser.add_argument( - '--batch_size', type=int, default=32, help='The minibatch size.') + '--batch_size', + type=int, + default=32, + help='The batch size on each gpu.') parser.add_argument( '--learning_rate', type=float, default=0.001, help='The learning rate.') - # TODO(wuyi): add "--use_fake_data" option back. parser.add_argument( '--skip_batch_num', type=int, @@ -49,7 +51,10 @@ def parse_args(): help='The first num of minibatch num to skip, for better performance test' ) parser.add_argument( - '--iterations', type=int, default=80, help='The number of minibatches.') + '--iterations', + type=int, + default=80, + help='The number of minibatches, set to -1 to run all batches.') parser.add_argument( '--pass_num', type=int, default=100, help='The number of passes.') parser.add_argument( @@ -69,6 +74,7 @@ def parse_args(): type=int, default=1, help='If gpus > 1, will use ParallelExecutor to run, else use Executor.') + # this option is available only for vgg and resnet. parser.add_argument( '--cpus', type=int, @@ -78,7 +84,7 @@ def parse_args(): '--data_set', type=str, default='flowers', - choices=['cifar10', 'flowers'], + choices=['cifar10', 'flowers', 'imagenet'], help='Optional dataset for benchmark.') parser.add_argument( '--infer_only', action='store_true', help='If set, run forward only.') @@ -108,6 +114,16 @@ def parse_args(): default='local', choices=['local', 'pserver', 'nccl2'], help='Choose parameter update method, can be local, pserver, nccl2.') + parser.add_argument( + '--use_reader_op', + action='store_true', + help='Whether to use reader op, and must specify the data path if set this to true.' + ) + parser.add_argument( + '--data_path', + type=str, + default="", + help='Directory that contains all the training recordio files.') args = parser.parse_args() return args @@ -210,26 +226,50 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) exe = fluid.Executor(place) exe.run(startup_prog) - feed_var_list = [ - var for var in train_prog.global_block().vars.itervalues() - if var.is_data - ] - feeder = fluid.DataFeeder(feed_var_list, place) + + if not args.use_reader_op: + feed_var_list = [ + var for var in train_prog.global_block().vars.itervalues() + if var.is_data + ] + feeder = fluid.DataFeeder(feed_var_list, place) iters, num_samples, start_time = 0, 0, time.time() for pass_id in range(args.pass_num): train_losses = [] - for batch_id, data in enumerate(train_reader()): + if not args.use_reader_op: + reader_generator = train_reader() + batch_id = 0 + data = None + while True: + if not args.use_reader_op: + data = next(reader_generator, None) + if data == None: + break + if iters == args.iterations: + break if iters == args.skip_batch_num: start_time = time.time() num_samples = 0 - if iters == args.iterations: - break - loss = exe.run(train_prog, - feed=feeder.feed(data), - fetch_list=[avg_loss]) + + if args.use_reader_op: + try: + loss = exe.run(train_prog, fetch_list=[avg_loss]) + except fluid.core.EnforceNotMet as ex: + break + else: + loss = exe.run(train_prog, + feed=feeder.feed(data), + fetch_list=[avg_loss]) iters += 1 - num_samples += len(data) + batch_id += 1 + # FIXME(wuyi): For use_reader_op, if the current + # pass is not the last, the last batch of this pass + # is also equal to args.batch_size. + if args.use_reader_op: + num_samples += args.batch_size * args.gpus + else: + num_samples += len(data) train_losses.append(loss) print("Pass: %d, Iter: %d, Loss: %f\n" % (pass_id, iters, np.mean(train_losses))) @@ -250,10 +290,14 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, args, train_prog, startup_prog, nccl_id_var, num_trainers, trainer_id): - feed_var_list = [ - var for var in train_prog.global_block().vars.itervalues() - if var.is_data - ] + place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) + if not args.use_reader_op: + feed_var_list = [ + var for var in train_prog.global_block().vars.itervalues() + if var.is_data + ] + feeder = fluid.DataFeeder(feed_var_list, place) + # generate fake: if args.use_fake_data: for var in feed_var_list: @@ -270,7 +314,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, "value": 1.0, "dtype": var.dtype}) - place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) if nccl_id_var and trainer_id == 0: #FIXME(wuyi): wait other trainer to start listening time.sleep(30) @@ -287,12 +330,21 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, num_trainers=num_trainers, trainer_id=trainer_id) - feeder = fluid.DataFeeder(feed_var_list, place) for pass_id in range(args.pass_num): num_samples = 0 iters = 0 start_time = time.time() - for batch_id, data in enumerate(train_reader()): + if not args.use_reader_op: + reader_generator = train_reader() + batch_id = 0 + data = None + while True: + if not args.use_reader_op: + data = next(reader_generator, None) + if data == None: + break + if iters == args.iterations: + break if args.profile and pass_id == 0 and batch_id == 5: profiler.start_profiler("All") elif args.profile and pass_id == 0 and batch_id == 10: @@ -301,19 +353,26 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, if iters == args.skip_batch_num: start_time = time.time() num_samples = 0 - if iters == args.iterations: - break - if args.use_fake_data: - loss, = exe.run([avg_loss.name]) + if args.use_fake_data or args.use_reader_op: + try: + loss, = exe.run([avg_loss.name]) + except fluid.core.EnforceNotMet as ex: + break else: loss, = exe.run([avg_loss.name], feed=feeder.feed(data)) if args.update_method == "pserver": exe.bcast_params() - num_samples += len(data) + if args.use_reader_op: + num_samples += args.batch_size * args.gpus + else: + num_samples += len(data) iters += 1 if batch_id % 1 == 0: print("Pass %d, batch %d, loss %s" % (pass_id, batch_id, np.array(loss))) + batch_id += 1 + if args.use_reader_op: + num_samples = num_samples * args.gpus print_train_time(start_time, time.time(), num_samples) if not args.no_test and batch_acc: test_acc = test(startup_exe, infer_prog, test_reader, feeder, diff --git a/benchmark/fluid/models/machine_translation.py b/benchmark/fluid/models/machine_translation.py index 635b3373dd27b..69541adf6b7e5 100644 --- a/benchmark/fluid/models/machine_translation.py +++ b/benchmark/fluid/models/machine_translation.py @@ -197,6 +197,8 @@ def lodtensor_to_ndarray(lod_tensor): def get_model(args): + if args.use_reader_op: + raise Exception("machine_translation do not support reader op for now.") embedding_dim = 512 encoder_size = 512 decoder_size = 512 @@ -221,7 +223,7 @@ def get_model(args): train_batch_generator = paddle.batch( paddle.reader.shuffle( paddle.dataset.wmt14.train(dict_size), buf_size=1000), - batch_size=args.batch_size) + batch_size=args.batch_size * args.gpus) test_batch_generator = paddle.batch( paddle.reader.shuffle( diff --git a/benchmark/fluid/models/mnist.py b/benchmark/fluid/models/mnist.py index 28a38a931cf6c..8e740dc6896b7 100644 --- a/benchmark/fluid/models/mnist.py +++ b/benchmark/fluid/models/mnist.py @@ -20,6 +20,7 @@ import argparse import time import cProfile +import os import paddle import paddle.fluid as fluid @@ -65,9 +66,24 @@ def cnn_model(data): def get_model(args): - # Input data - images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) - label = fluid.layers.data(name='label', shape=[1], dtype='int64') + if args.use_reader_op: + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + data_file = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1, 1, 28, 28], (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=args.gpus, + pass_num=args.pass_num) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + images, label = fluid.layers.read_file(data_file) + else: + images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) + label = fluid.layers.data(name='label', shape=[1], dtype='int64') if args.device == 'CPU' and args.cpus > 1: places = fluid.layers.get_places(args.cpus) @@ -103,7 +119,7 @@ def get_model(args): # Reader train_reader = paddle.batch( - paddle.dataset.mnist.train(), batch_size=args.batch_size) + paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus) test_reader = paddle.batch( paddle.dataset.mnist.test(), batch_size=args.batch_size) return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc diff --git a/benchmark/fluid/models/resnet.py b/benchmark/fluid/models/resnet.py index f951f73a35dc4..2ee2b5be09bfc 100644 --- a/benchmark/fluid/models/resnet.py +++ b/benchmark/fluid/models/resnet.py @@ -19,6 +19,7 @@ import functools import numpy as np import time +import os import cProfile, pstats, StringIO @@ -26,6 +27,7 @@ import paddle.fluid as fluid import paddle.fluid.core as core import paddle.fluid.profiler as profiler +from recordio_converter import imagenet_train, imagenet_test def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): @@ -122,16 +124,48 @@ def get_model(args): else: dshape = [32, 32, 3] model = resnet_cifar10 - else: + train_reader = paddle.dataset.cifar.train10() + test_reader = paddle.dataset.cifar.test10() + elif args.data_set == "flowers": class_dim = 102 if args.data_format == 'NCHW': dshape = [3, 224, 224] else: dshape = [224, 224, 3] model = resnet_imagenet - - input = fluid.layers.data(name='data', shape=dshape, dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') + train_reader = paddle.dataset.flowers.train() + test_reader = paddle.dataset.flowers.test() + elif args.data_set == "imagenet": + class_dim = 1000 + if args.data_format == 'NCHW': + dshape = [3, 224, 224] + else: + dshape = [224, 224, 3] + model = resnet_imagenet + if not args.data_path: + raise Exception( + "Must specify --data_path when training with imagenet") + train_reader = imagenet_train(args.data_path) + test_reader = imagenet_test(args.data_path) + + if args.use_reader_op: + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + data_file = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1] + dshape, (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=args.gpus, + pass_num=args.pass_num) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + input, label = fluid.layers.read_file(data_file) + else: + input = fluid.layers.data(name='data', shape=dshape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') if args.device == 'CPU' and args.cpus > 1: places = fluid.layers.get_places(args.cpus) @@ -162,15 +196,10 @@ def get_model(args): optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9) - train_reader = paddle.batch( + batched_train_reader = paddle.batch( paddle.reader.shuffle( - paddle.dataset.cifar.train10() - if args.data_set == 'cifar10' else paddle.dataset.flowers.train(), - buf_size=5120), - batch_size=args.batch_size) - test_reader = paddle.batch( - paddle.dataset.cifar.test10() - if args.data_set == 'cifar10' else paddle.dataset.flowers.test(), - batch_size=args.batch_size) - - return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc + train_reader, buf_size=5120), + batch_size=args.batch_size * args.gpus) + batched_test_reader = paddle.batch(train_reader, batch_size=args.batch_size) + + return avg_cost, inference_program, optimizer, batched_train_reader, batched_test_reader, batch_acc diff --git a/benchmark/fluid/models/stacked_dynamic_lstm.py b/benchmark/fluid/models/stacked_dynamic_lstm.py index 1b680d76a8ba1..e1c4857f1a365 100644 --- a/benchmark/fluid/models/stacked_dynamic_lstm.py +++ b/benchmark/fluid/models/stacked_dynamic_lstm.py @@ -44,6 +44,9 @@ def __impl__(): def get_model(args): + if args.use_reader_op: + raise Exception( + "stacked_dynamic_lstm do not support reader op for now.") lstm_size = 512 emb_dim = 512 crop_size = 1500 @@ -114,7 +117,7 @@ def gate_common( train_reader = batch( paddle.reader.shuffle( crop_sentence(imdb.train(word_dict), crop_size), buf_size=25000), - batch_size=args.batch_size) + batch_size=args.batch_size * args.gpus) test_reader = batch( paddle.reader.shuffle( crop_sentence(imdb.test(word_dict), crop_size), buf_size=25000), diff --git a/benchmark/fluid/models/vgg.py b/benchmark/fluid/models/vgg.py index 53856c5f7acd3..6092cdeb884b3 100644 --- a/benchmark/fluid/models/vgg.py +++ b/benchmark/fluid/models/vgg.py @@ -22,6 +22,7 @@ import paddle.fluid.core as core import argparse import functools +import os def vgg16_bn_drop(input): @@ -65,9 +66,24 @@ def get_model(args): else: data_shape = [224, 224, 3] - # Input data - images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') + if args.use_reader_op: + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + data_file = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1] + data_shape, (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=args.gpus, + pass_num=args.pass_num) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + images, label = fluid.layers.read_file(data_file) + else: + images = fluid.layers.data(name='data', shape=dshape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') # Train program net = vgg16_bn_drop(images) @@ -95,7 +111,7 @@ def get_model(args): paddle.dataset.cifar.train10() if args.data_set == 'cifar10' else paddle.dataset.flowers.train(), buf_size=5120), - batch_size=args.batch_size) + batch_size=args.batch_size * args.gpus) test_reader = paddle.batch( paddle.dataset.cifar.test10() if args.data_set == 'cifar10' else paddle.dataset.flowers.test(), diff --git a/benchmark/fluid/recordio_converter.py b/benchmark/fluid/recordio_converter.py new file mode 100644 index 0000000000000..f2dc39109bf1b --- /dev/null +++ b/benchmark/fluid/recordio_converter.py @@ -0,0 +1,164 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import random +import paddle +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.dataset import mnist, cifar, flowers, image + + +def convert_2_recordio(py_reader, outfilepath, batch_size, shape_data, + shape_label): + num_batches = 0 + with fluid.program_guard(fluid.Program(), fluid.Program()): + reader = paddle.batch(py_reader(), batch_size=batch_size) + feeder = fluid.DataFeeder( + feed_list=[ # order is image and label + fluid.layers.data( + name='image', shape=shape_data), + fluid.layers.data( + name='label', shape=shape_label, dtype='int64'), + ], + place=fluid.CPUPlace()) + num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( + outfilepath, reader, feeder) + return num_batches + + +def prepare_mnist(outpath, batch_size): + outfilepath = os.path.join(outpath, "mnist.recordio") + convert_2_recordio(mnist.train, outfilepath, batch_size, [784], [1]) + + +def prepare_cifar10(outpath, batch_size): + outfilepath = os.path.join(outpath, "cifar.recordio") + convert_2_recordio(cifar.train10, outfilepath, batch_size, [3, 32, 32], [1]) + + +def prepare_flowers(outpath, batch_size): + outfilepath = os.path.join(outpath, "flowers.recordio") + convert_2_recordio(flowers.train, outfilepath, batch_size, [3, 224, 224], + [1]) + + +def default_mapper(sample): + img, label = sample + img = image.simple_transform( + img, 256, 224, True, mean=[103.94, 116.78, 123.68]) + return img.flatten().astype('float32'), label + + +def imagenet_train(data_dir): + contents = os.listdir(data_dir) + if set(contents) != set( + ["train", "train.txt", "val", "val_set", "val.txt", "unzip.sh"]): + raise Exception("Imagenet data contents error!") + img2label = dict() + imgfilelist = [] + with open(os.path.join(data_dir, "train.txt")) as fn: + while 1: + l = fn.readline() + if not l: + break + img, lbl = l[:-1].split(" ") + img2label[img] = int(lbl) + imgfilelist.append(img) + # shuffle all, this is slow + random.shuffle(imgfilelist) + + def train_reader(): + for idx, imgfile in enumerate(imgfilelist): + data = image.load_image( + os.path.join(data_dir, "train", imgfile.lower())) + label = [img2label[imgfile], ] + yield [data, label] + + return paddle.reader.map_readers(default_mapper, train_reader) + + +def imagenet_test(data_dir): + contents = os.listdir(data_dir) + if set(contents) != set( + ["train", "train.txt", "val", "val_set", "val.txt", "unzip.sh"]): + raise Exception("Imagenet data contents error!") + img2label = dict() + imgfilelist = [] + with open(os.path.join(data_dir, "val.txt")) as fn: + while 1: + l = fn.readline() + if not l: + break + img, lbl = l[:-1].split(" ") + img2label[img] = int(lbl) + imgfilelist.append(img) + + def test_reader(): + for idx, imgfile in enumerate(imgfilelist): + base_path = os.path.join(data_dir, "val", imgfile.split(".")[0]) + image_path = ".".join([base_path, "jpeg"]) + data = image.load_image(image_path) + label = [img2label[imgfile], ] + yield [data, label] + + return paddle.reader.map_readers(default_mapper, test_reader) + + +# FIXME(wuyi): delete this when https://github.com/PaddlePaddle/Paddle/pull/11066 is merged +def convert_reader_to_recordio_files( + filename, + batch_per_file, + reader_creator, + feeder, + compressor=core.RecordIOWriter.Compressor.Snappy, + max_num_records=1000, + feed_order=None): + if feed_order is None: + feed_order = feeder.feed_names + f_name, f_ext = os.path.splitext(filename) + assert (f_ext == ".recordio") + + lines = [] + f_idx = 0 + counter = 0 + for idx, batch in enumerate(reader_creator()): + lines.append(batch) + if idx >= batch_per_file and idx % batch_per_file == 0: + filename = "%s-%05d%s" % (f_name, f_idx, f_ext) + with fluid.recordio_writer.create_recordio_writer( + filename, compressor, max_num_records) as writer: + for l in lines: + res = feeder.feed(l) + for each in feed_order: + writer.append_tensor(res[each]) + writer.complete_append_tensor() + counter += 1 + lines = [] + f_idx += 1 + print("written file: ", filename) + return counter + + +def prepare_imagenet(inpath, outpath, batch_size): + r = paddle.batch(imagenet_train(inpath), batch_size=batch_size) + feeder = fluid.DataFeeder( + feed_list=[ + fluid.layers.data( + name="image", shape=[3, 224, 224]), fluid.layers.data( + name="label", shape=[1], dtype='int64') + ], + place=fluid.CPUPlace()) + outpath = os.path.join(outpath, "imagenet.recordio") + convert_reader_to_recordio_files(outpath, 10000, r, feeder) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 8758ac9f94ab9..a56f3ea9db6b9 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -434,7 +434,7 @@ def open_files(filenames, shapes, lod_levels, dtypes, - thread_num, + thread_num=1, buffer_size=None, pass_num=1, for_parallel=True):