Skip to content

Commit

Permalink
expanded workflow status/prune deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed May 13, 2021
1 parent 993c1c7 commit 418e2f0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
28 changes: 24 additions & 4 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class DataStoreMgr:
INIT_DATA_WAIT_TIME = 5. # seconds
INIT_DATA_RETRY_DELAY = 0.5 # seconds
RECONCILE_TIMEOUT = 5. # seconds
PENDING_DELTA_CHECK_INTERVAL = 0.5

def __init__(self, workflows_mgr):
self.workflows_mgr = workflows_mgr
Expand All @@ -70,11 +71,13 @@ def __init__(self, workflows_mgr):
self.executors = {}
self.delta_queues = {}

def update_contact(self, w_id, contact_data=None):
def update_contact(
self, w_id, contact_data=None, status=None, pruned=False):
delta = DELTAS_MAP[ALL_DELTAS]()
delta.workflow.time = time.time()
flow = delta.workflow.updated
flow.id = w_id
flow.stamp = f'{w_id}@{delta.workflow.time}'
if contact_data:
# update with contact file data
flow.name = contact_data['name']
Expand All @@ -90,7 +93,12 @@ def update_contact(self, w_id, contact_data=None):
flow.port = 0
# flow.pub_port = 0
flow.api_version = 0
flow.status = WorkflowStatus.STOPPED.value

if status is not None:
flow.status = status
if pruned:
flow.pruned = True
delta.workflow.pruned = w_id

# Apply to existing workflow data
if 'delta_times' not in self.data[w_id]:
Expand Down Expand Up @@ -141,11 +149,23 @@ async def register_workflow(self, w_id):
self.data[w_id] = data

# create new entry in the delta store
self.update_contact(w_id)
self.update_contact(w_id, status=WorkflowStatus.INSTALLED.value)

async def unregister_workflow(self, w_id):
logger.debug(f'unregister_workflow({w_id})')
self.update_contact(w_id, pruned=True)
if w_id in self.delta_queues:
while any(
not delta_queue.empty()
for delta_queue in self.delta_queues[w_id].values()
):
await asyncio.sleep(self.PENDING_DELTA_CHECK_INTERVAL)
self.purge_workflow(w_id)

def stop_workflow(self, w_id):
logger.debug(f'stop_workflow({w_id})')
self.update_contact(w_id)
self.purge_workflow(w_id, data=False)
self.update_contact(w_id, status=WorkflowStatus.STOPPED.value)

def purge_workflow(self, w_id, data=True):
"""Purge the manager of a workflow's subscription and data."""
Expand Down
8 changes: 5 additions & 3 deletions cylc/uiserver/tests/test_workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,17 +376,19 @@ async def test_unregister(
workflow from the data store attributes, and call the necessary
functions."""
workflow_name = 'unregister-me'
workflow_id = f'{getuser()}{ID_DELIM}{workflow_name}'
await uiserver.workflows_mgr._register(workflow_id, None)

uiserver.workflows_mgr._scan_pipe = empty_aiter()
uiserver.workflows_mgr.inactive.add(workflow_name)
uiserver.workflows_mgr.inactive.add(workflow_id)
# NOTE: here we will yield a workflow that is not running, it does
# not have the contact data and is inactive.
# This is what forces the .update() to call unregister()!

await uiserver.workflows_mgr.update()

# now the workflow is not active, nor inactive, it is unregistered
assert workflow_name not in uiserver.workflows_mgr.inactive
assert workflow_id not in uiserver.workflows_mgr.inactive


@pytest.mark.asyncio
Expand All @@ -400,7 +402,7 @@ async def test_connect(
If a workflow is running, but in the inactive state,
then the connect method will be called."""
workflow_name = 'connect'
workflow_id = f'{getuser()}|{workflow_name}'
workflow_id = f'{getuser()}{ID_DELIM}{workflow_name}'
uiserver.workflows_mgr.inactive.add(workflow_id)

assert workflow_id not in uiserver.workflows_mgr.active
Expand Down
6 changes: 3 additions & 3 deletions cylc/uiserver/workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,13 @@ async def _disconnect(self, wid):

async def _unregister(self, wid):
"""Unregister a workflow from the data store."""
self.uiserver.data_store_mgr.purge_workflow(wid)
await self.uiserver.data_store_mgr.unregister_workflow(wid)

async def _stop(self, wid):
"""Mark a workflow as stopped.
The workflow can't do this itself, because it's not running.
"""
self.uiserver.data_store_mgr.purge_workflow(wid, data=False)
self.uiserver.data_store_mgr.stop_workflow(wid)

async def update(self):
Expand Down Expand Up @@ -266,7 +265,8 @@ async def update(self):
if before == 'active':
self.active.pop(wid)
elif before == 'inactive':
self.inactive.remove(wid)
if wid in self.inactive:
self.inactive.remove(wid)
if after == 'active':
self.active[wid] = flow
elif after == 'inactive':
Expand Down

0 comments on commit 418e2f0

Please sign in to comment.