Skip to content

Commit

Permalink
Added StreamDataFrame and plotting optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr committed Oct 23, 2017
1 parent b63a384 commit 62a7dd1
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 7 deletions.
12 changes: 8 additions & 4 deletions holoviews/plotting/bokeh/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ...core import Store, DynamicMap, CompositeOverlay, Element, Dimension
from ...core.options import abbreviated_exception, SkipRendering
from ...core import util
from ...streams import Stream
from ...streams import Stream, StreamDataFrame
from ..plot import GenericElementPlot, GenericOverlayPlot
from ..util import dynamic_update
from .plot import BokehPlot, TOOLS
Expand Down Expand Up @@ -180,6 +180,8 @@ def __init__(self, element, plot=None, **params):
self.static = len(self.hmap) == 1 and len(self.keys) == len(self.hmap)
self.callbacks = self._construct_callbacks()
self.static_source = False
dfstream = [s for s in self.streams if isinstance(s, StreamDataFrame)]
self.streaming = dfstream[0] if any(dfstream) else None

# Whether axes are shared between plots
self._shared = {'x': False, 'y': False}
Expand Down Expand Up @@ -565,9 +567,9 @@ def _update_ranges(self, element, ranges):
if any(isinstance(ax_range, FactorRange) for ax_range in [x_range, y_range]):
xfactors, yfactors = self._get_factors(element)
framewise = self.framewise
if not self.drawn or (not self.model_changed(x_range) and framewise) or xfactors:
if not self.drawn or (not self.model_changed(x_range) and framewise or self.streaming) or xfactors:
self._update_range(x_range, l, r, xfactors, self.invert_xaxis, self._shared['x'], self.logx)
if not self.drawn or (not self.model_changed(y_range) and framewise) or yfactors:
if not self.drawn or (not self.model_changed(y_range) and framewise or self.streaming) or yfactors:
self._update_range(y_range, b, t, yfactors, self.invert_yaxis, self._shared['y'], self.logy)


Expand Down Expand Up @@ -850,7 +852,9 @@ def update_frame(self, key, ranges=None, plot=None, element=None):
if hasattr(renderer, 'visible'):
renderer.visible = bool(element)

if (self.batched and not element) or element is None or (not self.dynamic and self.static):
if ((self.batched and not element) or element is None or (not self.dynamic and self.static) or
(self.streaming and self.streaming.data is self.current_frame.data
and not self.streaming._triggering)):
return

if self.batched:
Expand Down
6 changes: 5 additions & 1 deletion holoviews/plotting/bokeh/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ def _update_datasource(self, source, data):
"""
Update datasource with data for a new frame.
"""
source.data.update(data)
if self.streaming and self.streaming.data is self.current_frame.data:
data = {k: v[-self.streaming._chunk_length:] for k, v in data.items()}
source.stream(data, self.streaming.backlog)
else:
source.data.update(data)

@property
def state(self):
Expand Down
8 changes: 6 additions & 2 deletions holoviews/plotting/bokeh/tabular.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
from ...core import Dataset
from ...element import ItemTable
from ...streams import StreamDataFrame
from ..plot import GenericElementPlot
from .plot import BokehPlot
from .util import bokeh_version
Expand Down Expand Up @@ -32,6 +33,8 @@ def __init__(self, element, plot=None, **params):
element_ids = self.hmap.traverse(lambda x: id(x), [Dataset, ItemTable])
self.static = len(set(element_ids)) == 1 and len(self.keys) == len(self.hmap)
self.callbacks = [] # Callback support on tables not implemented
dfstream = [s for s in self.streams if isinstance(s, StreamDataFrame)]
self.streaming = dfstream[0] if any(dfstream) else None


def _execute_hooks(self, element):
Expand Down Expand Up @@ -114,8 +117,9 @@ def update_frame(self, key, ranges=None, plot=None):
current_id = element._plot_id
self.handles['previous_id'] = current_id
self.static_source = (self.dynamic and (current_id == previous_id))

if self.static_source:
if (element is None or (not self.dynamic and self.static) or
(self.streaming and self.streaming.data is self.current_frame.data
and not self.streaming._triggering) or self.static_source):
return
source = self.handles['source']
style = self.lookup_options(element, 'style')[self.cyclic_index]
Expand Down
51 changes: 51 additions & 0 deletions holoviews/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,57 @@ def hashkey(self):
return {'hash': uuid.uuid4().hex}


class StreamDataFrame(StreamData):
"""
StreamDataFrame provides an adaptor to attach streamz
StreamingDataFrame to a HoloViews stream. The stream will accumulate
a DataFrame up to the specified ``backlog`` of rows. The accumulated
dataframe is then made available via the ``data`` parameter.
"""

def __init__(self, sdf, backlog=1000, **params):
try:
from streamz.dataframe import StreamingDataFrame, StreamingSeries
except ImportError:
raise ImportError("StreamDataFrame requires streamz library to be available")
if isinstance(sdf, StreamingSeries):
sdf = sdf.to_frame()
elif not isinstance(sdf, StreamingDataFrame):
raise ValueError("StreamDataFrame must be instantiated with a "
"streamz.StreamingDataFrame or streamz.StreamingSeries")

if 'data' not in params:
params['data'] = sdf.example.reset_index()
super(StreamDataFrame, self).__init__(**params)
self.sdf = sdf
self.backlog = backlog
self._chunk_length = 0
self._count = 0


def update(self, **kwargs):
"""
Overrides update to concatenate streamed data up to backlog.
"""
data = kwargs.get('data')
if data is not None:
data_length = len(data)
data = data.reset_index()
if data_length < self.backlog:
prev_chunk = self.data.iloc[-(self.backlog-data_length):]
data = util.pd.concat([prev_chunk, data])
self._chunk_length = data_length
kwargs['data'] = data
self._count += 1
super(StreamDataFrame, self).update(**kwargs)


@property
def hashkey(self):
return {'hash': self._count}



class LinkedStream(Stream):
"""
A LinkedStream indicates is automatically linked to plot interactions
Expand Down

0 comments on commit 62a7dd1

Please sign in to comment.