-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[sonic-py-common] Add getstatusoutput_noshell() functions to general module #12065
Changes from 8 commits
6a6dd70
f7d90d0
0e8f4ce
8e3b376
5b256df
bfd3862
38ca258
fbe43bf
99ccf28
cd007de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,4 +1,5 @@ | ||||||
import sys | ||||||
from subprocess import Popen, STDOUT, PIPE, CalledProcessError, check_output | ||||||
|
||||||
|
||||||
def load_module_from_source(module_name, file_path): | ||||||
|
@@ -23,3 +24,90 @@ def load_module_from_source(module_name, file_path): | |||||
sys.modules[module_name] = module | ||||||
|
||||||
return module | ||||||
|
||||||
|
||||||
def getstatusoutput_noshell(cmd): | ||||||
""" | ||||||
This function implements getstatusoutput API from subprocess module | ||||||
but using shell=False to prevent shell injection. | ||||||
Ref: https://github.com/python/cpython/blob/3.10/Lib/subprocess.py#L602 | ||||||
""" | ||||||
try: | ||||||
output = check_output(cmd, universal_newlines=True, stderr=STDOUT) | ||||||
exitcode = 0 | ||||||
except CalledProcessError as ex: | ||||||
output = ex.output | ||||||
exitcode = ex.returncode | ||||||
if output[-1:] == '\n': | ||||||
output = output[:-1] | ||||||
return exitcode, output | ||||||
|
||||||
|
||||||
def getstatusoutput_noshell_pipes(*args): | ||||||
""" | ||||||
This function implements getstatusoutput API from subprocess module | ||||||
but using shell=False to prevent shell injection. Input command | ||||||
includes two or more pipe commands. | ||||||
""" | ||||||
if len(args) < 2: | ||||||
raise ValueError("Need at least 2 processes") | ||||||
# Set up more arguments in every processes | ||||||
for arg in args: | ||||||
arg["stdout"] = PIPE | ||||||
arg["universal_newlines"] = True | ||||||
arg["shell"] = False | ||||||
|
||||||
# Runs all subprocesses connecting stdins and previous arguments | ||||||
# to create the pipeline. Closes stdouts to avoid deadlocks. | ||||||
# Ref: https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline | ||||||
popens = [Popen(**args[0])] | ||||||
for i in range(1,len(args)): | ||||||
args[i]["stdin"] = popens[i-1].stdout | ||||||
popens.append(Popen(**args[i])) | ||||||
popens[i-1].stdout.close() | ||||||
output = popens[-1].communicate()[0] | ||||||
if output[-1:] == '\n': | ||||||
output = output[:-1] | ||||||
|
||||||
# Wait for the processes to terminate and return the exitcodes | ||||||
exitcodes = [0] * len(popens) | ||||||
while popens: | ||||||
last = popens.pop(-1) | ||||||
exitcodes[len(popens)] = last.wait() | ||||||
return (exitcodes, output) | ||||||
|
||||||
|
||||||
def check_output_pipes(*args): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
If you will treat the first element of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
""" | ||||||
This function implements check_output API from subprocess module. | ||||||
Input command includes two or more pipe command. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pipe command -> commands connected by shell pipe(s) #Closed |
||||||
""" | ||||||
if len(args) < 2: | ||||||
raise ValueError("Needs at least 2 processes") | ||||||
# Set up more arguments in every processes | ||||||
for arg in args: | ||||||
arg["stdout"] = PIPE | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
arg["universal_newlines"] = True | ||||||
arg["shell"] = False | ||||||
|
||||||
# Runs all subprocesses connecting stdins and previous arguments | ||||||
# to create the pipeline. Closes stdouts to avoid deadlocks. | ||||||
# Ref: https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline | ||||||
popens = [Popen(**args[0])] | ||||||
for i in range(1,len(args)): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
args[i]["stdin"] = popens[i-1].stdout | ||||||
popens.append(Popen(**args[i])) | ||||||
popens[i-1].stdout.close() | ||||||
output = popens[-1].communicate()[0] | ||||||
|
||||||
# Wait for the processes to terminate and raise an exeption if exitcode is non zero | ||||||
i = 0 | ||||||
while popens: | ||||||
current = popens.pop(0) | ||||||
exitcode = current.wait() | ||||||
if exitcode != 0: | ||||||
raise CalledProcessError(returncode=exitcode, cmd=args[i], output=current.stdout) | ||||||
i += 1 | ||||||
|
||||||
return output | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,31 @@ | ||||||
import sys | ||||||
import pytest | ||||||
import subprocess | ||||||
from sonic_py_common.general import getstatusoutput_noshell, getstatusoutput_noshell_pipes, check_output_pipes | ||||||
|
||||||
|
||||||
def test_getstatusoutput_noshell(tmp_path): | ||||||
exitcode, output = getstatusoutput_noshell(['echo', 'sonic']) | ||||||
assert (exitcode, output) == (0, 'sonic') | ||||||
|
||||||
exitcode, output = getstatusoutput_noshell([sys.executable, "-c", "import sys; sys.exit(6)"]) | ||||||
assert exitcode != 0 | ||||||
|
||||||
def test_getstatusoutput_noshell_pipes(): | ||||||
exitcode, output = getstatusoutput_noshell_pipes(dict(args=['echo', 'sonic']), dict(args=['awk', '{print $1}'])) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
You can still call the function in traditional way if function parameter is |
||||||
assert (exitcode, output) == ([0, 0], 'sonic') | ||||||
|
||||||
exitcode, output = getstatusoutput_noshell_pipes(dict(args=[sys.executable, "-c", "import sys; sys.exit(6)"]), dict(args=[sys.executable, "-c", "import sys; sys.exit(8)"])) | ||||||
assert exitcode == [6, 8] | ||||||
|
||||||
def test_check_output_pipes(): | ||||||
output = check_output_pipes(dict(args=['echo', 'sonic']), dict(args=['awk', '{print $1}'])) | ||||||
assert output == 'sonic\n' | ||||||
|
||||||
with pytest.raises(subprocess.CalledProcessError) as e: | ||||||
check_output_pipes(dict(args=[sys.executable, "-c", "import sys; sys.exit(6)"]), dict(args=[sys.executable, "-c", "import sys; sys.exit(0)"])) | ||||||
assert e.returncode == [6, 0] | ||||||
|
||||||
with pytest.raises(subprocess.CalledProcessError) as e: | ||||||
check_output_pipes(dict(args=[sys.executable, "-c", "import sys; sys.exit(0)"]), dict(args=[sys.executable, "-c", "import sys; sys.exit(6)"])) | ||||||
assert e.returncode == [0, 6] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add unit tests to cover new functions. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, working on UT