From c7eb199b2e94292fc14b10cb75efd0b4bda47e56 Mon Sep 17 00:00:00 2001
From: Yang Yu
Date: Wed, 10 Jan 2018 16:13:10 +0800
Subject: [PATCH 1/5] Init commit

---
 .../paddle/v2/fluid/tests/test_parallel_op.py | 108 +++++++++++-------
 1 file changed, 68 insertions(+), 40 deletions(-)

diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py
index 59ed041e7fa1d..3736d5ea5a5c5 100644
--- a/python/paddle/v2/fluid/tests/test_parallel_op.py
+++ b/python/paddle/v2/fluid/tests/test_parallel_op.py
@@ -1,45 +1,73 @@
 import unittest
-
-import paddle.v2.fluid.layers as layers
 import paddle.v2.fluid as fluid
-from paddle.v2.fluid.framework import Program
-from paddle.v2.fluid.executor import Executor
-from paddle.v2.fluid.backward import append_backward
-import numpy as np
-import paddle.v2.fluid.core as core
-
-
-class ParallelOpTest(unittest.TestCase):
-    def setUp(self):
-        x = layers.data(
-            shape=[-1, 30, 40],
-            dtype='float32',
-            name='x',
-            append_batch_size=False,
-            stop_gradient=False)
-
-        places = layers.get_places(device_count=4)
-        pd = layers.ParallelDo(places=places)
-
-        with pd.do():
-            data = pd.read_input(x)
-            hidden = layers.fc(input=data, size=7)
-            pd.write_output(hidden)
-        data = pd()
-        loss = layers.mean(x=data)
-        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
-        sgd_optimizer.minimize(loss)
-
-        exe = fluid.Executor(fluid.CPUPlace())
-        exe.run(fluid.default_startup_program())
-        exe.run(fluid.default_main_program(),
-                feed={
-                    x.name: np.random.uniform(0.1, 0.6,
-                                              (20, 30, 40)).astype("float32")
-                })
-
-    def test_forward(self):
-        pass
+import numpy
+
+
+class BaseParallelForTest(unittest.TestCase):
+    def main(self, callback, feed, fetch):
+        cpu = fluid.CPUPlace()
+        result_cpu = self._main_impl_(
+            callback=callback,
+            feed=feed,
+            fetch=fetch,
+            place=cpu,
+            use_parallel=False)
+        print result_cpu
+
+    def _main_impl_(self, callback, feed, fetch, place, use_parallel=False):
+        main = fluid.Program()
+        startup = fluid.Program()
+        # Fix seed
+        main.random_seed = 10
+        startup.random_seed = 10
+
+        with fluid.program_guard(main, startup):
+            generator = callback()
+            # Automatically insert parallel do if use_parallel = True
+            if use_parallel:
+                places = fluid.layers.get_places()
+                pd = fluid.layers.ParallelDo(places)
+                data = next(generator)
+
+                if isinstance(data, fluid.Variable):
+                    data = [data]
+                with pd.do():
+                    ins = map(pd.read_input, data)
+                    if len(ins) == 1:
+                        ins = ins[0]
+                    generator.send(ins)  # patch input
+                    loss = next(generator)
+                    pd.write_output(loss)
+
+                loss = pd()
+            else:
+                data = next(generator)
+                generator.send(data)
+                loss = next(generator)
+
+            avg_loss = fluid.layers.mean(x=loss)
+            fluid.backward.append_backward(loss=avg_loss)
+
+        exe = fluid.Executor(place)
+        exe.run(startup)
+        return exe.run(main, feed=feed, fetch_list=fetch)
+
+
+class ParallelOpTest(BaseParallelForTest):
+    def test_simple_fc(self):
+        def __network__():
+            x = fluid.layers.data(shape=[784], dtype='float32', name='img')
+            x = yield x
+            hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
+            loss = fluid.layers.mean(x=hidden)
+            yield loss
+
+        self.main(
+            callback=__network__,
+            feed={
+                'img': numpy.random.random(size=(128, 784)).astype('float32')
+            },
+            fetch='fc1.w@GRAD')
 
 
 if __name__ == '__main__':
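The harness in this first patch drives the network-builder callback as a coroutine: the first yield hands the data variables to the harness, which may rewrite them (under ParallelDo they become the per-place pd.read_input results) and send them back; the second yield returns the loss. A minimal, framework-free sketch of that protocol (the names below are illustrative, none of this is Paddle API):

    def network():
        x = 10               # stands in for the data layer
        x = yield x          # first yield: hand x out, get a patched x back
        yield x * 2          # second yield: hand the "loss" back


    def drive(callback, patch):
        generator = callback()
        data = next(generator)  # run up to the first yield, grab the data
        # send() resumes the generator and returns the next yielded value
        return generator.send(patch(data))


    print drive(network, lambda d: d + 1)  # prints 22, i.e. (10 + 1) * 2

Note that send() already returns the value of the second yield, so this patch's `generator.send(ins)` followed by `loss = next(generator)` advances the generator past its last yield; PATCH 3 below folds the two calls into `loss = generator.send(ins)`.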
From 2f56995f7c97e0da8bc63406a1fc42f43815135f Mon Sep 17 00:00:00 2001
From: Yang Yu
Date: Wed, 10 Jan 2018 16:15:54 +0800
Subject: [PATCH 2/5] Fix InitGLOG

glog will not hold ARGV[0] inside.
---
 paddle/framework/init.cc | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/paddle/framework/init.cc b/paddle/framework/init.cc
index e12bac1d78e3f..4ef82a541efaa 100644
--- a/paddle/framework/init.cc
+++ b/paddle/framework/init.cc
@@ -11,6 +11,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
+#include <string.h>  // for strdup
 #include <algorithm>
 #include <string>
 
@@ -60,7 +61,9 @@ void InitDevices() {
 }
 
 void InitGLOG(const std::string &prog_name) {
-  google::InitGoogleLogging(prog_name.c_str());
+  // glog will not hold the ARGV[0] inside.
+  // Use strdup to alloc a new string.
+  google::InitGoogleLogging(strdup(prog_name.c_str()));
   google::InstallFailureSignalHandler();
 }
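The strdup in this fix is deliberate: glog's InitGoogleLogging keeps the char pointer it is given as the program name rather than copying the string, so passing prog_name.c_str() directly would leave glog reading from a std::string buffer that may already be destroyed. The duplicated buffer is intentionally never freed, since the program name must stay valid for the whole lifetime of the process.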
From 2412f2f4124250ef0a0b8eb66efbfa08900c5d74 Mon Sep 17 00:00:00 2001
From: Yang Yu
Date: Wed, 10 Jan 2018 16:37:42 +0800
Subject: [PATCH 3/5] Polish Unittest for ParallelFor

---
 .../paddle/v2/fluid/tests/test_parallel_op.py | 50 ++++++++++++++++---
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py
index 3736d5ea5a5c5..049ae0fe28d35 100644
--- a/python/paddle/v2/fluid/tests/test_parallel_op.py
+++ b/python/paddle/v2/fluid/tests/test_parallel_op.py
@@ -12,9 +12,34 @@ def main(self, callback, feed, fetch):
             fetch=fetch,
             place=cpu,
             use_parallel=False)
-        print result_cpu
+        result_cpu_parallel = self._main_impl_(
+            callback=callback,
+            feed=feed,
+            fetch=fetch,
+            place=cpu,
+            use_parallel=True)
+        if fluid.core.is_compile_gpu():
+            gpu = fluid.CUDAPlace(0)
+            result_gpu = self._main_impl_(
+                callback=callback,
+                feed=feed,
+                fetch=fetch,
+                place=gpu,
+                use_parallel=False)
+            result_gpu_parallel = self._main_impl_(
+                callback=callback,
+                feed=feed,
+                fetch=fetch,
+                place=gpu,
+                use_parallel=True)
+            self._assert_same_(fetch, result_cpu, result_cpu_parallel,
+                               result_gpu, result_gpu_parallel)
+        else:
+            self._assert_same_(fetch, result_cpu, result_cpu_parallel)
 
     def _main_impl_(self, callback, feed, fetch, place, use_parallel=False):
+        if isinstance(fetch, basestring):
+            fetch = [fetch]
         main = fluid.Program()
         startup = fluid.Program()
         # Fix seed
@@ -31,20 +56,19 @@ def _main_impl_(self, callback, feed, fetch, place, use_parallel=False):
 
             if isinstance(data, fluid.Variable):
                 data = [data]
+
             with pd.do():
                 ins = map(pd.read_input, data)
                 if len(ins) == 1:
                     ins = ins[0]
-                generator.send(ins)  # patch input
-                loss = next(generator)
+                loss = generator.send(ins)  # patch input
                 pd.write_output(loss)
 
             loss = pd()
         else:
             data = next(generator)
-            generator.send(data)
-            loss = next(generator)
-
+            loss = generator.send(data)
+        self.assertIsNotNone(loss)
         avg_loss = fluid.layers.mean(x=loss)
         fluid.backward.append_backward(loss=avg_loss)
 
@@ -52,11 +76,25 @@ def _main_impl_(self, callback, feed, fetch, place, use_parallel=False):
         exe.run(startup)
         return exe.run(main, feed=feed, fetch_list=fetch)
 
+    def _assert_same_(self, fetch, *args):
+        def _impl_(a, b, fetch_id, item_id):
+            item_str = ['CPU', 'ParallelCPU', 'GPU', 'ParallelGPU']
+            flag = numpy.allclose(a, b, rtol=0.1)
+            self.assertTrue(flag, "The {0} are different in {1}".format(
+                fetch[fetch_id], item_str[item_id]))
+
+        for i, items in enumerate(zip(*args)):
+            self.assertGreater(len(items), 0)
+            for j in range(1, len(items)):
+                _impl_(items[0], items[j], fetch_id=i, item_id=j)
+
 
 class ParallelOpTest(BaseParallelForTest):
     def test_simple_fc(self):
         def __network__():
             x = fluid.layers.data(shape=[784], dtype='float32', name='img')
+            # FIXME: This is a bug of parallel.do
+            x.stop_gradient = False
             x = yield x
             hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
             loss = fluid.layers.mean(x=hidden)
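The _assert_same_ helper added above zips the fetched results of every configuration and compares each one against the plain-CPU baseline with a loose relative tolerance; rtol=0.1 absorbs CPU/GPU kernel differences and the changed reduction order under parallel execution. A self-contained sketch of the same comparison, assuming only numpy (the names are illustrative):

    import numpy


    def assert_same(fetch_names, *results):
        item_str = ['CPU', 'ParallelCPU', 'GPU', 'ParallelGPU']
        # results[k][i] is the i-th fetched value of the k-th configuration
        for i, per_config in enumerate(zip(*results)):
            for j in range(1, len(per_config)):
                if not numpy.allclose(per_config[0], per_config[j], rtol=0.1):
                    raise AssertionError("The {0} are different in {1}".format(
                        fetch_names[i], item_str[j]))


    # Two configurations fetching one gradient each; passes because the
    # second result stays within 10% of the baseline.
    assert_same(['fc1.w@GRAD'], [numpy.ones(4)], [numpy.ones(4) * 1.05])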
From 12aca860bf24ddc05a06722c1fc11ff3cfedc893 Mon Sep 17 00:00:00 2001
From: Yang Yu
Date: Thu, 11 Jan 2018 11:03:54 +0800
Subject: [PATCH 4/5] Add comment

---
 .../paddle/v2/fluid/tests/test_parallel_op.py | 58 ++++++++++++++++---
 1 file changed, 51 insertions(+), 7 deletions(-)

diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py
index 049ae0fe28d35..dde7206e4fd8d 100644
--- a/python/paddle/v2/fluid/tests/test_parallel_op.py
+++ b/python/paddle/v2/fluid/tests/test_parallel_op.py
@@ -4,15 +4,35 @@
 
 
 class BaseParallelForTest(unittest.TestCase):
-    def main(self, callback, feed, fetch):
+    def run_test(self, callback, feed, fetch):
+        """
+        Run the unittest for parallel.for
+        Args:
+            callback(callable): A callable function that returns a generator.
+                There are two yields in the generator function. The first
+                yield returns the data layers, and the second yield returns
+                the loss. The modified data variables will be sent back
+                during the first yield.
+
+            feed(dict): The executor feeding dictionary.
+            fetch(list|basestr): The fetch name list.
+
+        Returns:
+            None
+
+        Raises:
+            AssertionError when the results of CPU, parallel.for on CPU,
+                GPU, and parallel.for on GPU are different.
+
+        """
         cpu = fluid.CPUPlace()
-        result_cpu = self._main_impl_(
+        result_cpu = self._run_test_impl_(
             callback=callback,
             feed=feed,
             fetch=fetch,
             place=cpu,
             use_parallel=False)
-        result_cpu_parallel = self._main_impl_(
+        result_cpu_parallel = self._run_test_impl_(
             callback=callback,
             feed=feed,
             fetch=fetch,
@@ -20,13 +40,13 @@ def main(self, callback, feed, fetch):
             use_parallel=True)
         if fluid.core.is_compile_gpu():
             gpu = fluid.CUDAPlace(0)
-            result_gpu = self._main_impl_(
+            result_gpu = self._run_test_impl_(
                 callback=callback,
                 feed=feed,
                 fetch=fetch,
                 place=gpu,
                 use_parallel=False)
-            result_gpu_parallel = self._main_impl_(
+            result_gpu_parallel = self._run_test_impl_(
                 callback=callback,
                 feed=feed,
                 fetch=fetch,
                 place=gpu,
                 use_parallel=True)
@@ -37,7 +57,17 @@ def main(self, callback, feed, fetch):
         else:
             self._assert_same_(fetch, result_cpu, result_cpu_parallel)
 
-    def _main_impl_(self, callback, feed, fetch, place, use_parallel=False):
+    def _run_test_impl_(self, callback, feed, fetch, place, use_parallel=False):
+        """
+        Run a single test and return the fetch values.
+        Args:
+            place(Place): the computation place.
+            use_parallel(bool): Whether to use parallel.for or not.
+
+        Returns:
+            Fetched numpy arrays.
+
+        """
         if isinstance(fetch, basestring):
             fetch = [fetch]
         main = fluid.Program()
@@ -77,6 +107,20 @@ def _main_impl_(self, callback, feed, fetch, place, use_parallel=False):
         return exe.run(main, feed=feed, fetch_list=fetch)
 
     def _assert_same_(self, fetch, *args):
+        """
+        Assert the return values of `run_test` are the same.
+        Args:
+            fetch: Fetch list. Used for printing the error message.
+            *args: The fetch result lists of each situation.
+
+        Returns:
+            None
+
+        Raises:
+            AssertionError
+
+        """
+
         def _impl_(a, b, fetch_id, item_id):
             item_str = ['CPU', 'ParallelCPU', 'GPU', 'ParallelGPU']
             flag = numpy.allclose(a, b, rtol=0.1)
@@ -100,7 +144,7 @@ def __network__():
             loss = fluid.layers.mean(x=hidden)
             yield loss
 
-        self.main(
+        self.run_test(
            callback=__network__,
            feed={
                'img': numpy.random.random(size=(128, 784)).astype('float32')

From 83c72536e662b5e8be7eb1a3e69a4f02ac584b20 Mon Sep 17 00:00:00 2001
From: Yang Yu
Date: Thu, 11 Jan 2018 12:35:18 +0800
Subject: [PATCH 5/5] Update batch_size

---
 python/paddle/v2/fluid/tests/test_parallel_op.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py
index dde7206e4fd8d..2b51a1f50473d 100644
--- a/python/paddle/v2/fluid/tests/test_parallel_op.py
+++ b/python/paddle/v2/fluid/tests/test_parallel_op.py
@@ -147,7 +147,8 @@ def __network__():
         self.run_test(
             callback=__network__,
             feed={
-                'img': numpy.random.random(size=(128, 784)).astype('float32')
+                'img':
+                numpy.random.random(size=(128 * 3, 784)).astype('float32')
             },
             fetch='fc1.w@GRAD')
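Taken together, the series turns the original one-off ParallelDo test into a reusable harness. A sketch of how a hypothetical follow-up case could plug into it; the second fc layer, its parameter name 'fc2.w', and the class name are illustrative, not part of this series:

    class ParallelOpTestTwoFC(BaseParallelForTest):
        def test_two_fc(self):
            def __network__():
                x = fluid.layers.data(
                    shape=[784], dtype='float32', name='img')
                # same parallel.do workaround as in test_simple_fc
                x.stop_gradient = False
                x = yield x  # the harness patches in the per-place input
                hidden = fluid.layers.fc(
                    input=x, size=200, param_attr='fc1.w')
                hidden = fluid.layers.fc(
                    input=hidden, size=200, param_attr='fc2.w')
                loss = fluid.layers.mean(x=hidden)
                yield loss

            self.run_test(
                callback=__network__,
                feed={
                    'img': numpy.random.random(
                        size=(128 * 3, 784)).astype('float32')
                },
                fetch=['fc1.w@GRAD', 'fc2.w@GRAD'])

The feed keeps the batch dimension at 128 * 3, matching PATCH 5: a batch that divides evenly across the available places gives every ParallelDo replica the same shard size. Passing fetch as a list relies on the basestring check that PATCH 3 added to the implementation.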