Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR: Handle system commands that use UNC paths on Windows #500

Merged
merged 5 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions ipykernel/tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def test_is_complete():
assert reply['content']['status'] == 'complete'


@dec.skipif(sys.platform.startswith('linux'))
def test_complete():
with kernel() as kc:
execute(u'a = 1', kc=kc)
Expand Down Expand Up @@ -317,6 +318,36 @@ def test_message_order():
assert reply['parent_header']['msg_id'] == msg_id


@dec.skipif(sys.platform.startswith('linux'))
def test_unc_paths():
with kernel() as kc, TemporaryDirectory() as td:
drive_file_path = os.path.join(td, 'unc.txt')
with open(drive_file_path, 'w+') as f:
f.write('# UNC test')
unc_root = '\\\\localhost\\C$'
file_path = os.path.splitdrive(os.path.dirname(drive_file_path))[1]
unc_file_path = os.path.join(unc_root, file_path[1:])

iopub = kc.iopub_channel

kc.execute("cd {0:s}".format(unc_file_path))
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
assert reply['content']['status'] == 'ok'
out, err = assemble_output(iopub)
assert unc_file_path in out

flush_channels(kc)
kc.execute(code="ls")
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
assert reply['content']['status'] == 'ok'
out, err = assemble_output(iopub)
assert 'unc.txt' in out

kc.execute(code="cd")
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
assert reply['content']['status'] == 'ok'


def test_shutdown():
"""Kernel exits after polite shutdown_request"""
with new_kernel() as kc:
Expand Down
38 changes: 37 additions & 1 deletion ipykernel/zmqshell.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
)
from IPython.utils import openpy
from ipykernel.jsonutil import json_clean, encode_images
from IPython.utils.process import arg_split
from IPython.utils.process import arg_split, system
from ipython_genutils import py3compat
from ipython_genutils.py3compat import unicode_type
from traitlets import (
Expand Down Expand Up @@ -601,5 +601,41 @@ def init_virtualenv(self):
# https://ipython.readthedocs.io/en/latest/install/kernel_install.html
pass

def system_piped(self, cmd):
"""Call the given cmd in a subprocess, piping stdout/err

Parameters
----------
cmd : str
Command to execute (can not end in '&', as background processes are
not supported. Should not be a command that expects input
other than simple text.
"""
if cmd.rstrip().endswith('&'):
# this is *far* from a rigorous test
# We do not support backgrounding processes because we either use
# pexpect or pipes to read from. Users can always just call
# os.system() or use ip.system=ip.system_raw
# if they really want a background process.
raise OSError("Background processes not supported.")

# we explicitly do NOT return the subprocess status code, because
# a non-None value would trigger :func:`sys.displayhook` calls.
# Instead, we store the exit_code in user_ns.
# Also, protect system call from UNC paths on Windows here too
# as is done in InteractiveShell.system_raw
if sys.platform == 'win32':
cmd = self.var_expand(cmd, depth=1)
from IPython.utils._process_win32 import AvoidUNCPath
with AvoidUNCPath() as path:
if path is not None:
cmd = 'pushd %s &&%s' % (path, cmd)
self.user_ns['_exit_code'] = system(cmd)
else:
self.user_ns['_exit_code'] = system(self.var_expand(cmd, depth=1))

# Ensure new system_piped implementation is used
system = system_piped


InteractiveShellABC.register(ZMQInteractiveShell)