Mux prediction events #1405
Merged
technillogue force-pushed the syl/mux branch 4 times, most recently from b835dbe to 41f93a1 (November 29, 2023 23:33)
technillogue force-pushed the syl/mux branch 6 times, most recently from 51a15b2 to a4dba69 (December 2, 2023 20:56)
yorickvP reviewed Dec 4, 2023
technillogue force-pushed the syl/mux branch 7 times, most recently from 6a4f27f to 688b152 (December 5, 2023 08:45)
outstanding questions:
technillogue force-pushed the syl/mux branch 2 times, most recently from 59ee830 to 976fba8 (December 6, 2023 09:26)
nickstenning reviewed Dec 6, 2023
nickstenning reviewed Dec 6, 2023
nickstenning reviewed Dec 6, 2023
yorickvP reviewed Dec 6, 2023
python/cog/server/worker.py
Outdated
    trace("recv", event)
except asyncio.CancelledError:
    return
if id == "LOG" and "SETUP" in self._mux.outs:
maybe check self._state here instead? then we can get rid of _mux.outs
technillogue force-pushed the syl/mux branch 2 times, most recently from 9073dc6 to 294d603 (December 7, 2023 23:40)
…logs Signed-off-by: technillogue <technillogue@gmail.com>
…ad event loop Signed-off-by: technillogue <technillogue@gmail.com>
…er have capacity Signed-off-by: technillogue <technillogue@gmail.com>
previously this was in _read_events because it's a coroutine that will have the correct event loop. however, _read_events actually gets created in a task, which can run *after* the first mux.read call by setup. since setup is now the first async entrypoint in worker and in tests, we can safely move it there Signed-off-by: technillogue <technillogue@gmail.com>
…aphore Signed-off-by: technillogue <technillogue@gmail.com>
technillogue added a commit that referenced this pull request Feb 13, 2024

* race utility for racing awaitables
* start mux, tag events with id, read pipe in a task, get events from mux
* use async pipe for async child loop
* _shutting_down vs _terminating
* race with shutdown event
* keep reading events during shutdown, but call terminate after the last Done
* emit heartbeats from mux.read
* don't use _wait. instead, setup reads event from the mux too
* worker semaphore and prediction ctx
* where _wait used to raise a fatal error, have _read_events set an error on Mux, and then Mux.read can raise the error in the right context. otherwise, the exception is stuck in a task and doesn't propagate correctly
* fix event loop errors for <3.9
* keep track of predictions in flight explicitly and use that to route logs
* don't wait for executor shutdown
* progress: check for cancelation in task done_handler
* let mux check if child is alive and set mux shutdown after leaving read event loop
* close pipe when exiting
* predict requires IDLE or PROCESSING
* try adding a BUSY state distinct from PROCESSING when we no longer have capacity
* move resetting events to setup() instead of _read_events()
  previously this was in _read_events because it's a coroutine that will have the correct event loop. however, _read_events actually gets created in a task, which can run *after* the first mux.read call by setup. since setup is now the first async entrypoint in worker and in tests, we can safely move it there
* state_from_predictions_in_flight instead of checking the value of semaphore
* make prediction_ctx "private"

Signed-off-by: technillogue <technillogue@gmail.com>
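The "race utility for racing awaitables" and "race with shutdown event" items could be sketched roughly as follows. This is an illustration built on `asyncio.wait`, not the helper from this PR: the signature of `race`, the `SHUTDOWN` sentinel, and the `next_event` and `demo` helpers are all assumptions made for the example.

```python
import asyncio
from typing import Any, Awaitable

# Hypothetical sentinel returned when the shutdown event wins the race.
SHUTDOWN = object()


async def race(*aws: Awaitable[Any]) -> Any:
    """Return the result of the first awaitable to finish and cancel the rest."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # If several finished in the same loop iteration, prefer argument order.
        winner = next(task for task in tasks if task in done)
        return winner.result()  # re-raises if the winner failed
    finally:
        # Also runs if race() itself is cancelled, so no task is left behind.
        for task in tasks:
            if not task.done():
                task.cancel()


async def _wait_shutdown(shutdown: asyncio.Event) -> Any:
    await shutdown.wait()
    return SHUTDOWN


async def next_event(queue: "asyncio.Queue[Any]", shutdown: asyncio.Event) -> Any:
    """Return the next queued event, or SHUTDOWN if shutdown is set first."""
    return await race(queue.get(), _wait_shutdown(shutdown))


async def demo() -> tuple:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    shutdown = asyncio.Event()
    await queue.put("event-1")
    first = await next_event(queue, shutdown)   # the queue wins
    shutdown.set()
    second = await next_event(queue, shutdown)  # the queue is empty, shutdown wins
    return first, second is SHUTDOWN


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Listing the queue read first means an event that is already available is delivered even when shutdown has been requested, which fits the "keep reading events during shutdown" item.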
technillogue added commits that referenced this pull request on Feb 13, 2024 (three more) and Feb 21, 2024 (three), each with the same commit message as above
This was referenced May 17, 2024
Merged
technillogue added a commit that referenced this pull request Jun 19, 2024 (same commit message as above)
Closed
technillogue added commits that referenced this pull request on Jun 19, 2024 and Jul 18, 2024 (same commit message as above)
A critical part of concurrent predictions is multiplexing several prediction outputs over the same pipe. This takes a stab at that. Once this is done, we might be able to drop some parts of runner.
We tag each _PublicEventType with a prediction id, introduce a Mux, and have a _read_events task responsible for reading events from the pipe and writing them to the mux. The mux adds each event to the right queue, and the places that previously called _wait now call Mux.read instead.
We also add a semaphore and keep track of predictions in flight. READY is renamed to IDLE, but that may need to be reworked further.
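A minimal sketch of the routing described above, assuming one `asyncio.Queue` per prediction id. It is not the code from this PR: `Log`, `Done`, `read_events`, `collect`, and `demo` are hypothetical stand-ins, and the real pipe is replaced by an in-memory list of `(id, event)` pairs.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, AsyncIterator, DefaultDict, Iterable, List, Tuple


@dataclass
class Log:  # hypothetical stand-in for a log event
    message: str


@dataclass
class Done:  # hypothetical stand-in for the terminal event of a prediction
    pass


class Mux:
    """Routes events tagged with a prediction id to one queue per id."""

    def __init__(self) -> None:
        # Queues are created lazily, on the first write or read for an id.
        self.outs: DefaultDict[str, "asyncio.Queue[Any]"] = defaultdict(asyncio.Queue)

    async def write(self, id: str, event: Any) -> None:
        await self.outs[id].put(event)

    async def read(self, id: str) -> AsyncIterator[Any]:
        queue = self.outs[id]
        try:
            while True:
                event = await queue.get()
                yield event
                if isinstance(event, Done):
                    return
        finally:
            # Drop the queue once this prediction's stream has ended.
            self.outs.pop(id, None)


async def read_events(source: Iterable[Tuple[str, Any]], mux: Mux) -> None:
    """Stand-in for the task that reads the pipe and writes to the mux."""
    for id, event in source:
        await mux.write(id, event)
        await asyncio.sleep(0)  # yield so readers can interleave


async def collect(mux: Mux, id: str) -> List[Any]:
    return [event async for event in mux.read(id)]


async def demo() -> Tuple[List[Any], List[Any]]:
    mux = Mux()
    source = [
        ("a", Log("a1")), ("b", Log("b1")), ("a", Done()),
        ("b", Log("b2")), ("b", Done()),
    ]
    reader = asyncio.create_task(read_events(source, mux))
    a, b = await asyncio.gather(collect(mux, "a"), collect(mux, "b"))
    await reader
    return a, b


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each reader sees only its own prediction's events, in the order they were written, even though both predictions share one source. Error propagation, heartbeats, and shutdown handling are left out of this sketch.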
Some challenges
[x] mux events
[x] doesn't deadlock
[x] hypothesis tests mostly pass
[ ] serious pipe implementation (future PR?)
[ ] cancellation
[x] READY / PROCESSING semaphore
[~] route predict logs to prediction if only one prediction is running
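The last two checklist items could look roughly like the sketch below. The name `state_from_predictions_in_flight` and the IDLE, PROCESSING, and BUSY states appear in this PR's commit messages. Everything else is an assumption for illustration: the function signature, the `max_concurrency` parameter, the `WorkerState` enum, and the `route_log` helper.

```python
import enum
from typing import Optional, Set


class WorkerState(enum.Enum):
    # Assumed subset of worker states; only these three matter for this sketch.
    IDLE = "idle"              # no predictions in flight
    PROCESSING = "processing"  # some predictions in flight, capacity left
    BUSY = "busy"              # at capacity


def state_from_predictions_in_flight(
    in_flight: Set[str], max_concurrency: int
) -> WorkerState:
    """Derive the state from the tracked ids rather than a semaphore's value."""
    if not in_flight:
        return WorkerState.IDLE
    if len(in_flight) < max_concurrency:
        return WorkerState.PROCESSING
    return WorkerState.BUSY


def route_log(in_flight: Set[str]) -> Optional[str]:
    """Return the prediction id an untagged log line belongs to, if unambiguous."""
    if len(in_flight) == 1:
        return next(iter(in_flight))
    return None  # zero or several predictions: the caller must decide


if __name__ == "__main__":
    print(state_from_predictions_in_flight({"a"}, max_concurrency=2))
    print(route_log({"a"}))
```

Tracking the ids explicitly gives both answers from one set: the worker state and the destination for logs when exactly one prediction is running.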