Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure garbage collection is able to clean up after bokeh server sessions are destroyed #3106

Merged
merged 7 commits into from
Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion holoviews/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import IPython # noqa (API import)
from .ipython import notebook_extension
extension = notebook_extension # noqa (name remapping)
except ImportError as e:
except ImportError:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but started raising flake errors all of a sudden.

class notebook_extension(param.ParameterizedFunction):
def __call__(self, *args, **opts): # noqa (dummy signature)
raise Exception("IPython notebook not available: use hv.extension instead.")
Expand Down
13 changes: 10 additions & 3 deletions holoviews/plotting/bokeh/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,15 @@ def __init__(self, plot, streams, source, **params):


def cleanup(self):
self.reset()
self.handle_ids = None
self.plot = None
self.source = None
self.streams = []
if self.comm:
self.comm.close()
Callback._callbacks = {k: cb for k, cb in Callback._callbacks.items()
if cb is not self}


def reset(self):
Expand Down Expand Up @@ -1143,10 +1150,10 @@ def find_link(cls, plot, link=None):
sources = plot.hmap.traverse(lambda x: x, [ViewableElement])
for src in sources:
if link is None:
if id(src) in Link.registry:
return (plot, Link.registry[id(src)])
if src in Link.registry:
return (plot, Link.registry[src])
else:
if id(link.target) == id(src):
if link.target is src:
return (plot, [link])

def validate(self):
Expand Down
34 changes: 24 additions & 10 deletions holoviews/plotting/bokeh/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from ..plot import (DimensionedPlot, GenericCompositePlot, GenericLayoutPlot,
GenericElementPlot, GenericOverlayPlot)
from ..util import attach_streams, displayable, collate
from .callbacks import Callback, LinkCallback
from .callbacks import LinkCallback
from .util import (layout_padding, pad_plots, filter_toolboxes, make_axis,
update_shared_sources, empty_plot, decode_bytes)

Expand Down Expand Up @@ -103,12 +103,20 @@ def root(self):

@document.setter
def document(self, doc):
if (doc and hasattr(doc, 'on_session_destroyed') and
self.root is self.handles.get('plot') and not isinstance(self, AdjointLayoutPlot)):
doc.on_session_destroyed(self._session_destroy)
if self._document:
self._document._session_destroyed_callbacks.pop(self._session_destroy, None)

self._document = doc
if self.subplots:
for plot in self.subplots.values():
if plot is not None:
plot.document = doc

def _session_destroy(self, session_context):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the only reason this method exists is to drop session_context before calling cleanup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, bokeh checks signatures.

self.cleanup()

def __init__(self, *args, **params):
root = params.pop('root', None)
Expand Down Expand Up @@ -149,7 +157,7 @@ def _construct_callbacks(self):

cb_classes = set()
for _, source in sources:
streams = Stream.registry.get(id(source), [])
streams = Stream.registry.get(source, [])
registry = Stream._callbacks['bokeh']
cb_classes |= {(registry[type(stream)], stream) for stream in streams
if type(stream) in registry and stream.linked}
Expand Down Expand Up @@ -272,19 +280,25 @@ def cleanup(self):
if not isinstance(plot, (GenericCompositePlot, GenericElementPlot, GenericOverlayPlot)):
continue
streams = list(plot.streams)
plot.streams = []
plot._document = None

if plot.subplots:
plot.subplots.clear()

if isinstance(plot, GenericElementPlot):
for callback in plot.callbacks:
streams += callback.streams
callback.cleanup()

for stream in set(streams):
stream._subscribers = [
(p, subscriber) for p, subscriber in stream._subscribers
if get_method_owner(subscriber) not in plots
]
if not isinstance(plot, GenericElementPlot):
continue
for callback in plot.callbacks:
streams += callback.streams
callbacks = {k: cb for k, cb in callback._callbacks.items()
if cb is not callback}
Callback._callbacks = callbacks
callback.cleanup()

if self.comm:
self.comm.close()


def _fontsize(self, key, label='fontsize', common=True):
Expand Down
3 changes: 3 additions & 0 deletions holoviews/plotting/bokeh/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ def __init__(self, plot, renderer=None, **params):
self._queue = []
self._active = False

if hasattr(self.plot.document, 'on_session_destroyed'):
self.plot.document.on_session_destroyed(self.plot._session_destroy)


@classmethod
def create_widget(self, dim, holomap=None, editable=False):
Expand Down
32 changes: 20 additions & 12 deletions holoviews/plotting/links.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import weakref
from collections import defaultdict

import param

from ..core import ViewableElement


class Link(param.Parameterized):
"""
Expand All @@ -22,14 +21,8 @@ class Link(param.Parameterized):
directional links between the source and target object.
"""

source = param.ClassSelector(class_=ViewableElement, doc="""
The source object of the link (required).""")

target = param.ClassSelector(class_=ViewableElement, doc="""
The target object of the link (optional).""")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shame to lose these as parameters.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, although it did give me the idea of generalizing links at the panel level.

# Mapping from a source id to a Link instance
registry = defaultdict(list)
registry = weakref.WeakKeyDictionary()

# Mapping to define callbacks by backend and Link type.
# e.g. Link._callbacks['bokeh'][Stream] = Callback
Expand All @@ -38,7 +31,11 @@ class Link(param.Parameterized):
def __init__(self, source, target=None, **params):
if source is None:
raise ValueError('%s must define a source' % type(self).__name__)
super(Link, self).__init__(source=source, target=target, **params)

# Source is stored as a weakref to allow it to be garbage collected
self._source = weakref.ref(source) if source else None
self._target = weakref.ref(target) if target else None
super(Link, self).__init__(**params)
self.link()

@classmethod
Expand All @@ -49,17 +46,28 @@ def register_callback(cls, backend, callback):
"""
cls._callbacks[backend][cls] = callback

@property
def source(self):
return self._source() if self._source else None

@property
def target(self):
return self._target() if self._target else None

def link(self):
"""
Registers the Link
"""
self.registry[id(self.source)].append(self)
if self.source in self.registry:
self.registry[self.source].append(self)
else:
self.registry[self.source] = [self]

def unlink(self):
"""
Unregisters the Link
"""
links = self.registry.get(id(self.source))
links = self.registry.get(self.source)
if self in links:
links.pop(links.index(self))

Expand Down
33 changes: 23 additions & 10 deletions holoviews/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import uuid
import weakref
from numbers import Number
from collections import defaultdict
from contextlib import contextmanager
Expand Down Expand Up @@ -71,8 +72,10 @@ class Stream(param.Parameterized):
respectively.
"""

# Mapping from a source id to a list of streams
registry = defaultdict(list)
# Mapping from a source to a list of streams
# WeakKeyDictionary to allow garbage collection
# of unreferenced sources
registry = weakref.WeakKeyDictionary()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a dictionary type, doesn't this assume the keys are hashable? Using id meant you could always be sure you could use it as a key.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik, none of the object types that we support as sources (elements, DynamicMaps, overlays etc.) implement any custom __hash__ method which would interfere with them being used as keys. The reason I used the id originally was to avoid references to the source sticking around, which was actually counter-productive it turns out.


# Mapping to define callbacks by backend and Stream type.
# e.g. Stream._callbacks['bokeh'][Stream] = Callback
Expand Down Expand Up @@ -199,7 +202,10 @@ def __init__(self, rename={}, source=None, subscribers=[], linked=False,
Some streams are configured to automatically link to the source
plot, to disable this set linked=False
"""
self._source = source

# Source is stored as a weakref to allow it to be garbage collected
self._source = weakref.ref(source) if source else None

self._subscribers = []
for subscriber in subscribers:
self.add_subscriber(subscriber)
Expand All @@ -218,7 +224,10 @@ def __init__(self, rename={}, source=None, subscribers=[], linked=False,
super(Stream, self).__init__(**params)
self._rename = self._validate_rename(rename)
if source is not None:
self.registry[id(source)].append(self)
if source in self.registry:
self.registry[source].append(self)
else:
self.registry[source] = [self]


@property
Expand Down Expand Up @@ -296,22 +305,26 @@ def rename(self, **mapping):
"""
params = {k: v for k, v in self.get_param_values() if k != 'name'}
return self.__class__(rename=mapping,
source=self._source,
source=(self._source() if self._source else None),
linked=self.linked, **params)

@property
def source(self):
return self._source
return self._source() if self._source else None


@source.setter
def source(self, source):
if self._source:
source_list = self.registry[id(self._source)]
old_source = self._source() if self._source else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you can't use a property getter when writing a setter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be fine, and if there was any more logic I'd have opted for using the getter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I think using self.source is nicer than defining old_source. Doesn't need to hold up this PR though...

if old_source:
source_list = self.registry[old_source]
if self in source_list:
source_list.remove(self)
self._source = source
self.registry[id(source)].append(self)
self._source = weakref.ref(source)
if source in self.registry:
self.registry[source].append(self)
else:
self.registry[source] = [self]


def transform(self):
Expand Down
4 changes: 2 additions & 2 deletions holoviews/tests/teststreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,12 @@ def tearDown(self):
def test_source_registry(self):
points = Points([(0, 0)])
PointerX(source=points)
self.assertIn(id(points), Stream.registry)
self.assertIn(points, Stream.registry)

def test_source_registry_empty_element(self):
points = Points([])
PointerX(source=points)
self.assertIn(id(points), Stream.registry)
self.assertIn(points, Stream.registry)


class TestParameterRenaming(ComparisonTestCase):
Expand Down