Merge pull request #38 from radiantone/0.2.1
0.2.1
radiantone authored Jun 15, 2021
2 parents dd1fe9b + 1c21480 commit bcf4352
Showing 9 changed files with 132 additions and 73 deletions.
18 changes: 0 additions & 18 deletions DEVELOPMENT

This file was deleted.

91 changes: 84 additions & 7 deletions README.md
@@ -1,15 +1,17 @@
*This version: 0.2.0*
*This version: 0.2.1*

![logo](./images/logo.png)

*Current development version is here: [0.2.1](https://github.com/radiantone/entangle/tree/0.2.1)*
*Current development version is here: [0.2.2](https://github.com/radiantone/entangle/tree/0.2.2)*

A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both *control flow* and *dataflow* execution paradigms as well as de-centralized CPU & GPU scheduling.

> For a quick look at what makes Entangle special, take a look at [Design Goals](#design-goals).
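
As a quick taste of the decorator and call-graph style, here is a minimal, hedged sketch; the function names are illustrative, and only the `@process` decorator used in the examples later in this README is assumed:

```python
from entangle.process import process

@process
def one():
    return 1

@process
def two():
    return 2

@process
def add(a, b):
    # each decorated function runs in its own OS process
    return int(a) + int(b)

# composing the functions builds the call graph; invoking the composed object
# executes it, running one() and two() in parallel before add() receives their results
workflow = add(one(), two())
print(workflow())
```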
## New In This Release

- Bug fixes to process.py, ssh.py
- Distributed dataflow example
- Dataflow decorator re-write. Now works with ssh for distributed dataflow. Fixes prior issues with local dataflows.
- Retry usage example
- Dockerfile provided for quick and easy experimentation.
@@ -82,6 +84,7 @@ root@13428af4a37b:/# python -m entangle.examples.example3
* [AI Example](#ai-example)
* [Dataflow Examples](#dataflow-examples)
* [Data-Driven Branching](#data-driven-branching)
* [Distributed Dataflow](#distributed-dataflow)
* [Docker Example](#docker-example)
* [Scheduler Example](#scheduler-example)
* [Graph Example](#graph-example)
@@ -192,7 +195,7 @@ If you are planning to run or use GPU enabled code it is recommended to set up a
* Serverless & Threadless
* True Dataflow Support
* CPU/GPU Scheduling
* Distributed dataflow
* Distributed Dataflow

## Architecture

@@ -211,7 +214,7 @@ This makes the workflow a truly emergent, dynamic computing construct vs a monol
### Tradeoffs

Every design approach is a balance of tradeoffs. Entangle favors CPU utilization and *true* parallelism over resource managers, centralized (which is to say network centric) schedulers or other shared services.
It favors simplicity over behavior, attempting to be minimal and un-opinionated. It tries to be *invisible* to the end user as much as possible. It strives for the basic principle that, *"if it looks like it should work, it should work."*
It favors simplicity over behavior, leaving specific extensions to you, attempting to be minimal and un-opinionated. It tries to be *invisible* to the end user as much as possible. It strives for the basic principle that, *"if it looks like it should work, it should work."*

Entangle leans on the OS scheduler to prioritize processes based on the behavior of those processes and underlying resource utilizations. It therefore does not provide its own redundant (which is to say *centralized*) scheduler or task manager. Because of this, top-down visibility or control of workflow processes is not as easy as with centralized task managers.

@@ -685,7 +688,7 @@ If you use `@scheduler` then it will utilize the *scheduler queue* to request CP
*diagram here*

Each time a workflow decorated with `@scheduler` is sent to a remote machine, that scheduler then manages its portion of the workflow and any dependent functions that it might resolve.
This pattern forms a sort of *distributed hierarchy* of schedulers that work in parallel across multiple machines, yet fully resolve to complete the root workflow.
This pattern forms a sort of *distributed tree* of schedulers that work in parallel across multiple machines, yet fully resolve to complete the root workflow.
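
To make the shape of such a workflow concrete, here is a minimal, hedged sketch of the decorator stacking this section describes. The `scheduler` import path and the empty `scheduler_config` are placeholders; the full multi-machine configuration appears in the Scheduler Example below.

```python
from entangle.process import process
from entangle.scheduler import scheduler  # assumed import path, matching the examples

scheduler_config = {}  # placeholder: real keys come from the Scheduler Example below

@scheduler(**scheduler_config)
@process
def two():
    return 2

@scheduler(**scheduler_config)
@process
def three():
    return 3

@scheduler(**scheduler_config)
@process
def add(a, b):
    return int(a) + int(b)

# Each @scheduler manages its portion of the graph; two() and three()
# can be resolved in parallel before add() completes the root workflow.
workflow = add(two(), three())
print(workflow())
```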

Let's take a closer look at this example, which uses 3 different machines to solve its workflow.

@@ -742,6 +745,7 @@ Since the `add()` function has two dependencies that can run in parallel the `@s
* [Docker Example](#docker-example)
* [Dataflow Examples](#dataflow-examples)
* [Data-Driven Branching](#data-driven-branching)
* [Distributed Dataflow](#distributed-dataflow)
* [Scheduler Example](#scheduler-example)
* [Graph Example](#graph-example)
* [Workflow Future Example](#workflow-future-example)
@@ -769,6 +773,7 @@ $ python -m entangle.examples.aiexample
$ python -m entangle.examples.retry_example
$ python -m entangle.examples.schedulerexample
$ python -m entangle.examples.schedulerexample2
$ python -m entangle.examples.sshdatafloweexample
$ python -m entangle.examples.sshschedulerexample
$ python -m entangle.examples.timeoutexample
```
@@ -1160,6 +1165,78 @@ triggered: inner Z: X: emit
printy: MainThread
triggered: inner Y: HELLO
```
### Distributed Dataflow

In the example below, we combine `@dataflow` with `@ssh` to get instant distributed dataflow!

```python
import threading
import time

from entangle.logging.debug import logging
from entangle.ssh import ssh
from entangle.process import process
from entangle.dataflow import dataflow

def triggered(func, result):
    print("triggered: {} {}".format(func.__name__, result))

@dataflow(callback=triggered)
@ssh(user='darren', host='miko', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@process
def printz(z):
    print('printz: {}'.format(threading.current_thread().name))
    with open('/tmp/printz.out', 'w') as pr:
        pr.write("Z: {}".format(z))
    return "Z: {}".format(z)

@dataflow(callback=triggered)
@ssh(user='darren', host='radiant', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@process
def printx(x):
    print('printx: {}'.format(threading.current_thread().name))
    with open('/tmp/printx.out', 'w') as pr:
        pr.write("X: {}".format(x))
    return "X: {}".format(x)

@dataflow(callback=triggered)
@process
def printy(y):
    print('printy: {}'.format(threading.current_thread().name))
    return "Y: {}".format(y)

@dataflow(callback=triggered)
@ssh(user='darren', host='radiant', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@process
def echo(e):
    print('echo: {}'.format(threading.current_thread().name))
    with open('/tmp/echo.out', 'w') as pr:
        pr.write("Echo! {}".format(e))
    return "Echo! {}".format(e)

@dataflow(callback=triggered, maxworkers=3)
def emit(value):
    print('emit: {}'.format(threading.current_thread().name))
    return value+"!"

if __name__ == '__main__':
    results = []

    # Create the dataflow graph
    flow = emit(
        printx(
            printz(
                echo()
            )
        ),
        printy(
            printz()
        ),
        printy()
    )

    result = flow('emit')
```
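Here, calling `flow('emit')` starts the dataflow at `emit` and pushes each node's result to the nodes nested beneath it: `emit` fans out to `printx` and both `printy` nodes, `printx` feeds one `printz` which in turn feeds `echo`, and the second `printy` feeds the other `printz`. The `@ssh`-decorated nodes (`printx`, `printz`, `echo`) execute on the remote hosts and write their output files there, while `emit` and `printy` run locally, and the `triggered` callback fires as each node completes.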
### Scheduler Example

```python
@@ -1454,7 +1531,7 @@ logging.basicConfig(filename='entangle.log',
You can of course provide your own logging configuration, but be sure to include it at the top of your file so the various entangle modules pick it up.
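
For instance, a minimal sketch of a custom configuration placed at the top of a script; the filename, level, and format here are illustrative, not Entangle defaults:

```python
import logging

# configure logging before any entangle imports so its modules pick this up
logging.basicConfig(
    filename='myworkflow.log',   # illustrative filename
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s')

from entangle.process import process  # entangle imports come after the logging setup
```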
## Design Tool

A prototype visual design tool for Entangle is shown below.
A prototype visual design tool for Entangle is shown below. More details will be posted on the wiki [here](https://github.com/radiantone/entangle/wiki/Design-Tool).


![ui](./images/ui1.png)
![ui](./images/ui2.png)
2 changes: 1 addition & 1 deletion entangle/__version__.py
@@ -7,7 +7,7 @@

__title__ = 'py-entangle'
__description__ = 'A python native parallel processing framework based on simple decorators.'
__version__ = '0.2.0'
__version__ = '0.2.1'
__author__ = 'Darren Govoni'
__author_email__ = 'darren@ontrenet.com'
__license__ = 'MIT'
4 changes: 3 additions & 1 deletion entangle/dataflow.py
@@ -144,7 +144,9 @@ def __call__(self, *args, **kwargs):
            if callable(_arg):
                _rr = _arg(result, **kwargs)
                logging.debug("_rr is %s", _rr)
        except:
        except Exception as ex:
            import traceback
            logging.error(traceback.format_exc())
            pass
        #PROCESSPOOL.submit(_arg, result)

1 change: 0 additions & 1 deletion entangle/examples/sshschedulerexample.py
@@ -53,7 +53,6 @@ def two():
    return 2


@ssh(user='darren', host='phoenix', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/miniconda3/bin/python')
@scheduler(**scheduler_config)
@process
def three():
8 changes: 5 additions & 3 deletions entangle/process.py
@@ -459,10 +459,11 @@ def error_wrapper(arg, **kwargs):
                return

            try:
                args = [_arg['result'] for _arg in _args]
                arg_graph = [_arg['graph'] for _arg in _args]
                exceptions = [_arg['result'] for _arg in _args if not isinstance(_arg,Exception)]
                args = [_arg['result'] for _arg in _args if _arg]
                arg_graph = [_arg['graph'] for _arg in _args if _arg]
                json_graphs = [_arg['json']
                               for _arg in _args if 'json' in _arg]
                               for _arg in _args if _arg and 'json' in _arg]
            except:
                import traceback
                logging.debug("_ARGS: %s", _args)
@@ -720,6 +721,7 @@ def func_wrapper(_wf, _wq):
                future_queue.put(result)
                logging.debug("func_wrapper: done putting queue")
            except Exception:
                import traceback
                with open('error2.out', 'a') as errfile:
                    errfile.write(traceback.format_exc())

79 changes: 38 additions & 41 deletions entangle/ssh.py
@@ -82,48 +82,14 @@ def find_func(pfunc):
            return find_func(pfunc.func)
        return pfunc

    def remove_ssh_decorator(fsource, username, hostname):
        """
        Description
        :param fsource:
        :param username:
        :param hostname:
        :return:
        """
        _ast = ast.parse(fsource)

        ssh_decorators = []

        for segment in _ast.body:
            if isinstance(segment, FunctionDef):
                decorators = segment.decorator_list
                for decorator in decorators:
                    if hasattr(decorator, 'func') and decorator.func.id == 'ssh':
                        user_keyword = None
                        host_keyword = None
                        for keyword in decorator.keywords:
                            if keyword.arg == 'user' and keyword.value.value == username:
                                user_keyword = keyword
                            if keyword.arg == 'host' and keyword.value.value == hostname:
                                host_keyword = keyword
                        logging.debug("REMOVE SSH DECORATOR:")

                        if user_keyword and host_keyword:
                            ssh_decorators += [decorator]

        for ssh_decorator in ssh_decorators:
            logging.debug("SSH: Removing decorator: %s", ssh_decorator)
            decorators.remove(ssh_decorator)

        return astunparse.unparse(_ast)

    if 'SOURCE' in os.environ:
        sourcefile = os.environ['SOURCE']
    else:
        sourcefile = sys.argv[0]

    logging.debug("ssh: SOURCE:%s", sourcefile)
    with open(sourcefile) as source:
        logging.debug("SOURCE:%s", source.read())
        logging.debug("ssh: SOURCE:%s", source.read())

    logging.debug("SSH: user:%s host:%s key: %s",
                  kwargs['user'], kwargs['host'], kwargs['key'])
@@ -165,15 +131,44 @@ def setup_virtualenv(host, user, key, env):

    with open(sourcefile) as source:
        _source = source.read()
        _source = re.sub(r"@ssh\(user='{}', host='{}'".format(username,
                         hostname), "#@ssh(user='{}', host='{}'".format(username,
                         hostname), _source).strip()
        logging.debug("Parsing SOURCE")
        _ast = ast.parse(_source)

        funcdefs = [funcdef for funcdef in _ast.body if isinstance(
            funcdef, ast.FunctionDef)]

        _source = re.sub(r"@dataflow","#@dataflow", _source).strip()
        logging.debug("Removing decorators from SOURCE")
        for funcdef in funcdefs:
            __funcname__ = funcdef.name
            if __funcname__ == func.__name__:
                decorators = funcdef.decorator_list
                ssh_decorator = None

                for decorator in decorators:
                    if hasattr(decorator, 'func') and decorator.func.id == 'ssh':
                        logging.debug("REMOVE SSH DECORATOR:")
                        ssh_decorator = decorator

                if ssh_decorator:
                    decorators.remove(ssh_decorator)
        '''
        for funcdef in funcdefs:
            __funcname__ = funcdef.name
            if __funcname__ == func.__name__:
                try:
                    funcdef.decorator_list.clear()
                    pass
                except:
                    import traceback
                    logging.error(traceback.format_exc())
        '''
        logging.debug("UNparsing SOURCE")
        _source = astunparse.unparse(_ast)
    logging.debug("Attempting to write SOURCE")
    with open('{}.py'.format(sourceuuid), 'w') as appsource:
        appsource.write(_source)
    logging.debug("Wrote SOURCE")

    appuuid = "sshapp"+hashlib.md5(uuid4().bytes).hexdigest()

@@ -289,14 +284,16 @@ def ssh_function(remotefunc, username, hostname, sshkey, appuuid, sourceuuid, *a
    ssh_p.userfunc = f_func.func
    frame = sys._getframe(1)
    if 'dataflow' in frame.f_locals:
        logging.debug("DATAFLOW detected!")
        result = ssh_p()
    else:
        logging.debug("DATAFLOW NOT detected!")
        result = ProcessMonitor(ssh_p, timeout=None,
                                wait=None,
                                cache=False,
                                shared_memory=False,
                                sleep=0)

    if callable(result):
        _result = result()
    else:
Binary file added images/ui2.png
2 changes: 1 addition & 1 deletion setup.py
@@ -81,7 +81,7 @@ def run(self):

class CleanCommand(Command):
    """Custom clean command to tidy up the project root."""
    CLEAN_FILES = './build ./dist ./__pycache__ **/*/__pycache__ ./*.pyc ./ssh*py ./*.tgz ./entangle.log ./.pytest_cache ./*.egg-info'.split(' ')
    CLEAN_FILES = './*.out ./*.log ./build ./dist ./__pycache__ **/*/__pycache__ ./*.pyc ./ssh*py ./*.tgz ./.pytest_cache ./*.egg-info'.split(' ')

    user_options = []

