diff --git a/stormpiper/requirements_unpinned.txt b/stormpiper/requirements_unpinned.txt index a397b66d..6e904e64 100644 --- a/stormpiper/requirements_unpinned.txt +++ b/stormpiper/requirements_unpinned.txt @@ -40,9 +40,9 @@ alembic celery[redis] -fastapi +fastapi<0.100.0 starlette -pydantic +pydantic<2.0.0 httpx orjson uvicorn diff --git a/stormpiper/stormpiper/__init__.py b/stormpiper/stormpiper/__init__.py index 183401ef..e1aca0ba 100644 --- a/stormpiper/stormpiper/__init__.py +++ b/stormpiper/stormpiper/__init__.py @@ -4,7 +4,7 @@ All rights reserved. """ -__version__ = "1.0.1" +__version__ = "1.0.2" __author__ = "Austin Orr" __email__ = "aorr@geosyntec.com" diff --git a/stormpiper/stormpiper/bg_worker.py b/stormpiper/stormpiper/bg_worker.py index d9691aa4..294954ab 100644 --- a/stormpiper/stormpiper/bg_worker.py +++ b/stormpiper/stormpiper/bg_worker.py @@ -247,14 +247,13 @@ class Workflows: can_raise=True, msg="update results" ) - # refresh_all = group( # chains of chords cannot propagate errors in celery - # chain( - # _group1, - # _group2, - # _group3, - # # check_results.s(msg="refresh all tables"), - # ) - # ) | check_results.s(msg="finalize task") + refresh_all = group( # chains of chords cannot propagate errors in celery + chain( + _group1, + _group2, + _group3, + ) + ) | check_results.s(msg="finalize task") # test_refresh = group( # these run out of order for some reason. # chain( @@ -267,16 +266,7 @@ class Workflows: @celery_app.task(base=Singleton, acks_late=True, track_started=True) def run_refresh_task(): - # calling 'get' risks deadlocks in the backend. But since Celery won't propagate - # errors or pass signatures from chords, it appears to be impossible to work - # around this for a series workflow chains of chords rather than chains of tasks. - _ = Workflows._group1().get(disable_sync_subtasks=False, timeout=360) - - _ = Workflows._group2().get(disable_sync_subtasks=False, timeout=1200) - - _ = Workflows._group3().get(disable_sync_subtasks=False, timeout=360) - - return True + return Workflows.refresh_all() @celery_app.task(base=Singleton, acks_late=True, track_started=True)