[RFC] Transformation Features

Romain Dorgueil edited this page Jan 7, 2018 · 6 revisions
Subject: Transformation features
Authors: Romain Dorgueil
Created: Oct 3, 2017
Modified: Oct 8, 2017
Target: 0.6? 0.7?
Status: DRAFT

This is a DRAFT

Behaviors

There are common behaviors that can be reused on any node, and there should be an API for that.

Examples:

  • Retry (if failure, retry n times before giving up)
  • Timeout (if more than x milliseconds, give up)
  • Parallel (we don't care about FIFO; run n in parallel and emit results in completion order, "FOFO")
  • Map (map an iterable to the same transformation, for example Map(HttpRead(), urls)), possibly in parallel (we need to address how this combines with other features)
  • Validate{Input,Output} (firewall/safety)
  • Trace (inject debug/trace info into the output)
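As a sketch of what such a behavior API could look like, here is a hypothetical Retry wrapper (the name, signature, and wrapping style are assumptions, not an existing bonobo API): it calls the wrapped node and, on failure, retries n times before giving up.

```python
import functools
import time


def Retry(node, times=3, wait=0.5):
    """Hypothetical behavior wrapper: call the node, retrying on failure.

    ``times`` is the total number of attempts, ``wait`` the pause (in
    seconds) between attempts.
    """
    @functools.wraps(node)
    def wrapped(*args, **kwargs):
        for attempt in range(times):
            try:
                return node(*args, **kwargs)
            except Exception:
                if attempt == times - 1:
                    raise  # last attempt failed: give up
                time.sleep(wait)  # back off before retrying
    return wrapped


# Usage: wrap any node callable before putting it in the graph.
# safe_node = Retry(flaky_node, times=5, wait=1.0)
```

Timeout, ValidateInput/ValidateOutput and Trace could follow the same wrapping pattern, which is why a common API seems worthwhile.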

Special Bags

  • Loopback (feed a node's output back into its own input, recursion-like)

Parallel

Bonobo runs all nodes in parallel under the current execution strategy, which means that no node blocks another node; within one given node, however, inputs are processed first-in-first-out, one at a time.

This is often the desired behavior, but in a few cases it is unnecessary, and even unwanted.

Example cases include everything where the FIFO property is not required per se, and where the time needed to process an item depends largely on a non-local resource (an HTTP request, a flaky web service, humans, ...).

Proposed implementation

I propose we use coroutines and event loops for this feature.

For the user, it could make the code look like this (pseudocode):

@use('http')
@set_max_concurrency(4)
async def query_user_api(http, uid):
    yield await http.get(URL + '?uid=' + str(uid))

def handle_response(resp):
    return json_from_response(resp)

def get_graph():
    return bonobo.Graph(
        query_user_api,
        handle_response
    )

if __name__ == '__main__':
    bonobo.run(get_graph())

On the bonobo side, this means the node execution context would handle coroutines differently. Each coroutine-based node execution context would build an event loop (not a global one, but one local to that node execution context) and execute the underlying node implementation (i.e. the user-provided coroutine) using futures, within that event loop.

Once out of the asyncio world, results would be put on the output queue in "FOFO" order (first-out, first-out: whichever finishes first is emitted first).
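The mechanism above could be sketched as follows. This is not bonobo's actual execution context, just a minimal standalone model of it: a node-local event loop drains one async generator per input, with a semaphore capping concurrency, and results land on the output queue in completion order.

```python
import asyncio
import queue


def run_async_node(agen_factory, inputs, max_concurrency=4):
    """Sketch: execute an async-generator node in a node-local event loop.

    ``agen_factory`` is the user-provided coroutine (an async generator
    function taking one input item); results are pushed onto the output
    queue as they complete ("FOFO" order, not FIFO).
    """
    output = queue.Queue()
    loop = asyncio.new_event_loop()  # local to this node execution context
    sem = None

    async def consume(item):
        async with sem:  # cap how many items are in flight at once
            async for result in agen_factory(item):
                output.put(result)

    async def main():
        nonlocal sem
        sem = asyncio.Semaphore(max_concurrency)  # created inside the loop
        await asyncio.gather(*(consume(item) for item in inputs))

    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    return output
```

In the real implementation the output queue would be the node's regular output channel, and the loop's lifetime would be tied to the node execution context rather than a single call.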

Generalization

Once this exists, it is also possible to generalize it using a concurrent.futures executor and a Parallel decorator.

Users would be able to make anything non-FIFO using the following syntax:

@use('http')
def query_user_api_sync(http, uid):
    yield http.get(URL + '?uid=' + str(uid))

query_user_api_async = Parallel(query_user_api_sync, max_concurrency=4)

The Parallel decorator would wrap the inner execution in a thread or any other parallel execution mechanism (which one, how it is chosen, who is responsible for providing it, and whether it should be the same as the one used by the current execution strategy are yet to be determined). It would return a coroutine-based version of the same code, so the mechanism described above can take control and use the event loop, just as with a user-defined coroutine.
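A minimal sketch of such a Parallel wrapper, assuming a thread pool as the parallel execution mechanism (class name, the ``map`` method, and the thread-pool choice are all assumptions for illustration, not a decided design):

```python
import concurrent.futures


class Parallel:
    """Hypothetical wrapper: run a synchronous generator node in a thread
    pool so that calls overlap; results are emitted in completion order
    ("FOFO"), not in input order (FIFO).
    """

    def __init__(self, node, max_concurrency=4):
        self.node = node
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_concurrency
        )

    def map(self, inputs):
        # Submit one task per input; each task fully drains the node's
        # generator for that input.
        futures = [
            self.executor.submit(lambda item=item: list(self.node(item)))
            for item in inputs
        ]
        # as_completed yields futures in completion order, not input order.
        for future in concurrent.futures.as_completed(futures):
            yield from future.result()
```

In the proposed design the wrapper would instead return a coroutine-based node, so the event-loop mechanism above could drive it uniformly; the thread pool here just illustrates the concurrency semantics.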

Compatibility

One problem is that asynchronous generators (PEP 525, https://www.python.org/dev/peps/pep-0525/) only exist starting with Python 3.6, which means Python 3.5 users need a fallback solution.
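One possible fallback shape, sketched below: on Python 3.5 a node cannot be an async generator, but it can be a plain coroutine that returns its results as a list, which the runner would then flatten onto the output queue. The ``http`` service and the URL are hypothetical stand-ins, and this convention is an assumption, not a decided API.

```python
import asyncio


# Python 3.6+: async generator (PEP 525)
#
#   async def query_user_api(http, uid):
#       yield await http.get(URL + '?uid=' + str(uid))
#
# Python 3.5 fallback: a plain coroutine that returns a list of results;
# the (hypothetical) node runner treats the returned list as the values
# the 3.6 version would have yielded.
async def query_user_api_fallback(http, uid):
    resp = await http.get('https://api.example.com/users?uid=' + str(uid))
    return [resp]
```

The cost of this fallback is that all results for one input are buffered until the coroutine returns, instead of streaming out one by one.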