From 583c98d1667812e70d8b193a9153986eaa35cf53 Mon Sep 17 00:00:00 2001 From: ZhenDu <454858191@qq.com> Date: Mon, 25 Dec 2017 16:28:12 +0800 Subject: [PATCH 1/4] use dfs command to create file parallel --- supports/integration-test/reset_env.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/supports/integration-test/reset_env.py b/supports/integration-test/reset_env.py index a98f6adc580..60cbcad8b8f 100644 --- a/supports/integration-test/reset_env.py +++ b/supports/integration-test/reset_env.py @@ -73,6 +73,27 @@ def test_create_10M_DFSIO(self): subprocess.call("hdfs dfs -mv /benchmarks/TestDFSIO/io_data " + TEST_DIR + str(i) + "_data") + + def test_create_10K_0KB_DFSIO_parallel(self): + dir_name = TEST_DIR + random_string() + + """ + When there are so many thread is started, and the client cannot handles them, + please set process_group_size less + """ + process_group_size = 200 + + file_index = 0; + for i in range(10000 / process_group_size): + process_group = [] + for j in range(process_group_size): + process_group.append(subprocess.Popen("hdfs dfs -touchz dir_name/" + str(file_index))) + file_index = file_index + 1 + + # wait + for k in range(process_group_size): + process_group[k].wait() + def test_create_100M_0KB_parallel(self): max_number = 200000 dir_number = 50 From 4cb69c2f6d5cf97ae0e58235d9c08d1cacf0a306 Mon Sep 17 00:00:00 2001 From: ZhenDu <454858191@qq.com> Date: Mon, 25 Dec 2017 20:09:18 +0800 Subject: [PATCH 2/4] refine the function --- supports/integration-test/reset_env.py | 83 +++++++++++++++++++++----- 1 file changed, 67 insertions(+), 16 deletions(-) diff --git a/supports/integration-test/reset_env.py b/supports/integration-test/reset_env.py index 60cbcad8b8f..de203b445ce 100644 --- a/supports/integration-test/reset_env.py +++ b/supports/integration-test/reset_env.py @@ -1,6 +1,8 @@ import unittest from util import * from threading import Thread +import sys, os, time +from subprocess import Popen, list2cmdline FILE_SIZE = 1024 * 1024 @@ -16,6 +18,62 @@ def test_create_100M_0KB_thread(max_number): wait_for_cmdlets(cids) +def cpu_count(): + ''' Returns the number of CPUs in the system + ''' + num = 1 + if sys.platform == 'win32': + try: + num = int(os.environ['NUMBER_OF_PROCESSORS']) + except (ValueError, KeyError): + pass + elif sys.platform == 'darwin': + try: + num = int(os.popen('sysctl -n hw.ncpu').read()) + except ValueError: + pass + else: + try: + num = os.sysconf('SC_NPROCESSORS_ONLN') + except (ValueError, OSError, AttributeError): + pass + + return num + + +def exec_commands(cmds): + ''' Exec commands in parallel in multiple process + (as much as we have CPU) + ''' + if not cmds: return # empty list + + def done(p): + return p.poll() is not None + def success(p): + return p.returncode == 0 + def fail(): + sys.exit(1) + + max_task = cpu_count() + processes = [] + while True: + while cmds and len(processes) < max_task: + task = cmds.pop() + print list2cmdline(task) + processes.append(Popen(task)) + + for p in processes: + if done(p): + if success(p): + processes.remove(p) + else: + fail() + + if not processes and not cmds: + break + else: + time.sleep(0.05) + class ResetEnv(unittest.TestCase): def test_delete_all_rules(self): """ @@ -75,24 +133,17 @@ def test_create_10M_DFSIO(self): def test_create_10K_0KB_DFSIO_parallel(self): - dir_name = TEST_DIR + random_string() - - """ - When there are so many thread is started, and the client cannot handles them, - please set process_group_size less - """ - process_group_size = 200 - - file_index = 0; - for i in range(10000 / process_group_size): - process_group = [] - for j in range(process_group_size): - process_group.append(subprocess.Popen("hdfs dfs -touchz dir_name/" + str(file_index))) + dir_num = 5 + for i in range(dir_num): + file_index = 0 + dir_name = TEST_DIR + random_string() + command_arr = [] + for i in range(10000 / dir_num): + command_arr.append("hdfs dfs -touchz " + dir_name + "/" + str(file_index)) file_index = file_index + 1 + exec_commands(command_arr) + - # wait - for k in range(process_group_size): - process_group[k].wait() def test_create_100M_0KB_parallel(self): max_number = 200000 From b763e9ce4cd4df90b0493f84fe9ef915a5b69b65 Mon Sep 17 00:00:00 2001 From: ZhenDu <454858191@qq.com> Date: Mon, 25 Dec 2017 21:25:06 +0800 Subject: [PATCH 3/4] refine the code --- supports/integration-test/reset_env.py | 24 ------------------------ supports/integration-test/util.py | 24 ++++++++++++++++++++++++ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/supports/integration-test/reset_env.py b/supports/integration-test/reset_env.py index de203b445ce..9c5b9e6ed80 100644 --- a/supports/integration-test/reset_env.py +++ b/supports/integration-test/reset_env.py @@ -17,30 +17,6 @@ def test_create_100M_0KB_thread(max_number): cids.append(cid) wait_for_cmdlets(cids) - -def cpu_count(): - ''' Returns the number of CPUs in the system - ''' - num = 1 - if sys.platform == 'win32': - try: - num = int(os.environ['NUMBER_OF_PROCESSORS']) - except (ValueError, KeyError): - pass - elif sys.platform == 'darwin': - try: - num = int(os.popen('sysctl -n hw.ncpu').read()) - except ValueError: - pass - else: - try: - num = os.sysconf('SC_NPROCESSORS_ONLN') - except (ValueError, OSError, AttributeError): - pass - - return num - - def exec_commands(cmds): ''' Exec commands in parallel in multiple process (as much as we have CPU) diff --git a/supports/integration-test/util.py b/supports/integration-test/util.py index 2b82c136cfb..7efb66d2e7d 100644 --- a/supports/integration-test/util.py +++ b/supports/integration-test/util.py @@ -2,6 +2,8 @@ import random import time import uuid +import sys, os +from subprocess import Popen, list2cmdline # Server info BASE_URL = "http://localhost:7045" @@ -23,6 +25,28 @@ TEST_DIR = "/ssmtest/" +def cpu_count(): + ''' Returns the number of CPUs in the system + ''' + num = 1 + if sys.platform == 'win32': + try: + num = int(os.environ['NUMBER_OF_PROCESSORS']) + except (ValueError, KeyError): + pass + elif sys.platform == 'darwin': + try: + num = int(os.popen('sysctl -n hw.ncpu').read()) + except ValueError: + pass + else: + try: + num = os.sysconf('SC_NPROCESSORS_ONLN') + except (ValueError, OSError, AttributeError): + pass + + return num + def random_file_path(): return TEST_DIR + random_string() From 9df3670186ab6f3e4813634b18a8643f754ed050 Mon Sep 17 00:00:00 2001 From: ZhenDu <454858191@qq.com> Date: Tue, 26 Dec 2017 16:40:55 +0800 Subject: [PATCH 4/4] fix some bugs --- supports/integration-test/main.py | 0 supports/integration-test/reset_env.py | 6 +++--- 2 files changed, 3 insertions(+), 3 deletions(-) create mode 100644 supports/integration-test/main.py diff --git a/supports/integration-test/main.py b/supports/integration-test/main.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/supports/integration-test/reset_env.py b/supports/integration-test/reset_env.py index 9c5b9e6ed80..8e30873a5ee 100644 --- a/supports/integration-test/reset_env.py +++ b/supports/integration-test/reset_env.py @@ -2,7 +2,7 @@ from util import * from threading import Thread import sys, os, time -from subprocess import Popen, list2cmdline +import subprocess FILE_SIZE = 1024 * 1024 @@ -35,8 +35,8 @@ def fail(): while True: while cmds and len(processes) < max_task: task = cmds.pop() - print list2cmdline(task) - processes.append(Popen(task)) + print task + processes.append(subprocess.Popen(task, shell=True)) for p in processes: if done(p):