Commit 1df943f: address comments
xiafu-msft committed Aug 26, 2020
1 parent ec87e8f commit 1df943f
Showing 7 changed files with 104 additions and 78 deletions.
@@ -61,6 +61,8 @@ def __init__(

if continuation_token and container_client.primary_hostname != continuation_token['UrlHost']:
raise ValueError('The token is not for the current storage account.')
if continuation_token and continuation_token['CursorVersion'] != 1:
raise ValueError('The CursorVersion is not supported by the current SDK.')
self.results_per_page = results_per_page or 5000
self.current_page = None
self._change_feed = ChangeFeed(container_client, self.results_per_page, start_time=start_time,
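
For reference, a minimal sketch of what the two guards above accept. The token round-trips as a stringified dict (the tests below eval() it); any fields beyond 'UrlHost' and 'CursorVersion' are assumptions here.

import ast

def validate_token(token_str, primary_hostname):
    # Stringified-dict token; ast.literal_eval is the safer equivalent of eval().
    token = ast.literal_eval(token_str)
    if token['UrlHost'] != primary_hostname:
        raise ValueError('The token is not for the current storage account.')
    if token['CursorVersion'] != 1:
        raise ValueError('The CursorVersion is not supported by the current SDK.')
    return token

validate_token("{'CursorVersion': 1, 'UrlHost': 'myaccount.blob.core.windows.net'}",
               'myaccount.blob.core.windows.net')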
@@ -344,11 +346,11 @@ def __next__(self):
try:
event = next(self.file_reader)
self.cursor['EventIndex'] = self._data_stream.event_index
self.cursor['BlockOffset'] = self._data_stream.event_position
self.cursor['BlockOffset'] = self._data_stream.object_position
return event
except StopIteration:
self.cursor['EventIndex'] = self._data_stream.event_index
self.cursor['BlockOffset'] = self._data_stream.event_position
self.cursor['BlockOffset'] = self._data_stream.object_position
raise StopIteration

next = __next__ # Python 2 compatibility.
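
A hypothetical checkpoint-and-resume loop built on the cursor fields updated above; by_page() and continuation_token are the public API used in the samples further down, while the persistence step is illustrative only.

def drain_with_checkpoint(cf_client, saved_token=None):
    # Resume from a previously stored token, or start from scratch.
    pages = cf_client.list_changes().by_page(continuation_token=saved_token)
    events = []
    for page in pages:
        events.extend(page)
    # The token now embeds EventIndex/BlockOffset for each shard cursor,
    # so the next call can pick up exactly where this one stopped.
    return events, pages.continuation_token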
@@ -378,7 +380,7 @@ class ChangeFeedStreamer(object):
def __init__(self, blob_client, chunk_file_start=0):
self._chunk_file_start = chunk_file_start or 0 # this value will never be updated
self._download_offset = self._chunk_file_start # range start of the next download
self.event_position = self._chunk_file_start # track the most recently read sync marker position
self.object_position = self._chunk_file_start # track the most recently read sync marker position
self.event_index = 0
self._point = self._chunk_file_start # file cursor position relative to the whole chunk file, not the buffered portion
self._chunk_size = 4 * 1024 * 1024
@@ -448,9 +450,9 @@ def read(self, size):

return data

def track_event_position(self):
self.event_position = self.tell()
def track_object_position(self):
self.object_position = self.tell()

def set_event_index(self, event_index):
def set_object_index(self, event_index):
self.event_index = event_index
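
A sketch of how the two trackers above pinpoint a resume point. The shard-cursor field names mirror the tests in this commit; the replay step is illustrative.

def resume_point(shard_cursor):
    # BlockOffset: byte offset of the last sync marker read past;
    # EventIndex: number of events already delivered from that block.
    return shard_cursor['BlockOffset'], shard_cursor['EventIndex']

def replay_block(events_in_block, event_index):
    # Skip events the caller has already seen and return the remainder.
    return events_in_block[event_index:]

offset, index = resume_point({'BlockOffset': 4194304, 'EventIndex': 2})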

@@ -124,36 +124,6 @@ def list_events_in_live_mode(self):
for event in change_feed_page3:
print(event)

def list_events_in_live_mode_continuously_without_waiting_for_a_minute(self):
# Instantiate a ChangeFeedClient
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
# to get continuation token
start_time = datetime(2020, 8, 19, 10)
change_feed = cf_client.list_changes(start_time=start_time).by_page()

for page in change_feed:
for event in page:
print(event)
token = change_feed.continuation_token

print("continue printing events")
# restart using the continuation token while no events have been added yet
change_feed2 = cf_client.list_changes(results_per_page=56).by_page(continuation_token=token)
for change_feed_page2 in change_feed2:
for event in change_feed_page2:
print(event)

print("continue printing events")
# No events have been added yet
token2 = change_feed2.continuation_token
change_feed3 = cf_client.list_changes(results_per_page=56).by_page(continuation_token=token2)
try:
change_feed_page3 = next(change_feed3)
for event in change_feed_page3:
print(event)
except StopIteration:
pass
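
The sample deleted above polled the feed repeatedly without waiting for new events. Below is a minimal resume-from-token sketch using the same public API; the account values and start time are placeholders.

from datetime import datetime

from azure.storage.blob.changefeed import ChangeFeedClient

def print_events_and_checkpoint(account_name, account_key, token=None):
    cf_client = ChangeFeedClient(
        "https://{}.blob.core.windows.net".format(account_name),
        credential=account_key)
    # Seed a start_time only on the first call; afterwards the token governs.
    kwargs = {} if token else {'start_time': datetime(2020, 8, 19, 10)}
    pages = cf_client.list_changes(**kwargs).by_page(continuation_token=token)
    for page in pages:
        for event in page:
            print(event)
    # Persist this and pass it back in once new events have been added.
    return pages.continuation_token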

if __name__ == '__main__':
sample = ChangeFeedSamples()
@@ -127,12 +127,14 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, resource_group, lo
start_time = datetime(2020, 8, 19, 23)
change_feed = cf_client.list_changes(start_time=start_time).by_page()

events = list()
for page in change_feed:
for event in page:
print(event)
events.append(event)
token = change_feed.continuation_token

dict_token = eval(token)
self.assertTrue(len(events) > 0)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
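
For reference, the token shape these assertions imply once eval()'d; a sketch only, and real tokens carry more fields than shown.

dict_token = {
    'CursorVersion': 1,
    'UrlHost': 'myaccount.blob.core.windows.net',
    'CurrentSegmentCursor': {
        # One cursor per shard; this segment has three shards.
        'ShardCursors': [
            {'EventIndex': 0, 'BlockOffset': 0},
            {'EventIndex': 0, 'BlockOffset': 0},
            {'EventIndex': 0, 'BlockOffset': 0},
        ],
    },
}
assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3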
@@ -144,21 +146,32 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, resource_group, lo
print("continue printing events")

# restart using the continuation token after waiting for 2 minutes
change_feed2 = cf_client.list_changes(results_per_page=5).by_page(continuation_token=token)
change_feed2 = cf_client.list_changes(results_per_page=6).by_page(continuation_token=token)
change_feed_page2 = next(change_feed2)
events2 = list()
for event in change_feed_page2:
print(event)
events2.append(event)

self.assertNotEqual(len(events2), 0)

if self.is_live:
sleep(120)
print("continue printing events")

# restart using the continuation token, which has a non-zero EventIndex for all 3 shards
token2 = change_feed2.continuation_token
dict_token2 = eval(token2)
self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][0]['EventIndex'], 0)
self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][1]['EventIndex'], 0)
self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][2]['EventIndex'], 0)

change_feed3 = cf_client.list_changes(results_per_page=57).by_page(continuation_token=token2)
change_feed_page3 = next(change_feed3)
events3 = list()
for event in change_feed_page3:
print(event)
events3.append(event)
self.assertNotEqual(len(events3), 0)

@GlobalStorageAccountPreparer()
def test_read_change_feed_tail_where_only_1_shard_has_data(self, resource_group, location, storage_account, storage_account_key):
@@ -169,11 +182,14 @@ def test_read_change_feed_tail_where_only_1_shard_has_data(self, resource_group,
change_feed = cf_client.list_changes(start_time=start_time, results_per_page=3).by_page()

page = next(change_feed)
events_on_first_page = list()
for event in page:
aaaaaa = event
token = change_feed.continuation_token
events_on_first_page.append(event)

token = change_feed.continuation_token
dict_token = eval(token)

self.assertEqual(len(events_on_first_page), 3)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
@@ -107,12 +107,12 @@ def __init__(self, reader, datum_reader, **kwargs):
# get ready to read
self._block_count = 0

# event_position and block_count are to support reading from current position in the future read,
# no need to downloading from the beginning of avro file with these two attr.
if hasattr(self._reader, 'event_position'):
self.reader.track_event_position()
# object_position supports resuming a future read from the current position,
# so there is no need to re-download the Avro file from the beginning.
if hasattr(self._reader, 'object_position'):
self.reader.track_object_position()

self._cur_event_index = 0
self._cur_object_index = 0
# header_reader indicates the reader only has partial content, without the block header,
# so we use the block count stored from the previous read.
# Also, ChangeFeed only uses codec==null, so _raw_decoder is sufficient.
@@ -228,26 +228,28 @@ def __next__(self):
if self.block_count == 0:
self._skip_sync()

# event_position and block_count are to support reading from current position in the future read,
# no need to downloading from the beginning of avro file with these two attr.
if hasattr(self._reader, 'event_position'):
self.reader.track_event_position()
# object_position supports resuming a future read from the current position,
# so there is no need to re-download the Avro file from the beginning.
if hasattr(self._reader, 'object_position'):
self.reader.track_object_position()
self._cur_object_index = 0

self._read_block_header()
self._cur_event_index = 0

datum = self.datum_reader.read(self.datum_decoder)
self._block_count -= 1
self._cur_event_index += 1
self._cur_object_index += 1

# This only applies for changefeed
if hasattr(self._reader, 'event_position'):
# object_position supports resuming a future read from the current position.
# Track the index of the next item to be read,
# as well as the offset just before the next sync marker.
if hasattr(self._reader, 'object_position'):
if self.block_count == 0:
# the next event to be read is at index 0 in the new block
self.reader.track_event_position()
self.reader.set_event_index(0)
self.reader.track_object_position()
self.reader.set_object_index(0)
else:
self.reader.set_event_index(self._cur_event_index)
self.reader.set_object_index(self._cur_object_index)

return datum

@@ -45,6 +45,7 @@ def __init__(self, reader, datum_reader, **kwargs):
self._datum_reader = datum_reader
self.codec = "null"
self._block_count = 0
self._cur_object_index = 0
self._meta = None
self._sync_marker = None

@@ -68,13 +69,16 @@ async def init(self):
# get ready to read
self._block_count = 0

# object_position supports resuming a future read from the current position,
# so there is no need to re-download the Avro file from the beginning.
if hasattr(self._reader, 'object_position'):
self.reader.track_object_position()

# header_reader indicates the reader only has partial content, without the block header,
# so we use the block count stored from the previous read.
# Also, ChangeFeed only uses codec==null, so _raw_decoder is sufficient.
if self._header_reader is not None:
self._block_count = self._reader.block_count
self._datum_decoder = self._raw_decoder

self.datum_reader.writer_schema = (
schema.parse(self.get_meta(SCHEMA_KEY).decode('utf-8')))
return self
@@ -176,16 +180,29 @@ async def __anext__(self):
"""Return the next datum in the file."""
if self.block_count == 0:
await self._skip_sync()

# object_position supports resuming a future read from the current position,
# so there is no need to re-download the Avro file from the beginning.
if hasattr(self._reader, 'object_position'):
await self.reader.track_object_position()
self._cur_object_index = 0

await self._read_block_header()

datum = await self.datum_reader.read(self.datum_decoder)
self._block_count -= 1

# event_position and block_count are to support reading from current position in the future read,
# no need to downloading from the beginning of avro file with these two attr.
if hasattr(self._reader, 'event_position'):
self.reader.block_count = self.block_count
await self.reader.track_event_position()
self._cur_object_index += 1

# object_position supports resuming a future read from the current position.
# Track the index of the next item to be read,
# as well as the offset just before the next sync marker.
if hasattr(self._reader, 'object_position'):
if self.block_count == 0:
# the next event to be read is at index 0 in the new block
await self.reader.track_object_position()
await self.reader.set_object_index(0)
else:
await self.reader.set_object_index(self._cur_object_index)

return datum
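
A sketch of consuming the async reader end to end; each iteration refreshes object_position and the object index on the underlying stream, so a consumer can stop after any record and resume later.

async def read_all(df_reader):
    records = []
    async for record in df_reader:
        records.append(record)
    return records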

12 changes: 8 additions & 4 deletions sdk/storage/azure-storage-blob/tests/avro/test_avro.py
@@ -161,14 +161,15 @@ def test_with_hearder_reader(self):
df_reader = DataFileReader(partial_data_stream, DatumReader(), header_reader=header_stream)
records = list(df_reader)
self.assertEqual(CHANGE_FEED_RECORD, records[0])
self.assertIsNot(partial_data_stream.event_position, 0)
self.assertIsNot(partial_data_stream.object_position, 0)


class _HeaderStream(object):
def __init__(self):
self._bytes_stream = BytesIO()
self.event_position = 0
self.object_position = 0
self.block_count = 0
self.event_index = 0

def seek(self, *args, **kwargs):
return self._bytes_stream.seek(*args, **kwargs)
@@ -182,5 +183,8 @@ def write(self, *args, **kwargs):
def tell(self, *args, **kwargs):
return self._bytes_stream.tell(*args, **kwargs)

def track_event_position(self):
self.event_position = self.tell()
def track_object_position(self):
self.object_position = self.tell()

def set_object_index(self, event_index):
self.event_index = event_index
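
Putting the helper above to work, modeled on the header-reader test: the header stream supplies the schema and sync marker, while the data stream resumes mid-file and exposes block_count for the reader's bookkeeping. The import paths are assumptions based on this test module.

from azure.storage.blob._shared.avro.avro_io import DatumReader
from azure.storage.blob._shared.avro.datafile import DataFileReader

def read_tail(header_bytes, tail_bytes, block_count=0):
    header_stream = _HeaderStream()
    header_stream.write(header_bytes)
    header_stream.seek(0)

    partial_data_stream = _HeaderStream()
    partial_data_stream.write(tail_bytes)
    partial_data_stream.seek(0)
    partial_data_stream.block_count = block_count  # events left in the open block

    df_reader = DataFileReader(partial_data_stream, DatumReader(),
                               header_reader=header_stream)
    return list(df_reader)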
31 changes: 23 additions & 8 deletions sdk/storage/azure-storage-blob/tests/avro/test_avro_async.py
@@ -5,7 +5,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import asyncio
import inspect
import os
from io import BytesIO
@@ -65,7 +65,7 @@ def setUpClass(cls):
cls._samples_dir_root = os.path.join(os.path.dirname(test_file_path), 'samples')

@pytest.mark.asyncio
async def test_reader(self):
async def _test_reader(self):
correct = 0
nitems = 10
for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
@@ -84,8 +84,12 @@ async def test_reader(self):
correct,
len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))

def test_reader(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._test_reader())

@pytest.mark.asyncio
async def test_change_feed(self):
async def _test_change_feed(self):
file_path = os.path.join(AvroReaderTestsAsync._samples_dir_root, 'changeFeed.avro')
with open(file_path, 'rb') as reader:
datum_reader = AsyncDatumReader()
@@ -98,8 +102,12 @@ async def test_change_feed(self):
expected_record = CHANGE_FEED_RECORD
self.assertEqual(expected_record, data[0])

def test_change_feed(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._test_change_feed())

@pytest.mark.asyncio
async def test_with_hearder_reader(self):
async def _test_with_hearder_reader(self):
# Note: a header stream is only needed when the data stream itself lacks the header
file_path = os.path.join(AvroReaderTestsAsync._samples_dir_root, 'changeFeed.avro')
# this data stream has header
@@ -128,14 +136,18 @@ async def test_with_hearder_reader(self):
async for record in df_reader:
records.append(record)
self.assertEqual(CHANGE_FEED_RECORD, records[0])
self.assertIsNot(partial_data_stream.event_position, 0)
self.assertIsNot(partial_data_stream.object_position, 0)

def test_with_hearder_reader(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._test_with_hearder_reader())

class _HeaderStream(object):
def __init__(self):
self._bytes_stream = BytesIO()
self.event_position = 0
self.object_position = 0
self.block_count = 0
self.event_index = 0

async def seek(self, *args, **kwargs):
return self._bytes_stream.seek(*args, **kwargs)
@@ -149,8 +161,11 @@ async def write(self, *args, **kwargs):
async def tell(self, *args, **kwargs):
return self._bytes_stream.tell(*args, **kwargs)

async def track_event_position(self):
self.event_position = self.tell()
async def track_object_position(self):
self.object_position = self.tell()

async def set_object_index(self, event_index):
self.event_index = event_index

async def close(self):
self._bytes_stream.close()
