Skip to content

Commit

Permalink
Merge pull request #39 from radiantone/0.2.2
Browse files Browse the repository at this point in the history
0.2.2
  • Loading branch information
radiantone authored Jun 17, 2021
2 parents bcf4352 + de7eb1f commit 519a74d
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 25 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
*This version: 0.2.1*
*This version: 0.2.2*

![logo](./images/logo.png)

*Current development version is here: [0.2.2](https://github.com/radiantone/entangle/tree/0.2.2)*
*Current development version is here: [0.2.3](https://github.com/radiantone/entangle/tree/0.2.3)*

A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both *control flow* and *dataflow* execution paradigms as well as de-centralized CPU & GPU scheduling.

Expand Down Expand Up @@ -789,7 +789,6 @@ from entangle.process import process
from timeit import default_timer as timer
from numba import vectorize


@process
def dovectors1():

Expand All @@ -807,7 +806,6 @@ def dovectors1():
duration = timer() - start
return duration


@process
def dovectors2():

Expand All @@ -825,15 +823,13 @@ def dovectors2():
duration = timer() - start
return duration


@process
def durations(*args):

times = [arg for arg in args]

return times


dp = durations(
dovectors1(),
dovectors2()
Expand Down
2 changes: 1 addition & 1 deletion entangle/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

__title__ = 'py-entangle'
__description__ = 'A python native parallel processing framework based on simple decorators.'
__version__ = '0.2.1'
__version__ = '0.2.2'
__author__ = 'Darren Govoni'
__author_email__ = 'darren@ontrenet.com'
__license__ = 'MIT'
Expand Down
3 changes: 3 additions & 0 deletions entangle/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
aws.py - Module that provides aws integration behavior
"""
3 changes: 3 additions & 0 deletions entangle/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
batch.py - Module that provides data batch processing behavior
"""
2 changes: 1 addition & 1 deletion entangle/containers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
docker.py - Module that provides docker support decorators for running tasks inside containers
containers.py - Module that provides support decorators for running tasks inside containers
"""
import logging
import inspect
Expand Down
2 changes: 1 addition & 1 deletion entangle/dataflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
workflow.py - Module that provides workflow decorator
dataflow.py - Module that provides dataflow execution semantics
"""
import logging
import inspect
Expand Down
2 changes: 1 addition & 1 deletion entangle/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
request.py - Module that provides http request tasks
http.py - Module that provides http oriented decorators
"""
from functools import partial
import requests
Expand Down
37 changes: 24 additions & 13 deletions entangle/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ def __call__(self, *args, **kwargs) -> Callable:
:param kwargs:
:return:
"""

logging.info("Process:invoke: %s", self.func.__name__)
_func = self.func

Expand Down Expand Up @@ -224,7 +223,7 @@ def assign_cpu(func, cpu, **kwargs):
datetime.timedelta(seconds=end-start))
logging.debug(
"assign_cpu: DURATION: %s", duration)

if 'scheduler' in kwargs:
logging.debug(
"assign_cpu: Putting CPU %s on queue", cpu_mask)
Expand Down Expand Up @@ -459,11 +458,14 @@ def error_wrapper(arg, **kwargs):
return

try:
exceptions = [_arg['result'] for _arg in _args if not isinstance(_arg,Exception)]
args = [_arg['result'] for _arg in _args if _arg]
arg_graph = [_arg['graph'] for _arg in _args if _arg]
exceptions = [
_arg['result'] for _arg in _args if _arg and not isinstance(_arg, Exception)]
args = [_arg['result']
for _arg in _args if _arg and not isinstance(_arg, Exception)]
arg_graph = [
_arg['graph'] for _arg in _args if _arg and not isinstance(_arg, Exception)]
json_graphs = [_arg['json']
for _arg in _args if _arg and 'json' in _arg]
for _arg in _args if _arg and not isinstance(_arg, Exception) and 'json' in _arg]
except:
import traceback
logging.debug("_ARGS: %s", _args)
Expand Down Expand Up @@ -513,13 +515,13 @@ def add_to_graph(gr, argr):

# This no longer needed because processes put their own cpu's back on the queue
# when they have complete, individually

if scheduler:
for _process in processes:
logging.debug(
"Putting CPU1: %s back on scheduler queue.", _process.cookie)
scheduler.put(('0', _process.cookie, 'Y'))

if cpu:
pid = os.getpid()
cpu_mask = [int(cpu)]
Expand All @@ -535,6 +537,8 @@ def add_to_graph(gr, argr):
event = kwargs['event']
del kwargs['event']

retries = 0

if 'queue' in kwargs:
queue = kwargs['queue']
# get the queue and delete the argument
Expand All @@ -561,7 +565,6 @@ def add_to_graph(gr, argr):
try:
if self.execute:
logging.debug("process: execute: %s", self.execute)

if is_proc:

logging.debug(
Expand Down Expand Up @@ -607,7 +610,6 @@ def func_wrapper(_wf, _wq):
target=func_wrapper, args=(_pfunc, _mq, ))
proc.start()
else:
ex_msg = None
if retry and retry > 0:
for i in range(retry):
logging.debug("RETRY: {}".format(i))
Expand All @@ -624,6 +626,9 @@ def func_wrapper(_wf, _wq):

if i == retry-1:
event.set()
retries = i
logging.debug(
"max retries reached %s", retries)
error_messages += [
"maximum retries reached {}".format(retry)]
else:
Expand Down Expand Up @@ -693,11 +698,17 @@ def func_wrapper(_wf, _wq):

_mq = Queue()

def func_wrapper(_wf, _wq):
def func_wrapper(_wf, _wq, retries):
import time
import datetime
import traceback

logging.debug("TRACE: %s", traceback.format_exc())
start = time.time()
logging.debug("func_wrapper: %s", _wf)
logging.debug("func_wrapper: retries %s", retries)
if retries > 0:
raise Exception("Maximum retries reached %s", retries)
result = _wf()
logging.debug("func_wrapper: result: %s", result)
end = time.time()
Expand All @@ -710,7 +721,7 @@ def func_wrapper(_wf, _wq):
if callable(result):
logging.debug(
"func_wrapper: return result of %s", result)
return func_wrapper(result, _wq)
return func_wrapper(result, _wq, retries)

logging.debug("func_wrapper: putting result on queue")

Expand All @@ -737,7 +748,7 @@ def func_wrapper(_wf, _wq):
logging.debug("process: execute2: %s", self.execute)
start = time.time()
proc = multiprocessing.Process(
target=func_wrapper, args=(pfunc, _mq,))
target=func_wrapper, args=(pfunc, _mq, retries))
proc.start()

logging.debug(
Expand Down
2 changes: 1 addition & 1 deletion entangle/thread.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
thread.py - Module that provides native OS thread implementation of function tasks with support for shared memory
thread.py - Module that provides python thread implementation of function tasks with support for shared memory
"""
import asyncio
import logging
Expand Down
3 changes: 2 additions & 1 deletion entangle/workflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""
workflow.py - Module that provides workflow decorator
workflow.py - Module that provides the workflow decorator. Right now this is simply a no-op decorator
but will provide some additional behavior for workflows eventually (e.g. metadata, metrics, QoS, etc.)
"""
import logging

Expand Down

0 comments on commit 519a74d

Please sign in to comment.