Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make process functions submittable #4539

Merged
merged 2 commits into from
Nov 8, 2020
Merged

Conversation

unkcpz
Copy link
Member

@unkcpz unkcpz commented Nov 3, 2020

Feature #2965 to make ProcessFunction submittable, which avoid process functions(which may run for a long time) from interrupt to excepted state when the daemon is shutdown.
However, this means function process(submittable one) should be persisted and can be recreate_from just like CalcJob and WorkChain, i.e. can be imported from the package.
So I guess this kind of process function must be declared in the entry_points? Does this violate the basic design of the function process?

pinning you for the comments. @sphuber @muhrin @giovannipizzi

# The local calcfunction can not be create, since class can
# not be found when create persistence from local declare function
# inputs = {'x': orm.Int(2)}
# node = launch.submit(local_calcfunction, **inputs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The submit can not apply to the locally announced calcfunction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to be expected though. The same holds exactly for all other processes, like WorkChains. If you define a WorkChain locally, either in a file that is not in the PYTHONPATH or let's say an interactive shell, when you submit it, it will fail, because the daemon cannot import the corresponding class.

@greschd
Copy link
Member

greschd commented Nov 3, 2020

I think this is great, thanks @unkcpz

As you mention, there are restrictions on which process functions can be submitted. In general I don't see a problem with that, it's the same as for other processes.
However, there may be process functions already "in the wild" that are not submittable. I think it would make sense to give the end user (whoever runs or submits the process function) some way of distinguishing whether a given process function can be submitted. That could be e.g. a utils.is_submittable(process_function), or a property of the process functions themselves.

Can we determine in code whether a process function is submittable, or is it something the developer needs to tell us?

For some context, in aiida-optimize I have the following code, that automatically selects whether to run or submit a sub-process (link):

    def run_or_submit(self, proc: Process, **kwargs: ty.Any) -> orm.ProcessNode:
        if utils.is_process_function(proc):
            _, node = run_get_node(proc, **kwargs)
            return node
        return self.submit(proc, **kwargs)

I wouldn't know how to re-write that to take advantage of being able to submit only some process functions.

@sphuber
Copy link
Contributor

sphuber commented Nov 3, 2020

Can we determine in code whether a process function is submittable, or is it something the developer needs to tell us?

Really, for something to be "submittable", simply means that a daemon worker can properly execute the Python code, whether that be a class in the case of a WorkChain or CalcJob, or a function in the case of a process function. That means that if the resource can be imported, there should be no problem. The easiest is through entry points, but it can also be done through regular old imports. As long as the function is in the PYTHONPATH and the daemon has it correctly defined, it should be able to read it.

For some context, in aiida-optimize I have the following code, that automatically selects whether to run or submit a sub-process

Note that once process functions are submittable, this will no longer be necessary. Then you can always just call self.submit to launch a sub-process. By calling the submit of the parent process, which is required (calling aiida.engine.submit will raise), it will use the submit method of the underlying Runner, which will automatically replicate the behavior with which the top process was launched. If the top process was launched with run, all subprocesses will also be run in the sense that they won't be sent to RabbitMQ but run in the same event loop of the same runner. Likewise, if the top process was submitted, each sub process launched through self.submit will also be submitted over RabbitMQ and be picked up by a (possibly different) daemon runner.

@greschd
Copy link
Member

greschd commented Nov 3, 2020

Right, my concern is what happens with process functions that are not top-level importable and executable - such as the locally-defined example made by @unkcpz in the tests.

I don't really know how common these are - but since that was perfectly legal up until now, I would like my code to continue working with them. Ideally it would also take advantage of being able to submit process functions (when possible), so that's where I would like to distinguish them.

@unkcpz
Copy link
Member Author

unkcpz commented Nov 3, 2020

The easiest is through entry points, but it can also be done through regular old imports. As long as the function is in the PYTHONPATH and the daemon has it correctly defined, it should be able to read it.

@sphuber thanks for pointing it out. I just check and find there is are no hints for users when the process is not importable. Is that necessary to make the exceptional info more readable?

@sphuber
Copy link
Contributor

sphuber commented Nov 3, 2020

I don't really know how common these are - but since that was perfectly legal up until now, I would like my code to continue working with them. Ideally it would also take advantage of being able to submit process functions (when possible), so that's where I would like to distinguish them.

Existing code should not be affected, because currently they can only be run directly in the local interpreter. Since they are defined there, they are importable by definition. Or am I missing something?

@unkcpz
Copy link
Member Author

unkcpz commented Nov 3, 2020

Then I will just add this feature to the docs. Do you have any suggestions of designs or changes I should add in code?

@sphuber
Copy link
Contributor

sphuber commented Nov 3, 2020

thanks for pointing it out. I just check and find there is are no hints for users when the process is not importable. Is that necessary to make the exceptional info more readable?

It is really difficult to do this on the side that is submitting the function. The reason is that it may very well be importable in that code, but the interpreter of the daemon worker that will actually pick it up, may have a different environment where that function is not importable. It is practically impossible to dynamically inspect the environment of a Python interpreter on a different system process to check whether a resource is importable. The problem then is that the exception will appear in the daemon worker as soon as it picks up the process and attempts to start running it. Again, the same problem applies to WorkChains and users have faced this often in the past, and we have put quite some effort into the exception handling code to make sure users understand the problem. The exception will end up in the daemon log of course.

Copy link
Contributor

@sphuber sphuber left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @unkcpz . I think the code is fine, straightforward enough. We should just add tests as I described in a code comment

Comment on lines 146 to 148
inputs = {'term_a': orm.Int(2), 'term_b': orm.Int(4)}
node = launch.submit(add, **inputs)
self.assertIsInstance(node, orm.CalcFunctionNode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This merely tests that the submit method does not except and returns CalcFunctionNode, which is a good start, but we should also test that the process can actually be executed by a daemon runner. You can try to add this test here, but you will need to fake the daemon runner of course. You can try and take inspiration of the tests/cmdline/commands/test_process.py tests, or you can add submitting a process function to the .ci/test_daemon.py script. This will also run in the CI but with a normal daemon running, making it easier to test this kind of stuff. You should test submitting a process function that is defined normally and is importable through the PYTHONPATH as well as through an entry point. For the latter you can use aiida.workflows:arithmetic.add_multiply which comes with aiida-core so you don't have to create one especially for the test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I add tests in .ci/test_daemon.py. That's true these 'real' tests bring out a subtle issue, that recreate_from method should be given to the process function.

@@ -96,7 +96,7 @@ def submit(process, **inputs):
:return: the calculation node of the process
:rtype: :class:`aiida.orm.ProcessNode`
"""
assert not is_process_function(process), 'Cannot submit a process function'
# assert not is_process_function(process), 'Cannot submit a process function'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# assert not is_process_function(process), 'Cannot submit a process function'

@greschd
Copy link
Member

greschd commented Nov 3, 2020

Existing code should not be affected, because currently they can only be run directly in the local interpreter. Since they are defined there, they are importable by definition. Or am I missing something?

It needs to be defined in the interpreter that launches it, not necessarily the local one. That is, it can be defined inside a daemon worker; doesn't mean it is necessarily importable by a different daemon worker.

Totally un-advisable example (you can submit(SomeWorkChain) without issue):

from aiida import orm
from aiida.engine import WorkChain, run_get_node, utils, workfunction


class RunOrSubmitWorkChain(WorkChain):
    """
    Adds a 'run_or_submit' method to the WorkChain class, which uses
    'run' for process functions and 'submit' else.
    """
    def run_or_submit(self, proc, **kwargs):
        if utils.is_process_function(proc):
            _, node = run_get_node(proc, **kwargs)
            return node
        return self.submit(proc, **kwargs)


class Sub1(WorkChain):
    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.outline(cls.do_stuff)

    def do_stuff(self):
        self.report('Sub1 doing stuff.')


class SomeWorkChain(RunOrSubmitWorkChain):
    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.outline(cls.do_stuff)

    def do_stuff(self):
        @workfunction
        def sub_2():
            # Capturing 'self' from the 'SomeWorkChain' for added "fun".
            self.report("sub 2 doing stuff.")

        self.to_context(sub_1=self.run_or_submit(Sub1),
                        sub_2=self.run_or_submit(sub_2))

To be clear: If it isn't possible to construct a utils.is_submittable this PR is still an improvement (the run_or_submit code would continue to work), but it would be nice being able to distinguish them.

@sphuber
Copy link
Contributor

sphuber commented Nov 3, 2020

I don't think it is possible to create the utils.is_submittable function as I described in a comment above. It simply is impossible to reliably determine whether a resource will be importable in a Python interpreter in another system process. Let alone one that might not even be running (you might want to submit a process function without the daemon even running). I don't see the problem of that though, because we have always had the exact same problem with all other process types. So nothing new on the horizon

@greschd
Copy link
Member

greschd commented Nov 3, 2020

It simply is impossible to reliably determine whether a resource will be importable in a Python interpreter in another system process.

Yeah, agreed - whether the current interpreter can import it is a good indicator, but not a reliable test. It would work in all the examples given here, but fail as soon as sys.path is different for the daemon. I think I'd probably be willing to put that into my plugin code, but it shouldn't go into aiida-core as it'll be a hack at best.

Alright now, I'll stop derailing this discussion with my obscure use cases 😅

@unkcpz unkcpz force-pushed the feature/2965 branch 3 times, most recently from 7ce4a6b to a8b50a4 Compare November 4, 2020 12:27
@codecov
Copy link

codecov bot commented Nov 4, 2020

Codecov Report

Merging #4539 (61a92b0) into develop (e5c2d0e) will increase coverage by 0.01%.
The diff coverage is 100.00%.

Impacted file tree graph

@@             Coverage Diff             @@
##           develop    #4539      +/-   ##
===========================================
+ Coverage    79.49%   79.50%   +0.01%     
===========================================
  Files          482      482              
  Lines        35323    35326       +3     
===========================================
+ Hits         28078    28081       +3     
  Misses        7245     7245              
Flag Coverage Δ
django 73.67% <100.00%> (+0.02%) ⬆️
sqlalchemy 72.84% <100.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
aiida/engine/launch.py 97.44% <100.00%> (-0.06%) ⬇️
aiida/engine/processes/functions.py 93.04% <100.00%> (+0.09%) ⬆️
aiida/engine/utils.py 88.33% <100.00%> (+0.18%) ⬆️
aiida/engine/daemon/runner.py 79.32% <0.00%> (-3.44%) ⬇️
aiida/engine/daemon/client.py 72.42% <0.00%> (-1.14%) ⬇️
aiida/manage/external/rmq.py 71.92% <0.00%> (+3.38%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e5c2d0e...61a92b0. Read the comment docs.

@unkcpz
Copy link
Member Author

unkcpz commented Nov 4, 2020

@sphuber I add the tests and remove the 'process function can not be submitted' warning from the docs.
I have a weak impression that there is an instruction in documentation (or maybe in one of the tutorial) says WorkChains needed to be implemented in a package or importable in the PYTHONPATH. If so, the same feature of process function should also be mentioned there.

@sphuber
Copy link
Contributor

sphuber commented Nov 4, 2020

Yes, the exact same criterion applies to process functions as for WorkChain or CalcJob classes after this PR. The function needs to be importable, meaning that it either a) has an associated entry point or b) its module path is included in the PYTHONPATH that the daemon workers will have. Please also add this to the documentation on process functions, where it now says that they are submittable

@greschd
Copy link
Member

greschd commented Nov 4, 2020

a) has an associated entry point or b) its module path is included in the PYTHONPATH that the daemon workers will have

Even if it has an entry point, its module still needs to be on the PYTHONPATH, right?

@sphuber
Copy link
Contributor

sphuber commented Nov 4, 2020

Even if it has an entry point, its module still needs to be on the PYTHONPATH, right?

I don't think so. As long as there is an entry point that is properly loaded in the reentry cache, then it is loadable by the daemon worker, even if that module is not in the PYTHONPATH. Not a 100% sure but we could easily test this.

@greschd
Copy link
Member

greschd commented Nov 4, 2020

I think it needs to end up on sys.path, whether through a "regular" install or via PYTHONPATH environment variable; Python cannot import modules that aren't on sys.path.

Whether there is an entry point is a somewhat orthogonal question: If there is one, the aiida.plugins loaders can be used with the entry point names.

(I did a tiny bit of testing, but feel free to double-check this)

Copy link
Contributor

@sphuber sphuber left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @unkcpz a few minor changes requested

try:
actual_result = calc.outputs.result
except exceptions.NotExistent:
print(f'Could not retrieve `output_parameters` node for Calculation<{pk}>')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
print(f'Could not retrieve `output_parameters` node for Calculation<{pk}>')
print(f'Could not retrieve `result` output for process<{pk}>')

Comment on lines 414 to 425
print(f'Submitting {NUMBER_CALCULATIONS} calcfunction to the daemon')
for counter in range(1, NUMBER_CALCULATIONS + 1):
inputval = counter
proc, expected_result = launch_calcfunction(counter=counter, inputval=inputval)
expected_results_calcfunctions[proc.pk] = expected_result

# Submitting the workfunction through the launchers
print(f'Submitting {NUMBER_CALCULATIONS} workfunction to the daemon')
for counter in range(1, NUMBER_CALCULATIONS + 1):
inputval = counter
proc, expected_result = launch_workfunction(counter=counter, inputval=inputval)
expected_results_workfunctions[proc.pk] = expected_result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think launching just one of each here is fine. Don't think running multiple process functions tests anything of additional value

@@ -142,6 +142,7 @@ When a process is 'submitted', an instance of the ``Process`` is created, along
This is called a 'process checkpoint', more information on which :ref:`will follow later<topics:processes:concepts:checkpoints>`.
Subsequently, the process instance is shut down and a 'continuation task' is sent to the process queue of RabbitMQ.
This task is simply a small message that just contains an identifier for the process.
In order to reconstruct the process from 'process checkpoint', the process needs to be importable in the daemon environment by a) giving it an associated entry point or b) including its module path in the ``PYTHONPATH`` that the daemon workers will have.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
In order to reconstruct the process from 'process checkpoint', the process needs to be importable in the daemon environment by a) giving it an associated entry point or b) including its module path in the ``PYTHONPATH`` that the daemon workers will have.
In order to reconstruct the process from a `checkpoint`, the process needs to be importable in the daemon environment by a) giving it an :ref:`associated entry point<how-to:plugin-codes:entry-points>` or b) :ref:`including its module path<how-to:faq:process-not-importable-daemon>` in the ``PYTHONPATH`` that the daemon workers will have.

You will have to add the line

.. _how-to:faq:process-not-importable-daemon:`

to the file docs/source/howto/faq.rst on line 65 for the second link

@@ -330,7 +330,7 @@ For example, when we want to run an instance of the :py:class:`~aiida.calculatio
The function will submit the calculation to the daemon and immediately return control to the interpreter, returning the node that is used to represent the process in the provenance graph.

.. warning::
Process functions, i.e. python functions decorated with the ``calcfunction`` or ``workfunction`` decorators, **cannot be submitted** but can only be run.
To submit process, the process needs to be importable, meaning that it either a) has an associated entry point or b) its module path is included in the ``PYTHONPATH`` that the daemon workers will have.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
To submit process, the process needs to be importable, meaning that it either a) has an associated entry point or b) its module path is included in the ``PYTHONPATH`` that the daemon workers will have.
For a process to be submittable, the class or function needs to be importable in the daemon environment by a) giving it an :ref:`associated entry point<how-to:plugin-codes:entry-points>` or b) :ref:`including its module path<how-to:faq:process-not-importable-daemon>` in the ``PYTHONPATH`` that the daemon workers will have.

@@ -141,6 +141,24 @@ def test_submit_store_provenance_false(self):
with self.assertRaises(exceptions.InvalidOperation):
launch.submit(AddWorkChain, term_a=self.term_a, term_b=self.term_b, metadata={'store_provenance': False})

def test_submit_process_function(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this was supposed to test submitting a process function, but it not actually does (and this is done isntead in test_daemon.py, this entire test can be removed.

Comment on lines 353 to 354
# process function can be submitted but the process should be importable
# in python interpreter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# process function can be submitted but the process should be importable
# in python interpreter
# Process function can be submitted and will be run by a daemon worker as long as the function is importable
# Note that the actual running is not tested here but is done so in `.ci/test_daemon.py`.

submit(self.function_return_true)
# process function can be submitted but the process should be importable
# in python interpreter
from aiida.workflows.arithmetic.add_multiply import add_multiply
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move import to top of file

# process function can be submitted but the process should be importable
# in python interpreter
from aiida.workflows.arithmetic.add_multiply import add_multiply
submit(add_multiply, x=orm.Int(1), y=orm.Int(2), z=orm.Int(3))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
submit(add_multiply, x=orm.Int(1), y=orm.Int(2), z=orm.Int(3))
node = submit(add_multiply, x=orm.Int(1), y=orm.Int(2), z=orm.Int(3))
assert isinstance(node, WorkFunctionNode)

@unkcpz unkcpz force-pushed the feature/2965 branch 2 times, most recently from 6e1b645 to 6673cbf Compare November 7, 2020 04:01
@unkcpz unkcpz requested a review from sphuber November 7, 2020 04:03
Copy link
Contributor

@sphuber sphuber left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid I still need one more change, you put the anchor for the docs on the wrong line. Once you correct that I will merge this.

@@ -77,3 +77,5 @@ Run the command ``verdi daemon logshow`` in a separate terminal to see the loggi
If the root cause is indeed due to an import problem, it will probably appear as an ``ImportError`` exception in the daemon log.
To solve these issues, make sure that all the Python code that is being run is properly importable, which means that it is part of the `PYTHONPATH <https://docs.python.org/3/using/cmdline.html#envvar-PYTHONPATH>`_.
Make sure that the PYTHONPATH is correctly defined automatically when starting your shell, so for example if you are using bash, add it to your ``.bashrc``.

.. _how-to:faq:process-not-importable-daemon:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This anchor should be on line 65, just before the header "Why would...."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed. No much experience about rst ;-p

Fix aiidateam#2965, let the process function can be submit to the runner, means shutdown the daemon will not fail the function process (`calcfunction` or the `workfunction`). So they can restart when the daemon is restart. Since the process function still can only running on the local machine, it blocking the daemon even we 'submit' it to the daemon.
@sphuber sphuber changed the title WIP: make process function submittable Make process functions submittable Nov 8, 2020
@sphuber sphuber merged commit ac4c881 into aiidateam:develop Nov 8, 2020
@sphuber
Copy link
Contributor

sphuber commented Nov 8, 2020

Thanks a lot @unkcpz !

@unkcpz unkcpz deleted the feature/2965 branch November 9, 2020 01:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants