This repository has been archived by the owner on Jan 3, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
use dfs command to create file parallel #1505
Merged
qiyuangong
merged 4 commits into
Intel-bigdata:trunk
from
duzhen1996:script_hdfs_submit
Dec 26, 2017
Merged
Changes from 1 commit
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import unittest | ||
from util import * | ||
from threading import Thread | ||
import sys, os, time | ||
from subprocess import Popen, list2cmdline | ||
|
||
FILE_SIZE = 1024 * 1024 | ||
|
||
|
@@ -16,6 +18,62 @@ def test_create_100M_0KB_thread(max_number): | |
wait_for_cmdlets(cids) | ||
|
||
|
||
def cpu_count():
    """Return the number of CPUs in the system, never less than 1.

    Delegates to the standard library's multiprocessing.cpu_count() rather
    than probing each platform by hand.  If the count cannot be determined,
    or the reported value is not positive, 1 is returned so that callers
    using the result as a concurrency limit can always make progress.
    """
    # Imported locally so this helper does not rely on the file's import block.
    import multiprocessing

    try:
        num = multiprocessing.cpu_count()
    except NotImplementedError:
        # Raised when the platform cannot report a CPU count.
        return 1
    return max(1, num)
|
||
|
||
def _normalize_command(task):
    """Return (popen_args, display_text) for one command.

    A command may be an argument list or a single command-line string.
    """
    import shlex

    try:
        string_types = basestring  # Python 2
    except NameError:
        string_types = str  # Python 3

    if isinstance(task, string_types):
        # On POSIX, Popen treats a bare string as the program name, so a
        # command line such as "hdfs dfs -touchz x" must be split into
        # arguments first.  Windows accepts the string unchanged.
        if sys.platform == 'win32':
            return task, task
        return shlex.split(task), task
    return task, list2cmdline(task)


def exec_commands(cmds):
    """Run commands in parallel as child processes.

    At most cpu_count() children are alive at any time.  Each command is
    printed before it is started.  Commands are taken from the end of
    ``cmds`` first; the caller's list itself is left unmodified.

    cmds -- iterable of commands, each an argument list or a command string.

    Returns None once every command has exited with status 0.
    Raises SystemExit(1) as soon as a finished child reports a non-zero
    exit status; children still running at that point are not waited for.
    """
    if not cmds:
        return  # empty list

    # Work on a copy so the caller's list is not emptied as a side effect.
    pending = list(cmds)
    # Guard against a non-positive CPU count, which would prevent any
    # command from ever being started and make the loop below spin forever.
    max_task = max(1, cpu_count())
    running = []

    while True:
        # Top up the pool until it is full or nothing is left to start.
        while pending and len(running) < max_task:
            popen_args, display = _normalize_command(pending.pop())
            print(display)
            running.append(Popen(popen_args))

        # Build a fresh list instead of removing entries while iterating,
        # which would skip the element following each removal.
        still_running = []
        for proc in running:
            if proc.poll() is None:
                still_running.append(proc)
            elif proc.returncode != 0:
                sys.exit(1)
        running = still_running

        if not running and not pending:
            break
        time.sleep(0.05)
|
||
class ResetEnv(unittest.TestCase): | ||
def test_delete_all_rules(self): | ||
""" | ||
|
@@ -75,24 +133,17 @@ def test_create_10M_DFSIO(self): | |
|
||
|
||
def test_create_10K_0KB_DFSIO_parallel(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think each client should create several directories, each containing 10,000~100,000 files, so you may need another loop. |
||
dir_name = TEST_DIR + random_string() | ||
|
||
""" | ||
When there are so many thread is started, and the client cannot handles them, | ||
please set process_group_size less | ||
""" | ||
process_group_size = 200 | ||
|
||
file_index = 0; | ||
for i in range(10000 / process_group_size): | ||
process_group = [] | ||
for j in range(process_group_size): | ||
process_group.append(subprocess.Popen("hdfs dfs -touchz dir_name/" + str(file_index))) | ||
dir_num = 5 | ||
for i in range(dir_num): | ||
file_index = 0 | ||
dir_name = TEST_DIR + random_string() | ||
command_arr = [] | ||
for i in range(10000 / dir_num): | ||
command_arr.append("hdfs dfs -touchz " + dir_name + "/" + str(file_index)) | ||
file_index = file_index + 1 | ||
exec_commands(command_arr) | ||
|
||
|
||
# wait | ||
for k in range(process_group_size): | ||
process_group[k].wait() | ||
|
||
def test_create_100M_0KB_parallel(self): | ||
max_number = 200000 | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this method to util.py