Skip to content

Commit

Permalink
Merge pull request #7396 from reyoung/feature/parallel_for_unittest
Browse files Browse the repository at this point in the history
Feature/parallel for unittest
  • Loading branch information
reyoung authored Jan 11, 2018
2 parents 95c0c12 + 83c7253 commit a9dbdab
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 41 deletions.
5 changes: 4 additions & 1 deletion paddle/framework/init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <string.h> // for strdup
#include <algorithm>
#include <string>

Expand Down Expand Up @@ -60,7 +61,9 @@ void InitDevices() {
}

void InitGLOG(const std::string &prog_name) {
google::InitGoogleLogging(prog_name.c_str());
// glog will not hold the ARGV[0] inside.
// Use strdup to alloc a new string.
google::InitGoogleLogging(strdup(prog_name.c_str()));
google::InstallFailureSignalHandler();
}

Expand Down
191 changes: 151 additions & 40 deletions python/paddle/v2/fluid/tests/test_parallel_op.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,156 @@
import unittest

import paddle.v2.fluid.layers as layers
import paddle.v2.fluid as fluid
from paddle.v2.fluid.framework import Program
from paddle.v2.fluid.executor import Executor
from paddle.v2.fluid.backward import append_backward
import numpy as np
import paddle.v2.fluid.core as core


class ParallelOpTest(unittest.TestCase):
def setUp(self):
x = layers.data(
shape=[-1, 30, 40],
dtype='float32',
name='x',
append_batch_size=False,
stop_gradient=False)

places = layers.get_places(device_count=4)
pd = layers.ParallelDo(places=places)

with pd.do():
data = pd.read_input(x)
hidden = layers.fc(input=data, size=7)
pd.write_output(hidden)
data = pd()
loss = layers.mean(x=data)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
exe.run(fluid.default_main_program(),
feed={
x.name: np.random.uniform(0.1, 0.6,
(20, 30, 40)).astype("float32")
})

def test_forward(self):
pass
import numpy


class BaseParallelForTest(unittest.TestCase):
def run_test(self, callback, feed, fetch):
"""
Run the unittest for parallel.for
Args:
callback(callable): A callable function returns a generator. There
are two yields in the generator function. The first yield
returns the data layers, and the second yield returns the loss.
The modified data variables will be sent back during the first
yield.
feed(dict): The executor feeding dictionary.
fetch(list|basestr): The fetch name lists.
Returns:
None
Raises:
AssertionError when the computation of cpu, parallel.for in cpu,
gpu, parallel.for in gpu are different.
"""
cpu = fluid.CPUPlace()
result_cpu = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=cpu,
use_parallel=False)
result_cpu_parallel = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=cpu,
use_parallel=True)
if fluid.core.is_compile_gpu():
gpu = fluid.CUDAPlace(0)
result_gpu = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=False)
result_gpu_parallel = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=True)
self._assert_same_(fetch, result_cpu, result_cpu_parallel,
result_gpu, result_gpu_parallel)
else:
self._assert_same_(fetch, result_cpu, result_cpu_parallel)

def _run_test_impl_(self, callback, feed, fetch, place, use_parallel=False):
"""
Run a single test, returns the fetch values
Args:
place(Place): the computation place.
use_parallel(bool): Whether use parallel.for or not.
Returns:
Fetched numpy arrays.
"""
if isinstance(fetch, basestring):
fetch = [fetch]
main = fluid.Program()
startup = fluid.Program()
# Fix seed
main.random_seed = 10
startup.random_seed = 10

with fluid.program_guard(main, startup):
generator = callback()
# Automatically insert parallel do if use_parallel = True
if use_parallel:
places = fluid.layers.get_places()
pd = fluid.layers.ParallelDo(places)
data = next(generator)

if isinstance(data, fluid.Variable):
data = [data]

with pd.do():
ins = map(pd.read_input, data)
if len(ins) == 1:
ins = ins[0]
loss = generator.send(ins) # patch input
pd.write_output(loss)

loss = pd()
else:
data = next(generator)
loss = generator.send(data)
self.assertIsNotNone(loss)
avg_loss = fluid.layers.mean(x=loss)
fluid.backward.append_backward(loss=avg_loss)

exe = fluid.Executor(place)
exe.run(startup)
return exe.run(main, feed=feed, fetch_list=fetch)

def _assert_same_(self, fetch, *args):
"""
Assert the return values of `run_test` are same.
Args:
fetch: Fetch list. Used for print error message
*args: The fetch result lists of each situations.
Returns:
None
Raises:
AssertionError
"""

def _impl_(a, b, fetch_id, item_id):
item_str = ['CPU', 'ParallelCPU', 'GPU', 'ParallelGPU']
flag = numpy.allclose(a, b, rtol=0.1)
self.assertTrue(flag, "The {0} are different in {1}".format(
fetch[fetch_id], item_str[item_id]))

for i, items in enumerate(zip(*args)):
self.assertGreater(len(items), 0)
for j in range(1, len(items)):
_impl_(items[0], items[j], fetch_id=i, item_id=j)


class ParallelOpTest(BaseParallelForTest):
def test_simple_fc(self):
def __network__():
x = fluid.layers.data(shape=[784], dtype='float32', name='img')
# FIXME: This is a bug of parallel.do
x.stop_gradient = False
x = yield x
hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
loss = fluid.layers.mean(x=hidden)
yield loss

self.run_test(
callback=__network__,
feed={
'img':
numpy.random.random(size=(128 * 3, 784)).astype('float32')
},
fetch='fc1.w@GRAD')


if __name__ == '__main__':
Expand Down

0 comments on commit a9dbdab

Please sign in to comment.