Skip to content

Commit

Permalink
Issue 60: leaving state vector empty (#133)
Browse files Browse the repository at this point in the history
* handle invalid data exceptions

dawgie supports two types of exceptions to be thrown: dawgie.NoValidInputDataError and dawgie.NoValidOutputDataError. In these two cases, the job should be considered a success but not move the target forward in the processing tree. This creates a try state condition which will be True (full success), None (invalid data exception), False (all other exceptions).

* handle the tri-state

Add an invalid state to the messaging that is passed about. Updated dawgie.pl.farm to translate the tri-state message from teh workers to the dawgie.pl.jobinfo.State.{failure,invalid,success} as required.

* connect the error step

Added dawgie.pl.schedule.purge() with empty implementation. Walking down the tree and removing targets is going to take some work. Just put the empty implementation to connect the dawgie.pl.farm.

Connect dawgie.pl.farm._res() to dawgie.pl.schedule.purge(). The current behavior is the same.

* purge target from possible work

Simply walk down the children nodes and remove target from do, doing, and todo. While this straight forward, it is not perfect. If another job is in progress that does create data for this target, they will once again cause the target show up in the faulting nodes children if the tree is such. Still, it is the best and the pipeline is robust enough to continue processing depite the error.

* verify purge

Actually test that the purge works as expected. Created a simplified tree and opped one of the tarets as expected. Note that this example also shows that if node a then trips after b as the test inidicates, d will fire off then fail preventing f from being triggered.

* fix problems found in testing

The dawgie.pl.farm.Hand._res() implementation was less than perfect. First, did a small amount of refactoring to make the function static as it really is. Then, only computes some values once rather than serveral times in the function. Most important, the test of which branch to take required updating to verifiy against dawgie.pl.jobinfo.State.success rather than just that is positive looking.

* verify purge via message return

Given the response is normally passed from the worker back, the handler of these responses - dawgie.pl.farm.Hand._res() - has to correctly call purge. Added a test to show that is is actually happening as expected.

Co-authored-by: Al Niessner <Al.Niessner@xxx.xxx>
  • Loading branch information
al-niessner and Al Niessner authored Apr 13, 2020
1 parent 186d6c7 commit 77be1db
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 25 deletions.
56 changes: 31 additions & 25 deletions Python/dawgie/pl/farm.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@
archive = False

class Hand(twisted.internet.protocol.Protocol):
@staticmethod
def _res (msg):
done = (msg.jobid + '[' +
(msg.incarnation if msg.incarnation else '__all__') + ']')
while 0 < _busy.count (done):
_busy.remove (done)
if done in _time: del _time[done]
pass
print ('msg:', msg)
print ('suc:', Hand._translate (msg.success))
try:
job = dawgie.pl.schedule.find (msg.jobid)
inc = msg.incarnation if msg.incarnation else '__all__'
state = Hand._translate (msg.success)
dawgie.pl.schedule.complete (job, msg.runid, inc, msg.timing, state)

if state == dawgie.pl.schedule.State.success:
dawgie.pl.farm.archive |= any(msg.values)
dawgie.pl.schedule.update (msg.values, job, msg.runid)
else: dawgie.pl.schedule.purge (job, inc)

except IndexError: log.error('Could not find job with ID: ' + msg.jobid)
return

@staticmethod
def _translate (state):
if state is None: return dawgie.pl.schedule.State.invalid
if state: return dawgie.pl.schedule.State.success
return dawgie.pl.schedule.State.failure

def __init__ (self, address):
twisted.internet.protocol.Protocol.__init__(self)
self._abort = dawgie.pl.message.make(typ=dawgie.pl.message.Type.response, suc=False)
Expand All @@ -69,7 +99,7 @@ def __init__ (self, address):

def _process (self, msg):
if msg.type == dawgie.pl.message.Type.register: self._reg(msg)
elif msg.type == dawgie.pl.message.Type.response: self._res(msg)
elif msg.type == dawgie.pl.message.Type.response: Hand._res(msg)
elif msg.type == dawgie.pl.message.Type.status:
if msg.revision != dawgie.context.git_rev or \
not dawgie.context.fsm.is_pipeline_active():
Expand Down Expand Up @@ -97,30 +127,6 @@ def _reg (self, msg):
pass
return

def _res (self, msg):
# pylint: disable=no-self-use
done = (msg.jobid + '[' +
(msg.incarnation if msg.incarnation else '__all__') + ']')
while 0 < _busy.count (done):
_busy.remove (done)
if done in _time: del _time[done]
pass
try:
job = dawgie.pl.schedule.find (msg.jobid)
dawgie.pl.schedule.complete (job,
msg.runid,
msg.incarnation if msg.incarnation else '__all__',
msg.timing,
(dawgie.pl.schedule.State.success
if msg.success else dawgie.pl.schedule.State.failure))

if msg.success:
dawgie.pl.farm.archive |= any(msg.values)
dawgie.pl.schedule.update (msg.values, job, msg.runid)
pass
except IndexError: log.error('Could not find job with ID: ' + msg.jobid)
return

# pylint: disable=signature-differs
def connectionLost (self, reason):
while 0 < _workers.count (self): _workers.remove (self)
Expand Down
1 change: 1 addition & 0 deletions Python/dawgie/pl/jobinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class State(enum.Enum):
delayed = 0 # delayed until a certain time (periodic task)
failure = 1 # task ran but it had an exception causing the pipeline to fail
initial = 5 # initial value
invalid = 6 # worker threw an NoValid{Input,Output}DataError
running = 2 # is currently running somewhere, somehow
success = 3 # task ran and was successful
waiting = 4 # waiting until there are no active parents
Expand Down
8 changes: 8 additions & 0 deletions Python/dawgie/pl/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@ def periodics (factories):
defer()
return

def purge (node:dawgie.pl.dag.Node, target:str):
if target in node.get ('do',[]): node.get ('do').remove (target)
if target in node.get ('doing',[]): node.get ('doing').remove (target)
if target in node.get ('todo',[]): node.get ('todo').remove (target)

for child in node: purge (child, target)
return

def state_tree_view(): return dawgie.pl.schedule.ae.svv
def task_tree_view(): return dawgie.pl.schedule.ae.tv

Expand Down
9 changes: 9 additions & 0 deletions Python/dawgie/pl/worker/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ def exchange (message): # AWS lambda function
return response

def execute (address:(str,int), inc:int, ps_hint:int, rev:str):
# pylint: disable=too-many-statements
log = None
iid,myid,job = sqs_pop()
try:
Expand Down Expand Up @@ -385,6 +386,14 @@ def execute (address:(str,int), inc:int, ps_hint:int, rev:str):
suc=True,
tim=job.timing,
val=nv)
except (dawgie.NoValidInputDataError,dawgie.NoValidOutputDataError):
logging.getLogger(__name__).exception ('Job "%s" had invalid data for run id %s and target "%s"', str(m.jobid), str(m.runid), str(m.target))
m = dawgie.pl.message.make (typ=dawgie.pl.message.Type.response,
inc=m.target,
jid=m.jobid,
rid=m.runid,
suc=None,
tim=m.timing)
except:
logging.getLogger(__name__).exception ('Job "%s" failed to execute successfully for run id %s and target "%s"', str (job.jobid), str (job.runid), str (job.target))
m = dawgie.pl.message.make (typ=dawgie.pl.message.Type.response,
Expand Down
8 changes: 8 additions & 0 deletions Python/dawgie/pl/worker/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ def execute (address:(str,int), inc:int, ps_hint:int, rev:str):
suc=True,
tim=m.timing,
val=nv)
except (dawgie.NoValidInputDataError, dawgie.NoValidOutputDataError):
logging.getLogger(__name__).exception ('Job "%s" had invalid data for run id %s and target "%s"', str(m.jobid), str(m.runid), str(m.target))
m = dawgie.pl.message.make (typ=dawgie.pl.message.Type.response,
inc=m.target,
jid=m.jobid,
rid=m.runid,
suc=None,
tim=m.timing)
except:
logging.getLogger(__name__).exception ('Job "%s" failed to execute successfully for run id %s and target "%s"', str (m.jobid), str (m.runid), str (m.target))
m = dawgie.pl.message.make (typ=dawgie.pl.message.Type.response,
Expand Down
30 changes: 30 additions & 0 deletions Test/test_06.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import datetime
import dawgie.context
import dawgie.pl.dag
import dawgie.pl.jobinfo
import dawgie.pl.schedule
import os
Expand Down Expand Up @@ -206,6 +207,35 @@ def test_organize(self):
dawgie.pl.schedule.que.clear()
return

def test_purge(self):
a = dawgie.pl.dag.Node('a')
b = dawgie.pl.dag.Node('b')
c = dawgie.pl.dag.Node('c')
d = dawgie.pl.dag.Node('d')
e = dawgie.pl.dag.Node('e')
f = dawgie.pl.dag.Node('f')
for n in [a, b, c, d, e, f]:
for l in ['do', 'doing', 'todo']:
n.set (l, [])
for t in ['A', 'B', 'C']: n.get (l).append (t)
pass
pass
a.add (c)
a.add (d)
b.add (d)
b.add (e)
d.add (f)
dawgie.pl.schedule.purge (b, 'B')
for l in ['do', 'doing', 'todo']:
self.assertEqual (['A', 'B', 'C'], a.get (l))
self.assertEqual (['A', 'C'], b.get (l))
self.assertEqual (['A', 'B', 'C'], c.get (l))
self.assertEqual (['A', 'C'], d.get (l))
self.assertEqual (['A', 'C'], e.get (l))
self.assertEqual (['A', 'C'], f.get (l))
pass
return

def test_tasks(self):
tasks = dawgie.pl.schedule.tasks()
self.assertEqual (12, len (tasks))
Expand Down
36 changes: 36 additions & 0 deletions Test/test_10.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import dawgie.pl.dag
import dawgie.pl.farm
import dawgie.pl.message
import dawgie.pl.schedule
import unittest

class Farm(unittest.TestCase):
Expand All @@ -62,6 +63,41 @@ def test_hand__process(self):
self.assertEqual (['WARNING:dawgie.pl.farm:Worker and pipeline revisions are not the same. Sever version 123 and worker version 321.'], logbook.output)
return

def test_hand__res(self):
a = dawgie.pl.dag.Node('a')
b = dawgie.pl.dag.Node('b')
c = dawgie.pl.dag.Node('c')
d = dawgie.pl.dag.Node('d')
e = dawgie.pl.dag.Node('e')
f = dawgie.pl.dag.Node('f')
for n in [a, b, c, d, e, f]:
for l in ['do', 'doing', 'todo']:
n.set (l, [])
for t in ['A', 'B', 'C']: n.get (l).append (t)
pass
pass
a.add (c)
a.add (d)
b.add (d)
b.add (e)
d.add (f)
dawgie.pl.schedule.que.extend ([a,b,c,d,e,f])
dawgie.pl.farm.Hand._res (dawgie.pl.message.make(inc='B',
jid='b',
rid=42,
suc=None,
tim={},
val=[]))
for l in ['do', 'doing', 'todo']:
self.assertEqual (['A', 'B', 'C'], a.get (l))
self.assertEqual (['A', 'C'], b.get (l))
self.assertEqual (['A', 'B', 'C'], c.get (l))
self.assertEqual (['A', 'C'], d.get (l))
self.assertEqual (['A', 'C'], e.get (l))
self.assertEqual (['A', 'C'], f.get (l))
pass
return

def test_rerunid (self):
n = dawgie.pl.dag.Node('a')
r = dawgie.pl.farm.rerunid (n)
Expand Down

0 comments on commit 77be1db

Please sign in to comment.