Skip to content

Commit

Permalink
Added test for custom Stream.hashkey
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr committed Oct 23, 2017
1 parent 9bc7efe commit 0757d8c
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions tests/testdynamic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from collections import deque
import time

Expand Down Expand Up @@ -572,6 +573,35 @@ def history_callback(x, y, history=deque(maxlen=10)):
self.assertEqual(xresets, 2)
self.assertEqual(yresets, 2)

def test_dynamic_callable_stream_hashkey(self):
# Enable transient stream meaning memoization only happens when
# stream is inactive, should have sample for each call to
# stream.update
def history_callback(x, history=deque(maxlen=10)):
if x is not None:
history.append(x)
return Curve(list(history))

class NoMemoize(PointerX):
@property
def hashkey(self): return {'hash': uuid.uuid4().hex}

x = NoMemoize()
dmap = DynamicMap(history_callback, kdims=[], streams=[x])

# Add stream subscriber mocking plot
x.add_subscriber(lambda **kwargs: dmap[()])

for i in range(2):
x.event(x=1)
self.assertEqual(dmap[()], Curve([1, 1, 1]))

for i in range(2):
x.event(x=2)

self.assertEqual(dmap[()], Curve([1, 1, 1, 2, 2, 2]))



class TestPeriodicStreamUpdate(ComparisonTestCase):

Expand Down

0 comments on commit 0757d8c

Please sign in to comment.