From 5ed5f6eedbe8f17ee85493a88e6d0f600d5dee0d Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Thu, 2 Aug 2018 14:09:40 +0200 Subject: [PATCH] Implement exponential backoff retry mechanism for transport tasks (#1837) JobProcesses have various tasks the need to execute that require a transport, which can then fail for various reasons due to the command executed over the transport excepting. Examples are the submission of a job calculation as well as updating its scheduler state. These may fail for reasons that do not necessarily mean that the job is unrecoverably lost, such as the internet connection being temporarily unavailable or the scheduler simply not responding. Instead of putting the process in an excepted state, the engine should automatically retry at a later stage. Here we implement the exponential_backoff_retry utility, which is a coroutine that can wrap another function or coroutine and will try to run it, and rerun it when an exception is caught. When an exception is caught as many times as the maximum number of allowed attempts, the exception is reraised. This is implemented in the various transport tasks that are called by the Waiting state of the JobProcess class: * task_submit_job: submit the calculation * task_update_job: update the scheduler state * task_retrieve_job: retrieve the files of the completed calc * task_kill_job: kill the job through the scheduler These are now wrapped in the exponential_backoff_retry coroutine, which will give the process some leeway when they fail for reasons that may often resolve themselves, when given the time. --- aiida/backends/tests/__init__.py | 2 +- aiida/backends/tests/work/test_utils.py | 55 ++ aiida/daemon/execmanager.py | 573 +++++++++---------- aiida/orm/implementation/sqlalchemy/group.py | 9 +- aiida/transport/plugins/local.py | 4 +- aiida/work/job_processes.py | 297 +++++----- aiida/work/persistence.py | 2 +- aiida/work/utils.py | 70 ++- 8 files changed, 565 insertions(+), 447 deletions(-) create mode 100644 aiida/backends/tests/work/test_utils.py diff --git a/aiida/backends/tests/__init__.py b/aiida/backends/tests/__init__.py index adc62e3452..c4c1e92249 100644 --- a/aiida/backends/tests/__init__.py +++ b/aiida/backends/tests/__init__.py @@ -93,7 +93,7 @@ 'work.run': ['aiida.backends.tests.work.run'], 'work.runners': ['aiida.backends.tests.work.test_runners'], 'work.test_transport': ['aiida.backends.tests.work.test_transport'], - 'work.utils': ['aiida.backends.tests.work.utils'], + 'work.utils': ['aiida.backends.tests.work.test_utils'], 'work.work_chain': ['aiida.backends.tests.work.work_chain'], 'work.workfunctions': ['aiida.backends.tests.work.test_workfunctions'], 'work.job_processes': ['aiida.backends.tests.work.job_processes'], diff --git a/aiida/backends/tests/work/test_utils.py b/aiida/backends/tests/work/test_utils.py new file mode 100644 index 0000000000..807a26cd06 --- /dev/null +++ b/aiida/backends/tests/work/test_utils.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +from tornado.ioloop import IOLoop +from tornado.gen import coroutine + +from aiida.backends.testbase import AiidaTestCase +from aiida.work.utils import exponential_backoff_retry + +ITERATION = 0 +MAX_ITERATIONS = 3 + + +class TestExponentialBackoffRetry(AiidaTestCase): + """Tests for the exponential backoff retry coroutine.""" + + @classmethod + def setUpClass(cls, *args, **kwargs): + """Set up a simple authinfo and for later use.""" + super(TestExponentialBackoffRetry, cls).setUpClass(*args, **kwargs) + cls.authinfo = cls.backend.authinfos.create( + computer=cls.computer, + user=cls.backend.users.get_automatic_user()) + cls.authinfo.store() + + def test_exponential_backoff_success(self): + """Test that exponential backoff will successfully catch exceptions as long as max_attempts is not exceeded.""" + ITERATION = 0 + loop = IOLoop() + + @coroutine + def coro(): + """A function that will raise RuntimeError as long as ITERATION is smaller than MAX_ITERATIONS.""" + global ITERATION + ITERATION += 1 + if ITERATION < MAX_ITERATIONS: + raise RuntimeError + + max_attempts = MAX_ITERATIONS + 1 + loop.run_sync(lambda: exponential_backoff_retry(coro, initial_interval=0.1, max_attempts=max_attempts)) + + def test_exponential_backoff_max_attempts_exceeded(self): + """Test that exponential backoff will finally raise if max_attempts is exceeded""" + ITERATION = 0 + loop = IOLoop() + + @coroutine + def coro(): + """A function that will raise RuntimeError as long as ITERATION is smaller than MAX_ITERATIONS.""" + global ITERATION + ITERATION += 1 + if ITERATION < MAX_ITERATIONS: + raise RuntimeError + + max_attempts = MAX_ITERATIONS - 1 + with self.assertRaises(RuntimeError): + loop.run_sync(lambda: exponential_backoff_retry(coro, initial_interval=0.1, max_attempts=max_attempts)) diff --git a/aiida/daemon/execmanager.py b/aiida/daemon/execmanager.py index 0fdacb84e4..603df5bdcb 100644 --- a/aiida/daemon/execmanager.py +++ b/aiida/daemon/execmanager.py @@ -16,8 +16,8 @@ import os from aiida.common import aiidalogger +from aiida.common import exceptions from aiida.common.datastructures import calc_states -from aiida.common.exceptions import ConfigurationError, ModificationNotAllowed from aiida.common.folders import SandboxFolder from aiida.common.links import LinkType from aiida.common.log import get_dblogger_extra @@ -29,364 +29,353 @@ execlogger = aiidalogger.getChild('execmanager') -def update_job_calc_from_job_info(calc, job_info): - """ - Updates the job info for a JobCalculation using job information - as obtained from the scheduler. - - :param calc: The job calculation - :param job_info: the information returned by the scheduler for this job - :return: True if the job state is DONE, False otherwise - :rtype: bool - """ - calc._set_scheduler_state(job_info.job_state) - calc._set_last_jobinfo(job_info) - - return job_info.job_state in JOB_STATES.DONE - - -def update_job_calc_from_detailed_job_info(calc, detailed_job_info): - """ - Updates the detailed job info for a JobCalculation as obtained from - the scheduler - - :param calc: The job calculation - :param detailed_job_info: the detailed information as returned by the - scheduler for this job - """ - from aiida.scheduler.datastructures import JobInfo - - last_jobinfo = calc._get_last_jobinfo() - if last_jobinfo is None: - last_jobinfo = JobInfo() - last_jobinfo.job_id = calc.get_job_id() - last_jobinfo.job_state = JOB_STATES.DONE - - last_jobinfo.detailedJobinfo = detailed_job_info - calc._set_last_jobinfo(last_jobinfo) - - -def submit_calc(calc, authinfo, transport=None): +def submit_calculation(calculation, transport): """ Submit a calculation - :note: if no transport is passed, a new transport is opened and then - closed within this function. If you want to use an already opened - transport, pass it as further parameter. In this case, the transport - has to be already open, and must coincide with the transport of the - the computer defined by the authinfo. - - :param calc: the calculation to submit - (an instance of the aiida.orm.JobCalculation class) - :param authinfo: the AuthInfo object for this calculation. - :param transport: if passed, must be an already opened transport. No checks - are done on the consistency of the given transport with the transport - of the computer defined in the AuthInfo. + :param calculation: the instance of JobCalculation to submit. + :param transport: an already opened transport to use to submit the calculation. """ - from aiida.orm import Code, Computer + from aiida.orm import Code from aiida.common.exceptions import InputValidationError from aiida.orm.data.remote import RemoteData - if not authinfo.enabled: - return - - logger_extra = get_dblogger_extra(calc) + computer = calculation.get_computer() - if transport is None: - t = authinfo.get_transport() - must_open_t = True - else: - t = transport - must_open_t = False + if not computer.is_enabled(): + return - t._set_logger_extra(logger_extra) + logger_extra = get_dblogger_extra(calculation) + transport._set_logger_extra(logger_extra) - if calc._has_cached_links(): + if calculation._has_cached_links(): raise ValueError("Cannot submit calculation {} because it has " "cached input links! If you " "just want to test the submission, use the " "test_submit() method, otherwise store all links" - "first".format(calc.pk)) - - # Double check, in the case the calculation was 'killed' (and therefore - # put in the 'FAILED' state) in the meantime - # Do it as near as possible to the state change below (it would be - # even better to do it with some sort of transaction) - if calc.get_state() != calc_states.TOSUBMIT: - raise ValueError("Can only submit calculations with state=TOSUBMIT! " - "(state of calc {} is {} instead)".format(calc.pk, - calc.get_state())) - # I start to submit the calculation: I set the state - try: - calc._set_state(calc_states.SUBMITTING) - except ModificationNotAllowed: - raise ValueError("The calculation has already been submitted by " - "someone else!") + "first".format(calculation.pk)) + + s = computer.get_scheduler() + s.set_transport(transport) + + with SandboxFolder() as folder: + calcinfo, script_filename = calculation._presubmit(folder, use_unstored_links=False) + + codes_info = calcinfo.codes_info + input_codes = [load_node(_.code_uuid, sub_class=Code) for _ in codes_info] + + for code in input_codes: + if not code.can_run_on(computer): + raise InputValidationError( + "The selected code {} for calculation " + "{} cannot run on computer {}". + format(code.pk, calculation.pk, computer.name)) + + # After this call, no modifications to the folder should be done + calculation._store_raw_input_folder(folder.abspath) + + # NOTE: some logic is partially replicated in the 'test_submit' + # method of JobCalculation. If major logic changes are done + # here, make sure to update also the test_submit routine + remote_user = transport.whoami() + # TODO Doc: {username} field + # TODO: if something is changed here, fix also 'verdi computer test' + remote_working_directory = computer.get_workdir().format( + username=remote_user) + if not remote_working_directory.strip(): + raise exceptions.ConfigurationError( + "[submission of calculation {}] " + "No remote_working_directory configured for computer " + "'{}'".format(calculation.pk, computer.name)) + + # If it already exists, no exception is raised + try: + transport.chdir(remote_working_directory) + except IOError: + execlogger.debug( + "[submission of calculation {}] " + "Unable to chdir in {}, trying to create it". + format(calculation.pk, remote_working_directory), + extra=logger_extra) + try: + transport.makedirs(remote_working_directory) + transport.chdir(remote_working_directory) + except (IOError, OSError) as e: + raise exceptions.ConfigurationError( + "[submission of calculation {}] " + "Unable to create the remote directory {} on " + "computer '{}': {}". + format(calculation.pk, remote_working_directory, computer.name, + e.message)) + # Store remotely with sharding (here is where we choose + # the folder structure of remote jobs; then I store this + # in the calculation properties using _set_remote_dir + # and I do not have to know the logic, but I just need to + # read the absolute path from the calculation properties. + transport.mkdir(calcinfo.uuid[:2], ignore_existing=True) + transport.chdir(calcinfo.uuid[:2]) + transport.mkdir(calcinfo.uuid[2:4], ignore_existing=True) + transport.chdir(calcinfo.uuid[2:4]) + transport.mkdir(calcinfo.uuid[4:]) + transport.chdir(calcinfo.uuid[4:]) + workdir = transport.getcwd() + # I store the workdir of the calculation for later file + # retrieval + calculation._set_remote_workdir(workdir) + + # I first create the code files, so that the code can put + # default files to be overwritten by the plugin itself. + # Still, beware! The code file itself could be overwritten... + # But I checked for this earlier. + for code in input_codes: + if code.is_local(): + # Note: this will possibly overwrite files + for f in code.get_folder_list(): + transport.put(code.get_abs_path(f), f) + transport.chmod(code.get_local_executable(), 0o755) # rwxr-xr-x + + # copy all files, recursively with folders + for f in folder.get_content_list(): + execlogger.debug("[submission of calculation {}] " + "copying file/folder {}...".format(calculation.pk, f), + extra=logger_extra) + transport.put(folder.get_abs_path(f), f) + + # local_copy_list is a list of tuples, + # each with (src_abs_path, dest_rel_path) + # NOTE: validation of these lists are done + # inside calculation._presubmit() + local_copy_list = calcinfo.local_copy_list + remote_copy_list = calcinfo.remote_copy_list + remote_symlink_list = calcinfo.remote_symlink_list + + if local_copy_list is not None: + for src_abs_path, dest_rel_path in local_copy_list: + execlogger.debug("[submission of calculation {}] " + "copying local file/folder to {}".format( + calculation.pk, dest_rel_path), + extra=logger_extra) + transport.put(src_abs_path, dest_rel_path) + + if remote_copy_list is not None: + for (remote_computer_uuid, remote_abs_path, + dest_rel_path) in remote_copy_list: + if remote_computer_uuid == computer.uuid: + execlogger.debug("[submission of calculation {}] " + "copying {} remotely, directly on the machine " + "{}".format(calculation.pk, dest_rel_path, computer.name)) + try: + transport.copy(remote_abs_path, dest_rel_path) + except (IOError, OSError): + execlogger.warning("[submission of calculation {}] " + "Unable to copy remote resource from {} to {}! " + "Stopping.".format(calculation.pk, + remote_abs_path, dest_rel_path), + extra=logger_extra) + raise + else: + # TODO: implement copy between two different + # machines! + raise NotImplementedError( + "[presubmission of calculation {}] " + "Remote copy between two different machines is " + "not implemented yet".format(calculation.pk)) + + if remote_symlink_list is not None: + for (remote_computer_uuid, remote_abs_path, + dest_rel_path) in remote_symlink_list: + if remote_computer_uuid == computer.uuid: + execlogger.debug("[submission of calculation {}] " + "copying {} remotely, directly on the machine " + "{}".format(calculation.pk, dest_rel_path, computer.name)) + try: + transport.symlink(remote_abs_path, dest_rel_path) + except (IOError, OSError): + execlogger.warning("[submission of calculation {}] " + "Unable to create remote symlink from {} to {}! " + "Stopping.".format(calculation.pk, + remote_abs_path, dest_rel_path), + extra=logger_extra) + raise + else: + raise IOError("It is not possible to create a symlink " + "between two different machines for " + "calculation {}".format(calculation.pk)) + + remotedata = RemoteData(computer=computer, remote_path=workdir) + remotedata.add_link_from(calculation, label='remote_folder', link_type=LinkType.CREATE) + remotedata.store() + + job_id = s.submit_from_script(transport.getcwd(), script_filename) + calculation._set_job_id(job_id) + + +def update_calculation(calculation, transport): + """ + Update the scheduler state of a calculation - try: - if must_open_t: - t.open() + :param calculation: the instance of JobCalculation to update. + :param transport: an already opened transport to use to query the scheduler + """ + scheduler = calculation.get_computer().get_scheduler() + scheduler.set_transport(transport) - s = authinfo.computer.get_scheduler() - s.set_transport(t) + job_id = calculation.get_job_id() - computer = calc.get_computer() + kwargs = {'as_dict': True} - with SandboxFolder() as folder: - calcinfo, script_filename = calc._presubmit( - folder, use_unstored_links=False) - - codes_info = calcinfo.codes_info - input_codes = [load_node(_.code_uuid, sub_class=Code) - for _ in codes_info] - - for code in input_codes: - if not code.can_run_on(computer): - raise InputValidationError( - "The selected code {} for calculation " - "{} cannot run on computer {}". - format(code.pk, calc.pk, computer.name)) - - # After this call, no modifications to the folder should be done - calc._store_raw_input_folder(folder.abspath) - - # NOTE: some logic is partially replicated in the 'test_submit' - # method of JobCalculation. If major logic changes are done - # here, make sure to update also the test_submit routine - remote_user = t.whoami() - # TODO Doc: {username} field - # TODO: if something is changed here, fix also 'verdi computer test' - remote_working_directory = authinfo.get_workdir().format( - username=remote_user) - if not remote_working_directory.strip(): - raise ConfigurationError( - "[submission of calc {}] " - "No remote_working_directory configured for computer " - "'{}'".format(calc.pk, computer.name)) - - # If it already exists, no exception is raised - try: - t.chdir(remote_working_directory) - except IOError: - execlogger.debug( - "[submission of calc {}] " - "Unable to chdir in {}, trying to create it". - format(calc.pk, remote_working_directory), - extra=logger_extra) - try: - t.makedirs(remote_working_directory) - t.chdir(remote_working_directory) - except (IOError, OSError) as e: - raise ConfigurationError( - "[submission of calc {}] " - "Unable to create the remote directory {} on " - "computer '{}': {}". - format(calc.pk, remote_working_directory, computer.name, - e.message)) - # Store remotely with sharding (here is where we choose - # the folder structure of remote jobs; then I store this - # in the calculation properties using _set_remote_dir - # and I do not have to know the logic, but I just need to - # read the absolute path from the calculation properties. - t.mkdir(calcinfo.uuid[:2], ignore_existing=True) - t.chdir(calcinfo.uuid[:2]) - t.mkdir(calcinfo.uuid[2:4], ignore_existing=True) - t.chdir(calcinfo.uuid[2:4]) - t.mkdir(calcinfo.uuid[4:]) - t.chdir(calcinfo.uuid[4:]) - workdir = t.getcwd() - # I store the workdir of the calculation for later file - # retrieval - calc._set_remote_workdir(workdir) - - # I first create the code files, so that the code can put - # default files to be overwritten by the plugin itself. - # Still, beware! The code file itself could be overwritten... - # But I checked for this earlier. - for code in input_codes: - if code.is_local(): - # Note: this will possibly overwrite files - for f in code.get_folder_list(): - t.put(code.get_abs_path(f), f) - t.chmod(code.get_local_executable(), 0o755) # rwxr-xr-x - - # copy all files, recursively with folders - for f in folder.get_content_list(): - execlogger.debug("[submission of calc {}] " - "copying file/folder {}...".format(calc.pk, f), - extra=logger_extra) - t.put(folder.get_abs_path(f), f) - - # local_copy_list is a list of tuples, - # each with (src_abs_path, dest_rel_path) - # NOTE: validation of these lists are done - # inside calc._presubmit() - local_copy_list = calcinfo.local_copy_list - remote_copy_list = calcinfo.remote_copy_list - remote_symlink_list = calcinfo.remote_symlink_list - - if local_copy_list is not None: - for src_abs_path, dest_rel_path in local_copy_list: - execlogger.debug("[submission of calc {}] " - "copying local file/folder to {}".format( - calc.pk, dest_rel_path), - extra=logger_extra) - t.put(src_abs_path, dest_rel_path) - - if remote_copy_list is not None: - for (remote_computer_uuid, remote_abs_path, - dest_rel_path) in remote_copy_list: - if remote_computer_uuid == computer.uuid: - execlogger.debug("[submission of calc {}] " - "copying {} remotely, directly on the machine " - "{}".format(calc.pk, dest_rel_path, computer.name)) - try: - t.copy(remote_abs_path, dest_rel_path) - except (IOError, OSError): - execlogger.warning("[submission of calc {}] " - "Unable to copy remote resource from {} to {}! " - "Stopping.".format(calc.pk, - remote_abs_path, dest_rel_path), - extra=logger_extra) - raise - else: - # TODO: implement copy between two different - # machines! - raise NotImplementedError( - "[presubmission of calc {}] " - "Remote copy between two different machines is " - "not implemented yet".format(calc.pk)) - - if remote_symlink_list is not None: - for (remote_computer_uuid, remote_abs_path, - dest_rel_path) in remote_symlink_list: - if remote_computer_uuid == computer.uuid: - execlogger.debug("[submission of calc {}] " - "copying {} remotely, directly on the machine " - "{}".format(calc.pk, dest_rel_path, computer.name)) - try: - t.symlink(remote_abs_path, dest_rel_path) - except (IOError, OSError): - execlogger.warning("[submission of calc {}] " - "Unable to create remote symlink from {} to {}! " - "Stopping.".format(calc.pk, - remote_abs_path, dest_rel_path), - extra=logger_extra) - raise - else: - raise IOError("It is not possible to create a symlink " - "between two different machines for " - "calculation {}".format(calc.pk)) - - remotedata = RemoteData(computer=computer, remote_path=workdir) - remotedata.add_link_from(calc, label='remote_folder', link_type=LinkType.CREATE) - remotedata.store() - - job_id = s.submit_from_script(t.getcwd(), script_filename) - calc._set_job_id(job_id) - # This should always be possible, because we should be - # the only ones submitting this calculations, - # so I do not check the ModificationNotAllowed - calc._set_state(calc_states.WITHSCHEDULER) - - execlogger.debug("submitted calculation {} on {} with " - "jobid {}".format(calc.pk, computer.name, job_id), - extra=logger_extra) + if scheduler.get_feature('can_query_by_user'): + kwargs['user'] = "$USER" + else: + # In general schedulers can either query by user or by jobs, but not both + # (see also docs of the Scheduler class) + kwargs['jobs'] = [job_id] - except Exception as e: - import traceback + found_jobs = scheduler.getJobs(**kwargs) - execlogger.error('Submission of calc {} failed, check also the log file! Traceback: {}'.format( - calc.pk, traceback.format_exc()), extra=logger_extra) + info = found_jobs.get(job_id, None) + if info is None: + # If the job is computed or not found assume it's done + job_done = True + calculation._set_scheduler_state(JOB_STATES.DONE) + else: + update_job_calc_from_job_info(calculation, info) + job_done = info.job_state == JOB_STATES.DONE + + if job_done: + # If the job is done, also get detailed job info try: - calc._set_state(calc_states.SUBMISSIONFAILED) - except ModificationNotAllowed: - execlogger.debug('failed to set state of calculation<{}> to SUBMISSIONFAILED'.format( - calc.pk, calc_states.SUBMISSIONFAILED), extra=logger_extra) - pass + detailed_job_info = scheduler.get_detailed_jobinfo(job_id) + except exceptions.FeatureNotAvailable: + detailed_job_info = ('This scheduler does not implement get_detailed_jobinfo') - raise + update_job_calc_from_detailed_job_info(calculation, detailed_job_info) - finally: - # close the transport, but only if it was opened within this function - if must_open_t: - t.close() + return job_done -def retrieve_all(job, transport, retrieved_temporary_folder): +def retrieve_calculation(calculation, transport, retrieved_temporary_folder): """ - Retrieve all the files of a completed job calculation using the given transport. If the job defined - anything in the `retrieve_temporary_list`, those entries will be stored in the `retrieved_temporary_folder`. - The caller is responsible for creating and destroying this folder. + Retrieve all the files of a completed job calculation using the given transport. - :param job: the finished JobCalculation whose files to retrieve - :param transport: the Transport instance to use for the file retrieval + If the job defined anything in the `retrieve_temporary_list`, those entries will be stored in the + `retrieved_temporary_folder`. The caller is responsible for creating and destroying this folder. + + :param calculation: the instance of JobCalculation to update. + :param transport: an already opened transport to use for the retrieval. :param retrieved_temporary_folder: the absolute path to a directory in which to store the files listed, if any, in the `retrieved_temporary_folder` of the jobs CalcInfo """ - logger_extra = get_dblogger_extra(job) - - try: - job._set_state(calc_states.RETRIEVING) - except ModificationNotAllowed: - # Someone else has already started to retrieve it, - # just log and continue - execlogger.debug( - "Attempting to retrieve more than once " - "calculation {}: skipping!".format(job.pk), - extra=logger_extra - ) - return + logger_extra = get_dblogger_extra(calculation) - execlogger.debug("Retrieving calc {}".format(job.pk), extra=logger_extra) - workdir = job._get_remote_workdir() + execlogger.debug("Retrieving calc {}".format(calculation.pk), extra=logger_extra) + workdir = calculation._get_remote_workdir() execlogger.debug( - "[retrieval of calc {}] chdir {}".format(job.pk, workdir), + "[retrieval of calc {}] chdir {}".format(calculation.pk, workdir), extra=logger_extra) # Create the FolderData node to attach everything to retrieved_files = FolderData() retrieved_files.add_link_from( - job, label=job._get_linkname_retrieved(), + calculation, label=calculation._get_linkname_retrieved(), link_type=LinkType.CREATE) with transport: transport.chdir(workdir) # First, retrieve the files of folderdata - retrieve_list = job._get_retrieve_list() - retrieve_temporary_list = job._get_retrieve_temporary_list() - retrieve_singlefile_list = job._get_retrieve_singlefile_list() + retrieve_list = calculation._get_retrieve_list() + retrieve_temporary_list = calculation._get_retrieve_temporary_list() + retrieve_singlefile_list = calculation._get_retrieve_singlefile_list() with SandboxFolder() as folder: - retrieve_files_from_list(job, transport, folder.abspath, retrieve_list) + retrieve_files_from_list(calculation, transport, folder.abspath, retrieve_list) # Here I retrieved everything; now I store them inside the calculation retrieved_files.replace_with_folder(folder.abspath, overwrite=True) # Second, retrieve the singlefiles with SandboxFolder() as folder: - _retrieve_singlefiles(job, transport, folder, retrieve_singlefile_list, logger_extra) + _retrieve_singlefiles(calculation, transport, folder, retrieve_singlefile_list, logger_extra) # Retrieve the temporary files in the retrieved_temporary_folder if any files were # specified in the 'retrieve_temporary_list' key if retrieve_temporary_list: - retrieve_files_from_list(job, transport, retrieved_temporary_folder, retrieve_temporary_list) + retrieve_files_from_list(calculation, transport, retrieved_temporary_folder, retrieve_temporary_list) # Log the files that were retrieved in the temporary folder for filename in os.listdir(retrieved_temporary_folder): execlogger.debug("[retrieval of calc {}] Retrieved temporary file or folder '{}'".format( - job.pk, filename), extra=logger_extra) + calculation.pk, filename), extra=logger_extra) # Store everything execlogger.debug( "[retrieval of calc {}] " - "Storing retrieved_files={}".format(job.pk, retrieved_files.dbnode.pk), + "Storing retrieved_files={}".format(calculation.pk, retrieved_files.dbnode.pk), extra=logger_extra) retrieved_files.store() +def kill_calculation(calculation, transport): + """ + Kill the calculation through the scheduler + + :param calculation: the instance of JobCalculation to kill. + :param transport: an already opened transport to use to address the scheduler + """ + job_id = calculation.get_job_id() + + # Get the scheduler plugin class and initialize it with the correct transport + scheduler = calculation.get_computer().get_scheduler() + scheduler.set_transport(transport) + + # Call the proper kill method for the job ID of this calculation + result = scheduler.kill(job_id) + + if result is not True: + raise exceptions.RemoteOperationError('scheduler.kill({}) was unsuccessful'.format(job_id)) + else: + calculation._set_scheduler_state(JOB_STATES.DONE) + + +def update_job_calc_from_job_info(calc, job_info): + """ + Updates the job info for a JobCalculation using job information + as obtained from the scheduler. + + :param calc: The job calculation + :param job_info: the information returned by the scheduler for this job + :return: True if the job state is DONE, False otherwise + :rtype: bool + """ + calc._set_scheduler_state(job_info.job_state) + calc._set_last_jobinfo(job_info) + + return job_info.job_state in JOB_STATES.DONE + + +def update_job_calc_from_detailed_job_info(calc, detailed_job_info): + """ + Updates the detailed job info for a JobCalculation as obtained from + the scheduler + + :param calc: The job calculation + :param detailed_job_info: the detailed information as returned by the + scheduler for this job + """ + from aiida.scheduler.datastructures import JobInfo + + last_jobinfo = calc._get_last_jobinfo() + if last_jobinfo is None: + last_jobinfo = JobInfo() + last_jobinfo.job_id = calc.get_job_id() + last_jobinfo.job_state = JOB_STATES.DONE + + last_jobinfo.detailedJobinfo = detailed_job_info + calc._set_last_jobinfo(last_jobinfo) + + def parse_results(job, retrieved_temporary_folder=None): """ Parse the results for a given JobCalculation (job) @@ -448,7 +437,7 @@ def parse_results(job, retrieved_temporary_folder=None): job._set_state(calc_states.FINISHED) else: job._set_state(calc_states.FAILED) - except ModificationNotAllowed: + except exceptions.ModificationNotAllowed: # I should have been the only one to set it, but # in order to avoid useless error messages, I just ignore pass diff --git a/aiida/orm/implementation/sqlalchemy/group.py b/aiida/orm/implementation/sqlalchemy/group.py index 7a40853465..184b49f230 100644 --- a/aiida/orm/implementation/sqlalchemy/group.py +++ b/aiida/orm/implementation/sqlalchemy/group.py @@ -204,18 +204,19 @@ def add_nodes(self, nodes): def nodes(self): class iterator(object): def __init__(self, dbnodes): - self.dbnodes = dbnodes + self._dbnodes = dbnodes + self._iter = dbnodes.__iter__() self.generator = self._genfunction() def _genfunction(self): - for n in self.dbnodes: + for n in self._iter: yield n.get_aiida_class() def __iter__(self): return self def __len__(self): - return len(self.dbnodes) + return self._dbnodes.count() # For future python-3 compatibility def __next__(self): @@ -224,7 +225,7 @@ def __next__(self): def next(self): return next(self.generator) - return iterator(self._dbgroup.dbnodes.all()) + return iterator(self._dbgroup.dbnodes) def remove_nodes(self, nodes): if not self.is_stored: diff --git a/aiida/transport/plugins/local.py b/aiida/transport/plugins/local.py index a772597a21..8e1570110d 100644 --- a/aiida/transport/plugins/local.py +++ b/aiida/transport/plugins/local.py @@ -26,8 +26,8 @@ import StringIO import glob +from aiida.transport import cli as transport_cli from aiida.transport.transport import Transport, TransportInternalError -from aiida import transport # refactor or raise the limit: issue #1784 @@ -871,4 +871,4 @@ def _get_safe_interval_suggestion_string(cls, computer): return cls._DEFAULT_SAFE_OPEN_INTERVAL -CONFIGURE_LOCAL_CMD = transport.cli.create_configure_cmd('local') +CONFIGURE_LOCAL_CMD = transport_cli.create_configure_cmd('local') diff --git a/aiida/work/job_processes.py b/aiida/work/job_processes.py index 99ab32402e..570af20013 100644 --- a/aiida/work/job_processes.py +++ b/aiida/work/job_processes.py @@ -8,10 +8,13 @@ # For further information please visit http://www.aiida.net # ########################################################################### from collections import namedtuple +import functools +import logging import shutil import sys import tempfile -import tornado.gen +import traceback +from tornado.gen import coroutine, Return import plumpy from plumpy.ports import PortNamespace @@ -22,8 +25,8 @@ from aiida.daemon import execmanager from aiida.orm.calculation.job import JobCalculation from aiida.orm.calculation.job import JobCalculationExitStatus -from aiida.scheduler.datastructures import JOB_STATES from aiida.work.process_builder import JobProcessBuilder +from aiida.work.utils import exponential_backoff_retry from . import persistence from . import processes @@ -37,170 +40,188 @@ class TransportTaskException(Exception): - def __init__(self, calc_state): - self.calc_state = calc_state + pass -@tornado.gen.coroutine -def submit_job(calc_node, transport_queue, cancelled_flag): - """ A task to submit a job calculation """ +LOGGER = logging.getLogger(__name__) - authinfo = calc_node.get_computer().get_authinfo(calc_node.get_user()) + +@coroutine +def task_submit_job(node, transport_queue, cancelled_flag): + """ + Transport task that will attempt to submit a job calculation + + The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager + function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will + retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. + If all retries fail, the task will raise a TransportTaskException + + :param node: the node that represents the job calculation + :param transport_queue: the TransportQueue from which to request a Transport + :param cancelled_flag: the cancelled flag that will be queried to determine whethr the job was cancelled + :raises: Return if the tasks was successfully completed + :raises: TransportTaskException if after the maximum number of retries the transport task still excepted + """ + initial_interval = 1 + max_attempts = 1 + + authinfo = node.get_computer().get_authinfo(node.get_user()) with transport_queue.request_transport(authinfo) as request: transport = yield request + # It may have taken time to get the transport, check if we've been cancelled if cancelled_flag.cancelled: - raise plumpy.CancelledError("Submit cancelled") + raise plumpy.CancelledError('task_submit_job for calculation<{}> cancelled'.format(node.pk)) + + LOGGER.info('submitting calculation<{}>'.format(node.pk)) + node._set_state(calc_states.SUBMITTING) - calc_node.logger.info('Submitting calculation<{}>'.format(calc_node.pk)) try: - execmanager.submit_calc(calc_node, authinfo, transport) + corout = functools.partial(execmanager.submit_calculation, node, transport) + result = yield exponential_backoff_retry(corout, initial_interval, max_attempts, logger=node.logger) except Exception: - import traceback - calc_node.logger.debug("Submission failed:\n{}".format(traceback.format_exc())) - raise TransportTaskException(calc_states.SUBMISSIONFAILED) - + LOGGER.warning('submitting calculation<{}> failed:\n{}'.format(node.pk, traceback.format_exc())) + node._set_state(calc_states.SUBMISSIONFAILED) + raise TransportTaskException('submit_calculation failed {} times consecutively'.format(max_attempts)) + else: + LOGGER.info('submitting calculation<{}> successful'.format(node.pk)) + node._set_state(calc_states.WITHSCHEDULER) + raise Return(result) -@tornado.gen.coroutine -def update_scheduler_state(calc_node, transport_queue, cancelled_flag): - calc_node.logger.info('Updating scheduler state calculation<{}>'.format(calc_node.pk)) - authinfo = calc_node.get_computer().get_authinfo(calc_node.get_user()) +@coroutine +def task_update_job(node, transport_queue, cancelled_flag): + """ + Transport task that will attempt to update the scheduler state of a job calculation + + The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager + function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will + retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. + If all retries fail, the task will raise a TransportTaskException + + :param node: the node that represents the job calculation + :param transport_queue: the TransportQueue from which to request a Transport + :param cancelled_flag: the cancelled flag that will be queried to determine whethr the job was cancelled + :raises: Return if the tasks was successfully completed + :raises: TransportTaskException if after the maximum number of retries the transport task still excepted + """ + initial_interval = 1 + max_attempts = 5 - # We are the only ones to set the calc state to COMPUTED, so if it is set here - # it was already completed in a previous task that got shutdown and reactioned - if calc_node.get_state() == calc_states.COMPUTED: - raise tornado.gen.Return(True) + authinfo = node.get_computer().get_authinfo(node.get_user()) with transport_queue.request_transport(authinfo) as request: transport = yield request # It may have taken time to get the transport, check if we've been cancelled if cancelled_flag.cancelled: - raise plumpy.CancelledError("Update cancelled") + raise plumpy.CancelledError('task_update_job for calculation<{}> cancelled'.format(node.pk)) - scheduler = calc_node.get_computer().get_scheduler() - scheduler.set_transport(transport) + LOGGER.info('updating calculation<{}>'.format(node.pk)) - job_id = calc_node.get_job_id() - - kwargs = {'as_dict': True} - if scheduler.get_feature('can_query_by_user'): - kwargs['user'] = "$USER" - else: - # In general schedulers can either query by user or by jobs, but not both - # (see also docs of the Scheduler class) - kwargs['jobs'] = [job_id] - found_jobs = scheduler.getJobs(**kwargs) - - info = found_jobs.get(job_id, None) - if info is None: - # If the job is computed or not found assume it's done - job_done = True - calc_node._set_scheduler_state(JOB_STATES.DONE) + try: + corout = functools.partial(execmanager.update_calculation, node, transport) + result = yield exponential_backoff_retry(corout, initial_interval, max_attempts, logger=node.logger) + except Exception: + LOGGER.warning('updating calculation<{}> failed:\n{}'.format(node.pk, traceback.format_exc())) + node._set_state(calc_states.FAILED) + raise TransportTaskException('update_calculation failed {} times consecutively'.format(max_attempts)) else: - execmanager.update_job_calc_from_job_info(calc_node, info) - - job_done = info.job_state == JOB_STATES.DONE + LOGGER.info('updating calculation<{}> successful'.format(node.pk)) + if result: + node._set_state(calc_states.COMPUTED) + raise Return(result) - if job_done: - # If the job is done, also get detailed job info - try: - detailed_job_info = scheduler.get_detailed_jobinfo(job_id) - except exceptions.FeatureNotAvailable: - detailed_job_info = ( - u"AiiDA MESSAGE: This scheduler does not implement " - u"the routine get_detailed_jobinfo to retrieve " - u"the information on " - u"a job after it has finished.") - - execmanager.update_job_calc_from_detailed_job_info(calc_node, detailed_job_info) - calc_node._set_state(calc_states.COMPUTED) - - raise tornado.gen.Return(job_done) +@coroutine +def task_retrieve_job(node, transport_queue, cancelled_flag, retrieved_temporary_folder): + """ + Transport task that will attempt to retrieve all files of a completed job calculation + + The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager + function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will + retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. + If all retries fail, the task will raise a TransportTaskException + + :param node: the node that represents the job calculation + :param transport_queue: the TransportQueue from which to request a Transport + :param cancelled_flag: the cancelled flag that will be queried to determine whethr the job was cancelled + :raises: Return if the tasks was successfully completed + :raises: TransportTaskException if after the maximum number of retries the transport task still excepted + """ + initial_interval = 1 + max_attempts = 5 -@tornado.gen.coroutine -def retrieve_job(calc_node, transport_queue, retrieved_temporary_folder, cancelled_flag): - """ This returns the retrieved temporary folder """ - calc_node.logger.info('Retrieving completed calculation<{}>'.format(calc_node.pk)) + authinfo = node.get_computer().get_authinfo(node.get_user()) - authinfo = calc_node.get_computer().get_authinfo(calc_node.get_user()) with transport_queue.request_transport(authinfo) as request: transport = yield request # It may have taken time to get the transport, check if we've been cancelled if cancelled_flag.cancelled: - raise plumpy.CancelledError("Retrieve cancelled") + raise plumpy.CancelledError('task_retrieve_job for calculation<{}> cancelled'.format(node.pk)) + + LOGGER.info('retrieving calculation<{}>'.format(node.pk)) + node._set_state(calc_states.RETRIEVING) try: - folder = execmanager.retrieve_all(calc_node, transport, retrieved_temporary_folder) + corout = functools.partial(execmanager.retrieve_calculation, node, transport, retrieved_temporary_folder) + result = yield exponential_backoff_retry(corout, initial_interval, max_attempts, logger=node.logger) except Exception: - import traceback - calc_node.logger.debug("Retrieval failed:\n{}".format(traceback.format_exc())) - raise TransportTaskException(calc_states.RETRIEVALFAILED) + LOGGER.warning('retrieving calculation<{}> failed:\n{}'.format(node.pk, traceback.format_exc())) + node._set_state(calc_states.RETRIEVALFAILED) + raise TransportTaskException('retrieve_calculation failed {} times consecutively'.format(max_attempts)) else: - # WARNING: This has to be here and not inside the try, otherwise - # it will get swallowed up the general Exception catch - raise tornado.gen.Return(folder) + LOGGER.info('retrieving calculation<{}> successful'.format(node.pk)) + raise Return(result) -def kill_job(calc_node, transport_queue, cancelled_flag): +@coroutine +def task_kill_job(node, transport_queue, cancelled_flag): """ - Kill a calculation on the cluster. - - Can only be called if the calculation is in status WITHSCHEDULER. - - The command tries to run the kill command as provided by the scheduler, - and raises an exception is something goes wrong. - No changes of calculation status are done (they will be done later by - the calculation manager). - - .. todo: if the status is TOSUBMIT, check with some lock that it is not - actually being submitted at the same time in another thread. + Transport task that will attempt to kill a job calculation + + The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager + function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will + retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. + If all retries fail, the task will raise a TransportTaskException + + :param node: the node that represents the job calculation + :param transport_queue: the TransportQueue from which to request a Transport + :param cancelled_flag: the cancelled flag that will be queried to determine whethr the job was cancelled + :raises: Return if the tasks was successfully completed + :raises: TransportTaskException if after the maximum number of retries the transport task still excepted """ + initial_interval = 1 + max_attempts = 5 - job_id = calc_node.get_job_id() - calc_state = calc_node.get_state() - - if calc_state == calc_states.NEW or calc_state == calc_states.TOSUBMIT: - calc_node._set_state(calc_states.FAILED) - calc_node._set_scheduler_state(JOB_STATES.DONE) - calc_node.logger.warning( - "Calculation {} killed by the user (it was in {} state)".format(calc_node.pk, calc_state)) - raise tornado.gen.Return(True) + authinfo = node.get_computer().get_authinfo(node.get_user()) - if calc_state != calc_states.WITHSCHEDULER: - raise InvalidOperation("Cannot kill a calculation in {} state".format(calc_state)) - - authinfo = calc_node.get_computer().get_authinfo(calc_node.get_user()) + if node.get_state() in [calc_states.NEW, calc_states.TOSUBMIT]: + node._set_state(calc_states.FAILED) + LOGGER.warning('calculation<{}> killed, it was in the {} state'.format(node.pk, node.get_state())) + raise Return(True) with transport_queue.request_transport(authinfo) as request: transport = yield request # It may have taken time to get the transport, check if we've been cancelled if cancelled_flag.cancelled: - raise plumpy.CancelledError("Kill cancelled") - - # Get the scheduler plugin class and initialize it with the correct transport - scheduler = calc_node.get_computer().get_scheduler() - scheduler.set_transport(transport) + raise plumpy.CancelledError('task_kill_job for calculation<{}> cancelled'.format(node.pk)) - # Call the proper kill method for the job ID of this calculation - result = scheduler.kill(job_id) + LOGGER.info('killing calculation<{}>'.format(node.pk)) - # Raise error if something went wrong - if not result: - raise RemoteOperationError( - "An error occurred while trying to kill calculation {} (jobid {}), see log " - "(maybe the calculation already finished?)".format(calc_node.pk, job_id)) + try: + corout = functools.partial(execmanager.kill_calculation, node, transport) + result = yield exponential_backoff_retry(corout, initial_interval, max_attempts, logger=node.logger) + except Exception: + LOGGER.warning('killing calculation<{}> failed:\n{}'.format(node.pk, traceback.format_exc())) + node._set_state(calc_states.FAILED) + raise TransportTaskException('kill_calculation failed {} times consecutively'.format(max_attempts)) else: - calc_node._set_state(calc_states.FAILED) - calc_node._set_scheduler_state(JOB_STATES.DONE) - calc_node.logger.warning('Calculation<{}> killed by the user'.format(calc_node.pk)) - - raise tornado.gen.Return(result) + LOGGER.info('killing calculation<{}> successful'.format(node.pk)) + raise Return(result) class Waiting(plumpy.Waiting): @@ -219,9 +240,8 @@ def load_instance_state(self, saved_state, load_context): self._cancel_flag = self.CancelFlag(False) self._kill_future = None - @tornado.gen.coroutine + @coroutine def execute(self): - from tornado.gen import Return if self._kill_future: yield self._do_kill() @@ -233,41 +253,45 @@ def execute(self): try: if self.data == SUBMIT_COMMAND: - yield submit_job(calc, transport_queue, self._cancel_flag) + + yield task_submit_job(calc, transport_queue, self._cancel_flag) if self._kill_future: yield self._do_kill() - else: - # Now get scheduler updates - raise Return(self.scheduler_update()) + + # Now get scheduler updates + raise Return(self.scheduler_update()) elif self.data == UPDATE_SCHEDULER_COMMAND: + job_done = False + # Keep getting scheduler updates until done while not job_done: - job_done = yield update_scheduler_state(calc, transport_queue, self._cancel_flag) + job_done = yield task_update_job(calc, transport_queue, self._cancel_flag) + if self._kill_future: yield self._do_kill() - return # Done, go on to retrieve raise Return(self.retrieve()) elif self.data == RETRIEVE_COMMAND: + # Create a temporary folder that has to be deleted by JobProcess.retrieved after successful parsing retrieved_temporary_folder = tempfile.mkdtemp() - yield retrieve_job(calc, transport_queue, retrieved_temporary_folder, self._cancel_flag) + yield task_retrieve_job(calc, transport_queue, self._cancel_flag, retrieved_temporary_folder) if self._kill_future: yield self._do_kill() - else: - raise Return(self.retrieved(retrieved_temporary_folder)) + + raise Return(self.retrieved(retrieved_temporary_folder)) else: - raise RuntimeError("Unknown waiting command") + raise RuntimeError('Unknown waiting command') - except TransportTaskException as exception: - exit_status = JobCalculationExitStatus[exception.calc_state].value + except TransportTaskException: + exit_status = JobCalculationExitStatus[calc.get_state()].value raise Return(self.create_state(processes.ProcessState.FINISHED, exit_status, exit_status is 0)) except plumpy.CancelledError: # A task was cancelled because the state (and process) is being killed @@ -317,10 +341,10 @@ def retrieved(self, retrieved_temporary_folder): self.process.retrieved, retrieved_temporary_folder) - @tornado.gen.coroutine + @coroutine def _do_kill(self): try: - yield kill_job(self.process.calc, self.process.runner.transport, self._cancel_flag) + yield task_kill_job(self.process.calc, self.process.runner.transport, self._cancel_flag) except (InvalidOperation, RemoteOperationError): pass @@ -328,14 +352,13 @@ def _do_kill(self): self._kill_future.set_result(True) self._kill_future = None - raise tornado.gen.Return(self.create_state(processes.ProcessState.KILLED, 'Got killed yo')) + raise Return(self.create_state(processes.ProcessState.KILLED, 'Got killed yo')) def kill(self, msg=None): if self._kill_future is not None: return self._kill_future else: - if self.process.calc.get_state() in \ - [calc_states.NEW, calc_states.TOSUBMIT, calc_states.WITHSCHEDULER]: + if self.process.calc.get_state() in [calc_states.NEW, calc_states.TOSUBMIT, calc_states.WITHSCHEDULER]: self._kill_future = plumpy.Future() # Cancel the task self._cancel_flag.cancelled = True @@ -450,14 +473,12 @@ def on_excepted(self): """The Process excepted so we set the calculation and scheduler state.""" super(JobProcess, self).on_excepted() self.calc._set_state(calc_states.FAILED) - self.calc._set_scheduler_state(JOB_STATES.DONE) @override def on_killed(self): """The Process was killed so we set the calculation and scheduler state.""" super(JobProcess, self).on_excepted() self.calc._set_state(calc_states.FAILED) - self.calc._set_scheduler_state(JOB_STATES.DONE) @override def update_outputs(self): @@ -560,9 +581,7 @@ def retrieved(self, retrieved_temporary_folder=None): try: shutil.rmtree(retrieved_temporary_folder) except OSError as exception: - if exception.errno == 2: - pass - else: + if exception.errno != 2: raise # Finally link up the outputs and we're done diff --git a/aiida/work/persistence.py b/aiida/work/persistence.py index 93589e7ad9..8ee0c6a04e 100644 --- a/aiida/work/persistence.py +++ b/aiida/work/persistence.py @@ -56,7 +56,7 @@ def save_checkpoint(self, process, tag=None): :param tag: optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process :raises: :class:`plumpy.PersistenceError` Raised if there was a problem saving the checkpoint """ - LOGGER.info('Persisting process<%d>', process.pid) + LOGGER.debug('Persisting process<%d>', process.pid) if tag is not None: raise NotImplementedError('Checkpoint tags not supported yet') diff --git a/aiida/work/utils.py b/aiida/work/utils.py index fbe6cc6e51..e1139dae96 100644 --- a/aiida/work/utils.py +++ b/aiida/work/utils.py @@ -8,17 +8,12 @@ # For further information please visit http://www.aiida.net # ########################################################################### # pylint: disable=invalid-name -########################################################################### -# Copyright (c), The AiiDA team. All rights reserved. # -# This file is part of the AiiDA code. # -# # -# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core # -# For further information on the license, see the LICENSE.txt file # -# For further information please visit http://www.aiida.net # -########################################################################### """Utilities for the workflow engine.""" import contextlib +import logging +import traceback import tornado.ioloop +from tornado.gen import coroutine, sleep, Return from aiida.common.links import LinkType from aiida.orm.calculation import Calculation, WorkCalculation, FunctionCalculation @@ -26,11 +21,70 @@ __all__ = [] +LOGGER = logging.getLogger(__name__) PROCESS_STATE_CHANGE_KEY = 'process|state_change|{}' PROCESS_STATE_CHANGE_DESCRIPTION = 'The last time a process of type {}, changed state' PROCESS_CALC_TYPES = (WorkCalculation, FunctionCalculation) +def ensure_coroutine(fct): + """ + Ensure that the given function ``fct`` is a coroutine + + If the passed function is not already a coroutine, it will be made to be a coroutine + + :param fct: the function + :returns: the coroutine + """ + if tornado.gen.is_coroutine_function(fct): + return fct + + @tornado.gen.coroutine + def wrapper(*args, **kwargs): + raise tornado.gen.Return(fct(*args, **kwargs)) + + return wrapper + + +@coroutine +def exponential_backoff_retry(fct, initial_interval=10.0, max_attempts=5, logger=None): + """ + Coroutine to call a function, recalling it with an exponential backoff in the case of an exception + + This coroutine will loop ``max_attempts`` times, calling the ``fct`` function, breaking immediately when the call + finished without raising an exception, at which point the returned result will be raised, wrapped in a + ``tornado.gen.Result`` instance. If an exception is caught, the function will yield a ``tornado.gen.sleep`` with a + time interval equal to the ``initial_interval`` multiplied by ``2*N`` where ``N`` is the number of excepted calls. + + :param fct: the function to call, which will be turned into a coroutine first if it is not already + :param initial_interval: the time to wait after the first caught exception before calling the coroutine again + :param max_attempts: the maximum number of times to call the coroutine before re-raising the exception + :raises: ``tornado.gen.Result`` if the ``coro`` call completes within ``max_attempts`` retries without raising + """ + if logger is None: + logger = LOGGER + + result = None + coro = ensure_coroutine(fct) + interval = initial_interval + + for iteration in range(max_attempts): + try: + result = yield coro() + break # Finished successfully + except Exception: # pylint: disable=broad-except + if iteration == max_attempts - 1: + logger.warning('maximum attempts %d of calling %s, exceeded', max_attempts, coro.__name__) + raise + else: + logger.warning('iteration %d of %s excepted, retrying after %d seconds\n%s', iteration + 1, + coro.__name__, interval, traceback.format_exc()) + yield sleep(interval) + interval *= 2 + + raise Return(result) + + def is_work_calc_type(calc_node): """ Check if the given calculation node is of the new type.