-
Notifications
You must be signed in to change notification settings - Fork 32
/
process_utils.py
102 lines (71 loc) · 2.83 KB
/
process_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
Run something at the command line and capture the output, based on:
https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
Includes handy example code for doing this on multiple processes/threads.
"""
#%% Constants, imports, and environment
import os
import subprocess
# Module-level side effect: child processes launched via subprocess without an
# explicit env inherit this environment.  PYTHONUNBUFFERED asks any *Python*
# child to write its output unbuffered (presumably so that lines reach execute()
# as they are produced rather than in large chunks).
os.environ["PYTHONUNBUFFERED"] = "1"
def execute(cmd):
    """
    Run [cmd] (a single string) in a shell, yielding each line of output to the caller.

    stderr is merged into stdout, and lines are yielded as text (including their
    trailing newline) as they are read from the child's output pipe.

    Args:
        cmd (str): the command line to run.  It is handed to the shell verbatim
            (shell=True), so it must come from a trusted source.

    Yields:
        str: one line of combined stdout/stderr output at a time.

    Raises:
        subprocess.CalledProcessError: after all output has been yielded, if the
            process exited with a non-zero return code.
    """

    # https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running

    # Using Popen as a context manager guarantees that the pipe is closed and the
    # child is waited on even when the caller stops iterating early (or an
    # exception is thrown into the generator), so no process/handle is leaked.
    with subprocess.Popen(cmd,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          shell=True,
                          universal_newlines=True) as popen:

        # readline() returns "" only at end-of-file, which ends the iteration
        for stdout_line in iter(popen.stdout.readline, ""):
            yield stdout_line

    # Leaving the "with" block has already waited for the process
    return_code = popen.returncode
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)
def execute_and_print(cmd,print_output=True):
    """
    Run [cmd] (a single string) in a shell, capturing and (optionally) printing output.

    Args:
        cmd (str): the command line to run; passed through to execute().
        print_output (bool): whether to echo each output line to the console as
            it arrives.

    Returns:
        dict: a dictionary with fields:
            "status": 0 if the command succeeded, otherwise the non-zero return
                code of the process.
            "output": list of output lines (str, each including its trailing
                newline).  If the command failed, this holds whatever was
                captured before the failure was reported.
    """

    output = []
    status = 0

    try:
        for s in execute(cmd):
            output.append(s)
            if print_output:
                print(s,end='',flush=True)
    except subprocess.CalledProcessError as cpe:
        # execute() raises CalledProcessError without attaching any output, so
        # cpe.output is always None here; print the exception itself, which
        # names the command and its exit status.
        print('Caught error: {}'.format(cpe))
        status = cpe.returncode

    return {'status':status,'output':output}
#%% Single-threaded test driver for execute_and_print
if False:

    #%%

    # Choose a platform-appropriate command that pauses between its first and
    # last lines of output, so the incremental printing is visible.
    if os.name != 'nt':
        demo_cmd = 'echo hello && sleep 1 && echo goodbye'
    else:
        demo_cmd = 'echo hello && ping -n 5 127.0.0.1 && echo goodbye'

    execute_and_print(demo_cmd)
#%% Parallel test driver for execute_and_print
if False:

    #%%

    from multiprocessing.pool import ThreadPool as ThreadPool
    from multiprocessing.pool import Pool as Pool

    n_workers = 10

    # Should we use threads (vs. processes) for parallelization?
    use_threads = True

    test_data = ['a','b','c','d']

    def process_sample(s):
        """
        Echo [s] in a shell and return the execute_and_print() result dict.
        """
        # Return the result so that [results] below holds something meaningful
        return execute_and_print('echo ' + s,True)

    if n_workers == 1:

        # Serial path: no pool needed
        results = [process_sample(sample) for sample in test_data]

    else:

        # Never start more workers than there are samples
        n_threads = min(n_workers,len(test_data))

        if use_threads:
            print('Starting parallel thread pool with {} workers'.format(n_threads))
            pool = ThreadPool(n_threads)
        else:
            print('Starting parallel process pool with {} workers'.format(n_threads))
            pool = Pool(n_threads)

        try:
            # map() blocks until every sample is processed and already returns a list
            results = pool.map(process_sample,test_data)
        finally:
            # Shut the workers down so they don't linger after the cell finishes
            pool.close()
            pool.join()