Skip to content

Commit

Permalink
Merge pull request #85 from njsmith/task-local-storage
Browse files Browse the repository at this point in the history
Task-local storage
  • Loading branch information
dabeaz authored Oct 18, 2016
2 parents 307ace7 + ea401ce commit 564d2c9
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 6 deletions.
2 changes: 2 additions & 0 deletions curio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .workers import *
from .network import *
from .file import *
from .tls import *

__all__ = [ *errors.__all__,
*task.__all__,
Expand All @@ -21,4 +22,5 @@
*workers.__all__,
*network.__all__,
*file.__all__,
*tls.__all__,
]
13 changes: 8 additions & 5 deletions curio/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .errors import _CancelRetry
from .task import Task
from .traps import _read_wait, Traps
from .tls import _enable_tls_for, _copy_tls

# kqueue is the datatype used by the kernel for all of its queuing functionality.
# Any time a task queue is needed, use this type instead of directly hard-coding the
Expand Down Expand Up @@ -415,6 +416,7 @@ def _trap_future_wait(_, future, event):
# Trap handler: create a task for ``coro`` and hand it back to the spawner.
def _trap_spawn(_, coro, daemon):
    child = _new_task(coro, daemon)
    # The child inherits a copy of the spawning task's task-local storage.
    _copy_tls(current, child)
    # Resume the spawning task with the new task as the trap's result.
    _reschedule_task(current, value=child)

# Reschedule one or more tasks from a queue
Expand Down Expand Up @@ -616,11 +618,12 @@ def _trap_get_current(_):
try:
current.state = 'RUNNING'
current.cycles += 1
if current.next_exc is None:
trap = current._send(current.next_value)
else:
trap = current._throw(current.next_exc)
current.next_exc = None
with _enable_tls_for(current):
if current.next_exc is None:
trap = current._send(current.next_value)
else:
trap = current._throw(current.next_exc)
current.next_exc = None

except StopIteration as e:
_cleanup_task(current, value=e.value)
Expand Down
3 changes: 2 additions & 1 deletion curio/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Task(object):
'id', 'daemon', 'coro', '_send', '_throw', 'cycles', 'state',
'cancel_func', 'future', 'sleep', 'timeout', 'exc_info', 'next_value',
'next_exc', 'joining', 'cancelled', 'terminated', '_last_io', '_deadlines',
'__weakref__',
'task_local_storage', '__weakref__',
)
_lastid = 1

Expand All @@ -40,6 +40,7 @@ def __init__(self, coro, daemon=False, taskid=None):
self.joining = None # Optional set of tasks waiting to join with this one
self.cancelled = False # Cancelled?
self.terminated = False # Terminated?
self.task_local_storage = {} # Task local storage
self._last_io = None # Last I/O operation performed
self._send = coro.send # Bound coroutine methods
self._throw = coro.throw
Expand Down
107 changes: 107 additions & 0 deletions curio/tls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# curio/tls.py
#
# Task local storage

__all__ = ["Local"]

# Our public API is intentionally almost identical to that of threading.local:
# the user allocates a curio.Local() object, and then can attach arbitrary
# attributes to it. Reading one of these attributes later will return the last
# value that was assigned to this attribute *by code running inside the same
# Task*.
#
# Internally, each Task has an attribute .task_local_storage, which holds a
# dict. This dict has one entry for each Local object that has been accessed
# from this task context (i.e. entries here are lazily allocated), keyed by
# the Local object instance itself:
#
# {
# local_object_1: {
# "attr": value,
# "attr": value,
# ...
# },
# local_object_2: {
# "attr": value,
# "attr": value,
# ...
# },
# ...
# }
#
# From an async context one could access this dict with
#
# (await curio.current_task()).task_local_storage
#
# But, we also want to be able to access this from synchronous context,
# because one of the major use cases for this is tagging log messages with
# context, and there are lots of legacy third-party libraries that use the
# Python stdlib logging module. And it's synchronous. So if you want to
# capture their logs and feed them into something better and more context-ful,
# then you need to be able to get that context from synchronous-land.
#
# Therefore, whenever we resume a task, we stash a pointer to its
# .task_local_storage dictionary in a global *thread*-local variable. Then
# when we want to find a specific variable (local_obj.attr), we ultimately
# look in
#
# _current_task_local_storage.value[local_obj]["attr"]
#
# An unusual feature of this implementation (and our main deviation from
# threading.local) is that we implement *TLS inheritance*, i.e., when you
# spawn a new task, then all TLS values set in the parent task are (shallowly)
# copied to the child task. This is a bit experimental, but very handy in
# cases like when a request handler spawns some small short-lived worker tasks
# as part of its processing and those want to do logging as well.

import threading
from contextlib import contextmanager

# The thread-local storage slot that points to the task-local storage dict for
# whatever task is currently running.
#
# NOTE: attributes of a threading.local() are per-thread, so the assignment
# below only initializes the slot for the thread that imports this module.
# Every other thread starts out with no ``value`` attribute at all, which is
# why _enable_tls_for() reads the slot with a default instead of assuming the
# attribute exists.
_current_task_local_storage = threading.local()
_current_task_local_storage.value = None


@contextmanager
def _enable_tls_for(task):
    """Make ``task``'s task-local storage current for the calling thread.

    For the duration of the ``with`` block the thread-local slot points at
    ``task.task_local_storage``. On exit, whether normal or via an
    exception, the value that was there before is put back.
    """
    # Using a full save/restore pattern here is a little paranoid, but
    # safe. Even if someone does something silly like calling curio.run() from
    # inside a curio coroutine.
    #
    # A thread other than the importing one has never had ``value`` assigned,
    # so a plain attribute read would raise AttributeError the first time a
    # task is resumed there. Treat "never set" the same as None.
    old = getattr(_current_task_local_storage, "value", None)
    _current_task_local_storage.value = task.task_local_storage
    try:
        yield
    finally:
        _current_task_local_storage.value = old

# Called from _trap_spawn to implement TLS inheritance.
def _copy_tls(parent, child):
    """Copy ``parent``'s task-local values into ``child``.

    Each Local object's attribute dict is duplicated one level deep: the
    child gets its own dict per Local, but the stored values themselves are
    shared with the parent.
    """
    inherited = {
        local_obj: dict(attrs)
        for local_obj, attrs in parent.task_local_storage.items()
    }
    child.task_local_storage.update(inherited)

# Look up (and lazily create) the attribute dict that the currently running
# task keeps for a given Local object. This lives at module level rather than
# as a method on Local because Local overrides __getattribute__, which would
# get in the way of calling a method on the instance.
def _local_dict(local):
    """Return the current task's attribute dict for ``local``.

    An empty dict is created and stored on first access.
    """
    per_task = _current_task_local_storage.value
    return per_task.setdefault(local, {})

class Local:
    """A bundle of task-local attributes.

    Every attribute read, write and delete on an instance is redirected to
    the dict returned by ``_local_dict(self)``, i.e. the values stored for
    this object by the currently running task.
    """

    def __getattribute__(self, name):
        values = _local_dict(self)
        try:
            found = values[name]
        except KeyError:
            # A missing key is reported as an ordinary missing attribute;
            # "from None" hides the KeyError from tracebacks (PEP 409).
            raise AttributeError(name) from None
        return found

    def __setattr__(self, name, value):
        values = _local_dict(self)
        values[name] = value

    def __delattr__(self, name):
        values = _local_dict(self)
        try:
            values.pop(name)
        except KeyError:
            # A missing key is reported as an ordinary missing attribute;
            # "from None" hides the KeyError from tracebacks (PEP 409).
            raise AttributeError(name) from None
19 changes: 19 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,25 @@ The following public attributes are available of :class:`Task` instances:

A boolean flag that indicates whether or not the task has run to completion.

Task local storage
------------------

Curio supports "task local storage". The API is modeled after the
"thread local storage" provided by :py:class:`threading.local`.

.. class:: Local

A class representing a bundle of task-local values. Objects of this
class have no particular attributes or methods. Instead, they serve
as a blank slate to which you can add whatever attributes you
like. Modifications made from within one task will only be visible
to that task -- with one exception: when you create a new task
using ``curio.spawn``, then any values assigned to
:py:class:`Local` objects in the parent task will be inherited by
the child. This inheritance takes the form of a shallow copy --
further changes in the parent won't affect the child, and further
changes in the child won't affect the parent.

Timeouts
--------
Any blocking operation in curio can be cancelled after a timeout. The following
Expand Down
94 changes: 94 additions & 0 deletions docs/tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,100 @@ Curio provides the same synchronization primitives as found in the built-in
``threading`` module. The same techniques used by threads can be used with
curio.

Task-local storage
------------------

Sometimes it happens that you want to store some data that is specific
to a particular Task in a place where it can be reached from anywhere,
without having to pass it around everywhere. For example, in a server
that responds to network requests, you might want to assign each
request a unique tag, and then make sure to include that unique tag in
all log messages generated while handling the request. If we were
using threads, the solution would be thread-local storage implemented
with :py:class:`threading.local`. In Curio, we use task-local storage,
implemented by ``curio.Local``. For example::

# tls-example.py

import curio

import random
r = random.Random(0)

request_info = curio.Local()

# Example logging function that tags each line with the request identifier.
def log(msg):
# Read from task-local storage:
request_tag = request_info.tag

print("request {}: {}".format(request_tag, msg))

async def concurrent_helper(job):
log("running helper task {}".format(job))
await curio.sleep(r.random())
log("finished helper task {}".format(job))

async def handle_request(tag):
# Write to task-local storage:
request_info.tag = tag

log("Request received")
await curio.sleep(r.random())
helpers = [
await curio.spawn(concurrent_helper(1)),
await curio.spawn(concurrent_helper(2)),
]
for helper in helpers:
await helper.join()
await curio.sleep(r.random())
log("Request complete")

async def main():
tasks = []
for i in range(3):
tasks.append(await curio.spawn(handle_request(i)))
for task in tasks:
await task.join()

if __name__ == "__main__":
curio.run(main())

which produces output like::

request 0: Request received
request 1: Request received
request 2: Request received
request 2: running helper task 1
request 2: running helper task 2
request 2: finished helper task 1
request 1: running helper task 1
request 1: running helper task 2
request 0: running helper task 1
request 0: running helper task 2
request 2: finished helper task 2
request 0: finished helper task 1
request 1: finished helper task 1
request 0: finished helper task 2
request 2: Request complete
request 1: finished helper task 2
request 1: Request complete
request 0: Request complete

Notice two features in particular:

- Unlike almost all other APIs in curio, accessing task-local storage
does *not* use ``await``. As an example of why this is useful,
imagine you wanted to capture logs written via the standard library
:py:mod:`logging` module, and annotate them with request
identifiers. Because :py:mod:`logging` is synchronous, this would be
impossible if accessing task-local storage required ``await``.

- Unlike :py:class:`threading.local`, Curio task-local variables are
*inherited*. Notice how in our example above, the logs from
``concurrent_helper`` are tagged with the appropriate request.


Programming Advice
------------------

Expand Down
Loading

0 comments on commit 564d2c9

Please sign in to comment.