Skip to content

Commit

Permalink
Gracefully handle errors when executing operations
Browse files Browse the repository at this point in the history
  • Loading branch information
sdispater committed Jul 9, 2020
1 parent 2859016 commit 35225a7
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 82 deletions.
6 changes: 3 additions & 3 deletions poetry/console/commands/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ def handle(self):
if status != 0 or self.option("dry-run"):
# Revert changes
if not self.option("dry-run"):
self.error(
self.line_error(
"\n"
"Addition failed, reverting pyproject.toml "
"to its original content."
"<error>Failed to add packages, reverting the pyproject.toml file "
"to its original content.</error>"
)

self.poetry.file.write(original_content)
Expand Down
192 changes: 129 additions & 63 deletions poetry/installation/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import threading

from concurrent.futures import ThreadPoolExecutor
from functools import partial
from concurrent.futures import as_completed
from subprocess import CalledProcessError

from poetry.core.packages.file_dependency import FileDependency
Expand All @@ -15,6 +15,8 @@
from poetry.utils._compat import OrderedDict
from poetry.utils._compat import Path
from poetry.utils._compat import cpu_count
from poetry.utils._compat import decode
from poetry.utils.env import EnvCommandError
from poetry.utils.helpers import safe_rmtree

from .authenticator import Authenticator
Expand All @@ -26,14 +28,6 @@
from .operations.update import Update


def take(n, iterable):
return list(itertools.islice(iterable, n))


def chunked(iterable, n):
return iter(partial(take, n, iter(iterable)), [])


class Executor(object):
def __init__(self, env, pool, config, io, parallel=None):
self._env = env
Expand Down Expand Up @@ -67,6 +61,7 @@ def __init__(self, env, pool, config, io, parallel=None):
self._skipped = {"install": 0, "update": 0, "uninstall": 0}
self._sections = OrderedDict()
self._lock = threading.Lock()
self._shutdown = False

@property
def installations_count(self): # type: () -> int
Expand Down Expand Up @@ -98,7 +93,7 @@ def verbose(self, verbose=True):

return self

def execute(self, operations):
def execute(self, operations): # type: (Operation) -> int
self._total_operations = len(operations)
for job_type in self._executed:
self._executed[job_type] = 0
Expand All @@ -109,49 +104,27 @@ def execute(self, operations):

# We group operations by priority
groups = itertools.groupby(operations, key=lambda o: -o.priority)
i = 0
self._sections = OrderedDict()
for _, group in groups:
for chunk in chunked(group, self._max_workers):
tasks = []
for operation in chunk:
if self.supports_fancy_output():
if id(operation) not in self._sections:
if self._should_write_operation(operation):
self._lock.acquire()
self._sections[id(operation)] = self._io.section()
self._sections[id(operation)].write_line(
" <fg=blue;options=bold>•</> {message}: <fg=blue>Pending...</>".format(
message=self.get_operation_message(operation),
),
)
self._lock.release()
else:
if self._should_write_operation(operation):
if not operation.skipped:
self._io.write_line(
" <fg=blue;options=bold>•</> {message}".format(
message=self.get_operation_message(operation),
),
)
else:
self._io.write_line(
" <fg=default;options=bold,dark>•</> {message}: "
"<fg=default;options=bold,dark>Skipped</> "
"<fg=default;options=dark>for the following reason:</> "
"<fg=default;options=bold,dark>{reason}</>".format(
message=self.get_operation_message(operation),
reason=operation.skip_reason,
)
)

for operation in chunk:
tasks.append(
self._executor.submit(self._execute_operation, operation)
)
i += 1

[t.result() for t in tasks]
tasks = []
for operation in group:
if self._shutdown:
break

tasks.append(self._executor.submit(self._execute_operation, operation))

try:
for task in as_completed(tasks):
task.result()
except KeyboardInterrupt:
self._shutdown = True

if self._shutdown:
self._executor.shutdown(wait=True)

break

return self._shutdown

def _write(self, operation, line):
if not self.supports_fancy_output() or not self._should_write_operation(
Expand All @@ -174,6 +147,84 @@ def _write(self, operation, line):
self._lock.release()

def _execute_operation(self, operation):
try:
if self.supports_fancy_output():
if id(operation) not in self._sections:
if self._should_write_operation(operation):
self._lock.acquire()
self._sections[id(operation)] = self._io.section()
self._sections[id(operation)].write_line(
" <fg=blue;options=bold>•</> {message}: <fg=blue>Pending...</>".format(
message=self.get_operation_message(operation),
),
)
self._lock.release()
else:
if self._should_write_operation(operation):
if not operation.skipped:
self._io.write_line(
" <fg=blue;options=bold>•</> {message}".format(
message=self.get_operation_message(operation),
),
)
else:
self._io.write_line(
" <fg=default;options=bold,dark>•</> {message}: "
"<fg=default;options=bold,dark>Skipped</> "
"<fg=default;options=dark>for the following reason:</> "
"<fg=default;options=bold,dark>{reason}</>".format(
message=self.get_operation_message(operation),
reason=operation.skip_reason,
)
)

try:
result = self._do_execute_operation(operation)
except EnvCommandError as e:
if e.e.returncode == -2:
result = -2
else:
raise

# If we have a result of -2 it means a KeyboardInterrupt
# in the any python subprocess, so we raise a KeyboardInterrupt
# error to be picked up by the error handler.
if result == -2:
raise KeyboardInterrupt
except Exception as e:
from clikit.ui.components.exception_trace import ExceptionTrace

if not self.supports_fancy_output():
io = self._io
else:
message = " <error>•</error> {message}: <error>Failed</error>".format(
message=self.get_operation_message(operation, error=True),
)
self._write(operation, message)
io = self._sections.get(id(operation), self._io)

self._lock.acquire()

trace = ExceptionTrace(e)
trace.render(io)
io.write_line("")

self._shutdown = True
self._lock.release()
except KeyboardInterrupt:
message = " <warning>•</warning> {message}: <warning>Cancelled</warning>".format(
message=self.get_operation_message(operation, warning=True),
)
if not self.supports_fancy_output():
self._io.write_line(message)
else:
self._write(operation, message)

self._lock.acquire()
self._shutdown = True
self._lock.release()

def _do_execute_operation(self, operation):
method = operation.job_type

operation_message = self.get_operation_message(operation)
Expand All @@ -191,7 +242,7 @@ def _execute_operation(self, operation):

self._skipped[operation.job_type] += 1

return
return 0

if not self._enabled or self._dry_run:
self._io.write_line(
Expand All @@ -200,9 +251,12 @@ def _execute_operation(self, operation):
)
)

return
return 0

result = getattr(self, "_execute_{}".format(method))(operation)

getattr(self, "_execute_{}".format(method))(operation)
if result != 0:
return result

message = " <fg=green;options=bold>•</> {message}".format(
message=self.get_operation_message(operation, done=True),
Expand All @@ -211,6 +265,8 @@ def _execute_operation(self, operation):

self._increment_operations_count(operation, True)

return result

def _increment_operations_count(self, operation, executed):
self._lock.acquire()
if executed:
Expand All @@ -222,15 +278,25 @@ def _increment_operations_count(self, operation, executed):
self._lock.release()

def run_pip(self, *args, **kwargs): # type: (...) -> str
return self._env.run("python", "-m", "pip", *args, **kwargs)
try:
return self._env.run("python", "-m", "pip", *args, **kwargs)
except EnvCommandError as e:
if "KeyboardInterrupt" in decode(e.e.output):
return -2

def get_operation_message(self, operation, done=False):
raise

def get_operation_message(self, operation, done=False, error=False, warning=False):
base_tag = "fg=default"
operation_color = "c2"
source_operation_color = "c2"
package_color = "c1"

if done:
if error:
operation_color = "error"
elif warning:
operation_color = "warning"
elif done:
operation_color = "success"

if operation.skipped:
Expand Down Expand Up @@ -318,18 +384,18 @@ def _display_summary(self, operations):
self._io.write_line("")

def _execute_install(self, operation): # type: (Install) -> None
self._install(operation)
return self._install(operation)

def _execute_update(self, operation): # type: (Update) -> None
self._update(operation)
return self._update(operation)

def _execute_uninstall(self, operation): # type: (Uninstall) -> None
message = " <fg=blue;options=bold>•</> {message}: <info>Removing...</info>".format(
message=self.get_operation_message(operation),
)
self._write(operation, message)

self._remove(operation)
return self._remove(operation)

def _install(self, operation):
package = operation.package
Expand Down Expand Up @@ -360,7 +426,7 @@ def _install(self, operation):
if operation.job_type == "update":
args.insert(2, "-U")

self.run_pip(*args)
return self.run_pip(*args)

def _update(self, operation):
return self._install(operation)
Expand All @@ -375,10 +441,10 @@ def _remove(self, operation):
safe_rmtree(str(src_dir))

try:
self.run_pip("uninstall", package.name, "-y")
return self.run_pip("uninstall", package.name, "-y")
except CalledProcessError as e:
if "not installed" in str(e):
return
return 0

raise

Expand Down
7 changes: 4 additions & 3 deletions poetry/installation/installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ def run(self):
self._execute_operations = False

local_repo = Repository()
self._do_install(local_repo)

return 0
return self._do_install(local_repo)

def dry_run(self, dry_run=True): # type: (bool) -> Installer
self._dry_run = dry_run
Expand Down Expand Up @@ -279,7 +278,7 @@ def _do_install(self, local_repo):
self._filter_operations(ops, local_repo)

# Execute operations
self._execute(ops)
return self._execute(ops)

def _write_lock_file(self, repo): # type: (Repository) -> None
if self._update and self._write_lock:
Expand Down Expand Up @@ -335,6 +334,8 @@ def _execute(self, operations):
for op in operations:
self._execute_operation(op)

return 0

def _execute_operation(self, operation): # type: (Operation) -> None
"""
Execute a given operation.
Expand Down
10 changes: 5 additions & 5 deletions tests/console/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ def updates(self):
def removals(self):
return self._uninstalls

def _execute_operation(self, operation):
super(Executor, self)._execute_operation(operation)
def _do_execute_operation(self, operation):
super(Executor, self)._do_execute_operation(operation)

if not operation.skipped:
getattr(self, "_{}s".format(operation.job_type)).append(operation.package)

def _execute_install(self, operation):
pass
return 0

def _execute_update(self, operation):
pass
return 0

def _execute_remove(self, operation):
pass
return 0


@pytest.fixture()
Expand Down
Loading

0 comments on commit 35225a7

Please sign in to comment.