diff --git a/supports/integration-test/main.py b/supports/integration-test/main.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/supports/integration-test/reset_env.py b/supports/integration-test/reset_env.py index a98f6adc580..8e30873a5ee 100644 --- a/supports/integration-test/reset_env.py +++ b/supports/integration-test/reset_env.py @@ -1,6 +1,8 @@ import unittest from util import * from threading import Thread +import sys, os, time +import subprocess FILE_SIZE = 1024 * 1024 @@ -15,6 +17,38 @@ def test_create_100M_0KB_thread(max_number): cids.append(cid) wait_for_cmdlets(cids) +def exec_commands(cmds): + ''' Exec commands in parallel in multiple process + (as much as we have CPU) + ''' + if not cmds: return # empty list + + def done(p): + return p.poll() is not None + def success(p): + return p.returncode == 0 + def fail(): + sys.exit(1) + + max_task = cpu_count() + processes = [] + while True: + while cmds and len(processes) < max_task: + task = cmds.pop() + print task + processes.append(subprocess.Popen(task, shell=True)) + + for p in processes: + if done(p): + if success(p): + processes.remove(p) + else: + fail() + + if not processes and not cmds: + break + else: + time.sleep(0.05) class ResetEnv(unittest.TestCase): def test_delete_all_rules(self): @@ -73,6 +107,20 @@ def test_create_10M_DFSIO(self): subprocess.call("hdfs dfs -mv /benchmarks/TestDFSIO/io_data " + TEST_DIR + str(i) + "_data") + + def test_create_10K_0KB_DFSIO_parallel(self): + dir_num = 5 + for i in range(dir_num): + file_index = 0 + dir_name = TEST_DIR + random_string() + command_arr = [] + for i in range(10000 / dir_num): + command_arr.append("hdfs dfs -touchz " + dir_name + "/" + str(file_index)) + file_index = file_index + 1 + exec_commands(command_arr) + + + def test_create_100M_0KB_parallel(self): max_number = 200000 dir_number = 50 diff --git a/supports/integration-test/util.py b/supports/integration-test/util.py index 2b82c136cfb..7efb66d2e7d 100644 --- a/supports/integration-test/util.py +++ b/supports/integration-test/util.py @@ -2,6 +2,8 @@ import random import time import uuid +import sys, os +from subprocess import Popen, list2cmdline # Server info BASE_URL = "http://localhost:7045" @@ -23,6 +25,28 @@ TEST_DIR = "/ssmtest/" +def cpu_count(): + ''' Returns the number of CPUs in the system + ''' + num = 1 + if sys.platform == 'win32': + try: + num = int(os.environ['NUMBER_OF_PROCESSORS']) + except (ValueError, KeyError): + pass + elif sys.platform == 'darwin': + try: + num = int(os.popen('sysctl -n hw.ncpu').read()) + except ValueError: + pass + else: + try: + num = os.sysconf('SC_NPROCESSORS_ONLN') + except (ValueError, OSError, AttributeError): + pass + + return num + def random_file_path(): return TEST_DIR + random_string()