Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for streaming data #2011

Merged
merged 20 commits into from
Oct 30, 2017
Merged

Support for streaming data #2011

merged 20 commits into from
Oct 30, 2017

Conversation

philippjfr
Copy link
Member

@philippjfr philippjfr commented Oct 21, 2017

This PR builds on #2007 adding a StreamDataFrame Stream, which lets you subscribe to updates from a streamz.StreamingDataFrame. The dependency is entirely optional and with a bit of glue it's very easy to subscribe to generate a streaming plot this way. For now I have just monkey-patched streamz using this code:

from streamz.dataframe import StreamingDataFrame, StreamingSeries, Random
from holoviews.streams import StreamDataFrame

def to_hv(self, function, backlog=2000):
    return hv.DynamicMap(function, streams=[StreamDataFrame(self, backlog=backlog)])

StreamingDataFrame.to_hv = to_hv
StreamingSeries.to_hv = to_hv

The individual chunks emitted by the StreamingDataFrame are fed through a user supplied callback and are then used to update the bokeh plot using the CDS.stream method, which sends just the most recent samples. It may also be worth considering an option where StreamingDataFrame itself accumulates data rather than just emitting chunks, which can also be very useful. The current approach makes something like this possible:

streamz

@philippjfr philippjfr added the type: feature A major new feature label Oct 21, 2017
@jlstevens
Copy link
Contributor

jlstevens commented Oct 22, 2017

After a long discussion with @philippjfr I came around to this approach for now, even if we find a deeper way to integrate streamz support in the long term. There are a few changes that would help:

  • Don't show patches anywhere that the user can see ie. when passing data into an element.
  • Enable the bokeh streaming optimization if the holoviews stream is there AND the data in the element while plotting is the data in the stream (allows optimization even if the data is mutated in the user callback)
  • Issue a warning if the stream is present but the optimization is disabled because the identity test fails - the user may have made a copy, disabling the optimization which they should be made aware of.

@philippjfr
Copy link
Member Author

Ready to review once #2007 is merged.

@philippjfr
Copy link
Member Author

Happy to hold off on merging until I've added a user guide entry and some examples.

@philippjfr
Copy link
Member Author

Issue a warning if the stream is present but the optimization is disabled because the identity test fails - the user may have made a copy, disabling the optimization which they should be made aware of.

Note that I don't think we should warn here, modifying the data can often be useful, e.g. when computing a histogram of the streaming window.

@philippjfr philippjfr force-pushed the streamz_df branch 3 times, most recently from b2f8a26 to 20272f8 Compare October 23, 2017 12:02
@philippjfr
Copy link
Member Author

philippjfr commented Oct 23, 2017

@jbednar Ready for you to make a pass over the user guide. If you wouldn't mind could you also document the chunksize option for the DataFrameStream, which is a feature you requested for smoother updates.

@hsparra
Copy link

hsparra commented Oct 24, 2017

a. These two statements together are pretty dense:

stream.sliding_window(10).map(pd.concat).sink(stream_data.send)
hv.DynamicMap(hv.Scatter, streams=[stream_data]).redim.range(x=(-3, 3), y=(-3, 3))

also, is map(something) required?

b. Reading this, " which will wait for 10 sets of stream updates to accumulate and then apply pd.concat to combine those updates,” it sounds to me like it will send chunk 1-10 together, then send again with chunks 11-20, then again with chunks 21-30, etc. I suggest a rewording of "which will wait for 10 sets of stream updates to accumulate and then apply pd.concat to combine those updates." to make it clearer how it works. The current wording is also not clear if it continues pass the 10th chunk.

Everything else seems clear to me and is relatively easy to follow.

"""

def __init__(self, sdf, backlog=1000, chunksize=None, **params):
try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess backlog and chunksize can't be Parameters because Streams use Parameters for their own special purposes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, although I could filter them out. @jlstevens might want to chime in.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They certainly seem like they ought to be Parameters, so that they can be set at the class or instance level as appropriate.

@@ -0,0 +1,383 @@
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's quite a long title; not sure if "Working with" adds anything? Maybe "Streaming Data"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe "Streaming Data Sources", if that helps avoid confusion with hv streams in general.

@jbednar
Copy link
Member

jbednar commented Oct 24, 2017

Note that I don't think we should warn here, modifying the data can often be useful, e.g. when computing a histogram of the streaming window.

You could maybe warn in such a case, if the user hasn't passed a special argument writeable=True? Fine by me either way.

@jbednar
Copy link
Member

jbednar commented Oct 24, 2017

It looks like this notebook needs to be run cell by cell for it to work well, which I don't usually like; I'd rather it just sit there consuming CPU so that I can read each bit as I wish, not necessarily one cell at a time in order. So it would be good to show how to have a stream that ends, but I don't think most of the examples should be like that; seems to me that most should just keep going indefinitely.

In any case, particularly if single-cell running is encouraged, please change variables to be unique. E.g. source and sdf are defined differently in different cells, which causes confusion and apparent errors when run out of order.

@jbednar
Copy link
Member

jbednar commented Oct 25, 2017

In "Applying operations", instead of redefining source and sdf, can you do something like source.restart() or source.start() to resume that stream? I don't see any operation like that on the Random argument, but if that's not supported by streamz; seems like it ought to be...

@jbednar
Copy link
Member

jbednar commented Oct 25, 2017

  1. Should zooming in work, e.g. on the last plot? It doesn't seem to, even if only covering a shorter rather than a longer time; the ranges reset when the next update happens.
  2. Would it be reasonable to set up streams that set the backlog to the right default when appropriate, such as when the backlog maps to an x axis of a plot? This might address item 1.
  3. Can we have at least one datashader plot with a datetime axis? I think that will be the first thing people will say when they see this one.
  4. I think we need a spatial datashader plot as well.

Apart from that, thanks, and it looks great! I'll push my changes to the notebook now.

@philippjfr
Copy link
Member Author

Should zooming in work, e.g. on the last plot? It doesn't seem to, even if only covering a shorter rather than a longer time; the ranges reset when the next update happens.

What would you expect to happen? The axis has to follow the current window, so it's not clear to me how you could avoid setting the range when a new event comes in.

Would it be reasonable to set up streams that set the backlog to the right default when appropriate, such as when the backlog maps to an x axis of a plot? This might address item 1.

I'm not sure what you mean here either, the backlog determines the range of the axis not the other way around.

Can we have at least one datashader plot with a datetime axis? I think that will be the first thing people will say when they see this one.

I'll update once #2023 is merged.

I think we need a spatial datashader plot as well.

Sure, I'll add that if you haven't already.

@jbednar
Copy link
Member

jbednar commented Oct 26, 2017

On zooming in, I would expect the size of the viewport to change, even if the actual location of the viewport is tracking what comes in.

I'm mainly thinking about the x axis here, where zooming amounts to changing the backlog. If the backlog is 1000 points and I zoom into the x axis 2X, I'd expect to see 500 points at a time from then on. From there, if I zoom back to 1.0X, I'd expect to see 1000 points again. If I instead zoom out, I would expect to see a half-empty plot, because I know the backlog isn't keeping that much data, but I might hope that the backlog will dynamically update based on my zooming so that I now see up to 2000 points if I wait for a while, because the backlog has now been updated to 2000 points. This all seems uncontroversial to me, which doesn't mean it's feasible, but I think it does mean it's reasonable to consider.

The y axis is significantly trickier. If I zoom out, I'd expect to see the same data as before, with a buffer around the top and bottom; probably not very useful, but intuitive. If I zoom in, I'd expect to see half the y range as before, but the question is which bit of the y range, since that keeps jumping around all the time. And I guess the answer is the middle bit of the range, if I haven't done any panning, such that if the auto-ranged Y is 0.3 to 0.6, I'd expect to see 0.4 to 0.5 after zooming y 2X. And then I'd expect panning to show me 0.5 to 0.6 or 0.3 to 0.4, depending on how I drag it.

To motivate all this, just imagine a streaming plot that has quite a bit of data that is arriving very slowly, with a long backlog. Anyone looking at it is going to want to be able to zoom in to a bit of it and check it out, and will be surprised and annoyed if such zooming suddenly gets overwritten every time an update appears.

This all also brings up the issue of wanting to be able to scrub back into some portion of the history, e.g. to have a backlog of 5000 points, showing only the last 1000 but allowing scrubbing (panning to the left, in this case) over the previous 4000. This isn't stuff that needs to be handled in this PR, just functionality that I think is clearly desirable without any weight for how feasible it is to implement.

@jbednar
Copy link
Member

jbednar commented Oct 26, 2017

Oh, and I didn't see your note about documenting the chunksize option for the DataFrameStream, so I don't think I focused on that explicitly.

@philippjfr
Copy link
Member Author

philippjfr commented Oct 26, 2017

Okay, I agree none of that is particularly controversial and you can achieve the scrubbing back portion of what you suggested already. The rest may be feasible but I don't think it'll be in this PR. Hunt's suggestion of having some button that lets you stop updates to a plot seems very reasonable here, I could imagine a button that temporarily pauses updates to a plot (while leaving the actual stream intact). That way you could pause the plot, zoom in and then start it again once you've looked at the feature you're interested in.

@jbednar
Copy link
Member

jbednar commented Oct 26, 2017

Oh, yes, I meant to suggest that too, a "go offline" mode, which works only on the data available at that instant, and then rejoins the stream when you click off of it.

@philippjfr
Copy link
Member Author

Should also address #1775 by adding a psutils based example.

@stonebig
Copy link
Contributor

stonebig commented Oct 28, 2017

hi. On Windows the psutil example faile because there is no active/inactive/wired memory:

import psutil;psutil.virtual_memory();

svmem(total=4260392960, available=1139920896, percent=73.2, used=3120472064, free=1139920896)

then it fails, on maybe a missed patch from myself ?

################################################
# Define functions to get memory and CPU usage #
################################################

def get_mem_data():
    vmem = psutil.virtual_memory()
    df = pd.DataFrame(dict(active=vmem.used/vmem.total,
                           inactive=vmem.free/vmem.total,
                           wired=vmem.used/vmem.total),
                        index=[pd.Timestamp.now()])
    return df*100
ipython-input-22-327617f9cf72> in mem_stack(data)
     27     data = pd.melt(data, 'index', var_name='Type', value_name='Usage')
     28     areas = hv.Dataset(data).to(hv.Area, 'index', 'Usage')
---> 29     return hv.Area.stack(areas.overlay()).relabel('Memory')
     30 
     31 def cpu_box(data):

C:\WinPython\basedir36\buildQt5\winpython-64bit-3.6.x.0\python-3.6.3.amd64\lib\site-packages\holoviews\element\chart.py in stack(cls, areas)
    391         method.
    392         """
--> 393         baseline = np.zeros(len(areas.get(0)))
    394         stacked = areas.clone(shared_data=False)
    395         vdims = [areas.get(0).vdims[0], 'Baseline']

TypeError: object of type 'NoneType' has no len()

Out[22]:
:Layout
   .DynamicMap.I  :DynamicMap   []
   .DynamicMap.II :DynamicMap   []

@philippjfr
Copy link
Member Author

hi. On Windows the psutil example faile because there is no active/inactive/wired memory:

That's annoying, would you mind pasting what is available on Windows?

then it fails, on maybe a missed patch from myself ?

Yes, it seems you got an old version of this PR before I rebased fixes to Area.stack into it.

@stonebig
Copy link
Contributor

on windows, there is:

  • total
  • available
  • used
  • free

@philippjfr
Copy link
Member Author

Thanks, that's great! I'll update the example.

@jlstevens
Copy link
Contributor

jlstevens commented Oct 30, 2017

This is looking a lot better! To start off with, I would like to rework the start of the user guide a little. Currently the first sentence is:

"Streaming data" is data that is continuously generated, often by some external source like a remote website, a measuring device, or a simulator. This kind of data is common for financial time series, web server logs, scientific applications, and many other situations.

This sentence bothers me as it needs qualification. I've rewritten the introduction in a way I think is clearer given the material in the earlier user guides:

"Streaming data" is data that is continuously generated, often by some external source like a remote website, a measuring device, or a simulator. This kind of data is common for financial time series, web server logs, scientific applications, and many other situations. We have seen how to visualize any data output by a callable in the Live Data user guide and we have also seen how to use the HoloViews stream system to push events in the user guide sections Responding to Events and Custom Interactivity.

This user guide shows a third way of building an interactive plot, using DynamicMap and streams where instead of pushing plot metadata (such as zoom ranges, user triggered events such as Tap and so on) to a DynamicMap callback, the underlying data in the visualized elements are updated directly using a HoloViews Stream.

In particular, we will show how the HoloViews Pipe and Buffer streams can be used to work with streaming data sources without having to fetch or generate the data from inside the DynamicMap callable. Apart from streaming directly in HoloViews we will also explore working with streaming data coordinated by the separate streamz library from Matt Rocklin, which can make working with complex streaming pipelines much simpler.

This notebook makes use of the streamz library which you can obtain using either:

conda install streamz

or

pip install streamz

NOTE: This notebook uses examples where live updates are started, execute for a short period of time and then are stopped. To follow along with how it works, these example should be run one cell at a time in a Jupyter notebook to observe how each visualization is updated before these updates are stopped in a later cell.

Of course, you don't need to use this suggestion verbatim but I think it is important to explain how this user guide is different from the other ones involving DynamicMap and interactive updates.

@jlstevens
Copy link
Contributor

jlstevens commented Oct 30, 2017

I'll be making edits to this comment as I work through the user guide/code and I'll make a note at the end when I'm done!

Note that I am now making these changes to the notebook as I go.

Comments (continued)

  • I would elaborate...

    Since all Element types accept data of various forms we can use Pipe to push data to an Element through a DynamicMap.

    to ...

    Since all Element types accept data of various forms we can use Pipe to push data directly to the constructor of an Element through a DynamicMap.

  • I would make the following change from draw to update:

    ... providing the pipe as a stream, which will dynamically update a VectorField :

  • Added a paragraph and a small elaboration:

    This approach of using an element constructor directly does not allow you to use anything other than the default key and value dimensions. One simple workaround is to use functools.partial as demonstrated in the Controlling the backlog section.

    Since Pipe is completely general the data can be any custom type it provides a completely general mechanism to stream structured or unstructured data. Due to this generality it cannot provide some of the more complex features provided by the Buffer stream that we will now explore.

  • Switched to %%opts magic in brownian motion example.

API comment

@philippjfr I think we should use Buffer.size instead of Buffer.backlog even though streamz uses the latter.

@jbednar
Copy link
Member

jbednar commented Oct 30, 2017

I'm happy with your new suggested intro, except that "streaming directly in HoloViews" doesn't mean anything to me; isn't it all streaming directly in HoloVIews? Need to make what streamz achieves clearer. The GIF suggestion sounds promising, and it would be great to see a prototype.

@jlstevens
Copy link
Contributor

jlstevens commented Oct 30, 2017

@jbednar I have completed a full pass over the user guide making various edits and formatting fixes. I tried to clarify the vague bit of the introduction that Jim pointed out.

I'm now very happy with this API, functionality and user guide! Of course I haven't looked closely at the code yet but I expect that it is ok. That said, @philippjfr my last main API gripe is the backlog parameter: I really would prefer to talk about the size of a Buffer.

events emitted by a streamz object.

When streaming a DataFrame will use the DataFrame index by
default, this may be disabled by setting index=False.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the DataFrame index how? As in simply retain it or something more sophisticated?

"Since ``Pipe`` is completely general the data can be any custom type it provides a completely general mechanism to stream structured or unstructured data. Due to this generality it cannot provide some of the more complex features provided by the ``Buffer`` stream."
"This approach of using an element constructor directly does not allow you to use anything other than the default key and value dimensions. One simple workaround this limitation is to use ``functools.partial`` as demonstrated in the **Controlling the backlog section** below.\n",
"\n",
"Since ``Pipe`` is completely general the data can be any custom type it provides a completely general mechanism to stream structured or unstructured data. Due to this generality it cannot provide some of the more complex features provided by the ``Buffer`` stream that we will now explore."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't parse this sentence.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed - I assume you are referring to the last sentence above. I'll rewrite it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarified the sentence in 881c70c

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second-to-last, starting with Since.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll edit now with some misc fixes and you can fix up this sentence afterwards if needed.

@jlstevens
Copy link
Contributor

I have now looked through the code and I am now happy with this PR, including the user guide. The only sticking point I would like addressed before merging is whether backlog would be better named size.

@jbednar
Copy link
Member

jbednar commented Oct 30, 2017

I'm happy to have it merged with or without the change /s/backlog/size. Good job!

@jlstevens
Copy link
Contributor

After a quick brainstorm with @philippjfr we decided length might be better than size.

Happy to see this PR merged after that final change.

@jlstevens
Copy link
Contributor

Maybe you should make that change quickly before I think of more things to mention! ;-p

I'm just wondering if we should say something about the plotting optimization being disabled if the data in the element changes in the callback from what was supplied by the Buffer....

@philippjfr
Copy link
Member Author

Ready to merge once tests pass.

@jlstevens
Copy link
Contributor

Everything seems to be addressed after that last set of commits. Happy to merge once the tests pass.

@jlstevens
Copy link
Contributor

Tests are passing except for one build that was restarted due to a transient.

Merging.

Copy link

This pull request has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Oct 25, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
type: feature A major new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants