Skip to content

Commit

Permalink
WorkerPool: Wait for previous task in _try_send_to_primary_thread
Browse files Browse the repository at this point in the history
In order to prevent tasks from running in a non-main thread,
wait for the previous task inside _try_send_to_primary_thread,
then schedule the next task. Add a main_thread_only execmodel
to distinguish this new behavior from the existing thread
execmodel, since users of the thread execmodel expect that
tasks can run in multiple threads concurrently.

Closes: pytest-dev#96
  • Loading branch information
zmedico committed Feb 16, 2024
1 parent 372168e commit ec5a2f7
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 15 deletions.
8 changes: 4 additions & 4 deletions doc/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()``
yourself and specify a larger or not timeout.


threading models: gevent, eventlet, thread
threading models: gevent, eventlet, thread, main_thread_only
===========================================

.. versionadded:: 1.2 (status: experimental!)

execnet supports "thread", "eventlet" and "gevent" as thread models
on each of the two sides. You need to decide which model to use
before you create any gateways::
execnet supports "main_thread_only", "thread", "eventlet" and "gevent"
as thread models on each of the two sides. You need to decide which
model to use before you create any gateways::

# content of threadmodel.py
import execnet
Expand Down
40 changes: 34 additions & 6 deletions src/execnet/gateway_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def Event(self):
def get_execmodel(backend):
if hasattr(backend, "backend"):
return backend
if backend == "thread":
if backend in ("thread", "main_thread_only"):
return ThreadExecModel()
elif backend == "eventlet":
return EventletExecModel()
Expand Down Expand Up @@ -322,7 +322,7 @@ def __init__(self, execmodel, hasprimary=False):
self._shuttingdown = False
self._waitall_events = []
if hasprimary:
if self.execmodel.backend != "thread":
if self.execmodel.backend not in ("thread", "main_thread_only"):
raise ValueError("hasprimary=True requires thread model")
self._primary_thread_task_ready = self.execmodel.Event()
else:
Expand All @@ -332,7 +332,7 @@ def integrate_as_primary_thread(self):
"""integrate the thread with which we are called as a primary
thread for executing functions triggered with spawn().
"""
assert self.execmodel.backend == "thread", self.execmodel
assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel
primary_thread_task_ready = self._primary_thread_task_ready
# interacts with code at REF1
while 1:
Expand All @@ -345,7 +345,11 @@ def integrate_as_primary_thread(self):
with self._running_lock:
if self._shuttingdown:
break
primary_thread_task_ready.clear()
# Only clear if _try_send_to_primary_thread has not
# yet set the next self._primary_thread_task reply
# after waiting for this one to complete.
if reply is self._primary_thread_task:
primary_thread_task_ready.clear()

def trigger_shutdown(self):
with self._running_lock:
Expand Down Expand Up @@ -376,6 +380,19 @@ def _try_send_to_primary_thread(self, reply):
# wake up primary thread
primary_thread_task_ready.set()
return True
elif (
self.execmodel.backend == "main_thread_only"
and self._primary_thread_task is not None
):
self._primary_thread_task.waitfinish()
self._primary_thread_task = reply
# wake up primary thread (it's okay if this is already set
# because we waited for the previous task to finish above
# and integrate_as_primary_thread will not clear it when
# it enters self._running_lock if it detects that a new
# task is available)
primary_thread_task_ready.set()
return True
return False

def spawn(self, func, *args, **kwargs):
Expand Down Expand Up @@ -1106,7 +1123,18 @@ def join(self, timeout=None):
class WorkerGateway(BaseGateway):
def _local_schedulexec(self, channel, sourcetask):
sourcetask = loads_internal(sourcetask)
self._execpool.spawn(self.executetask, (channel, sourcetask))
if self.execmodel.backend == "main_thread_only":
# TODO: Maybe use something like queue.Queue to queue an asynchronous
# spawn here in order to avoid using another thread.
import threading

t = threading.Thread(
target=self._execpool.spawn,
args=(self.executetask, (channel, sourcetask)),
)
t.start()
else:
self._execpool.spawn(self.executetask, (channel, sourcetask))

def _terminate_execution(self):
# called from receiverthread
Expand All @@ -1132,7 +1160,7 @@ def serve(self):
def trace(msg):
self._trace("[serve] " + msg)

hasprimary = self.execmodel.backend == "thread"
hasprimary = self.execmodel.backend in ("thread", "main_thread_only")
self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary)
trace("spawning receiver thread")
self._initreceive()
Expand Down
6 changes: 4 additions & 2 deletions testing/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def anypython(request):
pytest.skip(f"no {name} found")
if "execmodel" in request.fixturenames and name != "sys.executable":
backend = request.getfixturevalue("execmodel").backend
if backend != "thread":
if backend not in ("thread", "main_thread_only"):
pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}")
return executable

Expand Down Expand Up @@ -173,7 +173,9 @@ def gw(request, execmodel, group):
return gw


@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session")
@pytest.fixture(
params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session"
)
def execmodel(request):
if request.param != "thread":
pytest.importorskip(request.param)
Expand Down
2 changes: 1 addition & 1 deletion testing/test_termination.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def doit():


def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel):
if execmodel.backend != "thread":
if execmodel.backend not in ("thread", "main_thread_only"):
pytest.xfail("test and execnet not compatible to greenlets yet")
gw = makegateway("popen")
q = execmodel.queue.Queue()
Expand Down
4 changes: 2 additions & 2 deletions testing/test_threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def wait_then_put():


def test_primary_thread_integration(execmodel):
if execmodel.backend != "thread":
if execmodel.backend not in ("thread", "main_thread_only"):
with pytest.raises(ValueError):
WorkerPool(execmodel=execmodel, hasprimary=True)
return
Expand All @@ -188,7 +188,7 @@ def func():


def test_primary_thread_integration_shutdown(execmodel):
if execmodel.backend != "thread":
if execmodel.backend not in ("thread", "main_thread_only"):
pytest.skip("can only run with threading")
pool = WorkerPool(execmodel=execmodel, hasprimary=True)
queue = execmodel.queue.Queue()
Expand Down

0 comments on commit ec5a2f7

Please sign in to comment.