Skip to content

Commit

Permalink
make the token into a str
Browse files Browse the repository at this point in the history
  • Loading branch information
xiafu-msft committed Aug 26, 2020
1 parent cfcb5f9 commit ec87e8f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def __init__(
extract_data=self._extract_data_cb,
continuation_token=continuation_token or ""
)
continuation_token = eval(continuation_token) if continuation_token else None

if continuation_token and container_client.primary_hostname != continuation_token['UrlHost']:
raise ValueError('The token is not for the current storage account.')
self.results_per_page = results_per_page or 5000
Expand All @@ -75,7 +77,7 @@ def _get_next_cf(self, continuation_token): # pylint:disable=unused-argument
def _extract_data_cb(self, event_list):
self.current_page = event_list

return self._change_feed.cursor, self.current_page
return str(self._change_feed.cursor), self.current_page


class ChangeFeed(object):
Expand All @@ -85,11 +87,19 @@ def __init__(self, client, page_size, start_time=None, end_time=None, cf_cursor=
self._segment_paths_generator = None
self.current_segment = None
self.start_time = start_time

# the end time is in str format
end_time_in_cursor = cf_cursor['EndTime'] if cf_cursor else None
# convert the end time in str format to a datetime object
end_time_in_cursor_obj = \
datetime.strptime(end_time_in_cursor, '%Y-%m-%dT%H:%M:%S+00:00') if end_time_in_cursor else None
# self.end_time is in datetime format
self.end_time = end_time or end_time_in_cursor_obj

cur_segment_cursor = cf_cursor['CurrentSegmentCursor'] if cf_cursor else None
self.end_time = end_time or end_time_in_cursor

self.cursor = {"CursorVersion": 1,
"EndTime": self.end_time,
"EndTime": self.end_time.strftime('%Y-%m-%dT%H:%M:%S+00:00') if self.end_time else "",
"UrlHost": self.client.primary_hostname}
self._initialize(cur_segment_cursor=cur_segment_cursor)

Expand Down
35 changes: 19 additions & 16 deletions sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,12 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, resource_group, lo
print(event)
token = change_feed.continuation_token

self.assertEqual(token['CursorVersion'], 1)
self.assertIsNotNone(token['UrlHost'])
self.assertEqual(len(token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(token['CurrentSegmentCursor']['CurrentShardPath'])
dict_token = eval(token)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

if self.is_live:
sleep(120)
Expand Down Expand Up @@ -172,11 +173,12 @@ def test_read_change_feed_tail_where_only_1_shard_has_data(self, resource_group,
aaaaaa = event
token = change_feed.continuation_token

self.assertEqual(token['CursorVersion'], 1)
self.assertIsNotNone(token['UrlHost'])
self.assertEqual(len(token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(token['CurrentSegmentCursor']['CurrentShardPath'])
dict_token = eval(token)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

# if self.is_live:
# sleep(120)
Expand Down Expand Up @@ -206,12 +208,13 @@ def test_read_change_feed_with_3_shards_in_a_time_range(self, resource_group, lo

token = change_feed.continuation_token

self.assertEqual(token['CursorVersion'], 1)
self.assertIsNotNone(token['EndTime'])
self.assertIsNotNone(token['UrlHost'])
self.assertEqual(len(token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(token['CurrentSegmentCursor']['CurrentShardPath'])
dict_token = eval(token)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['EndTime'])
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

change_feed2 = cf_client.list_changes().by_page(continuation_token=token)
events = list(next(change_feed2))
Expand Down

0 comments on commit ec87e8f

Please sign in to comment.