Skip to content

Commit

Permalink
merge binary-chunks-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jul 7, 2014
2 parents c36aa48 + bc0bed0 commit ee74c44
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 63 deletions.
3 changes: 2 additions & 1 deletion aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ class ClientResponse:
content = None # Payload stream

connection = None # current connection
flow_control_class = FlowControlStreamReader # reader flow control
_reader = None # input stream
_response_parser = aiohttp.HttpResponseParser()
_connection_wr = None # weakref to self for releasing connection on del
Expand Down Expand Up @@ -555,7 +556,7 @@ def waiting_for_continue(self):
def _setup_connection(self, connection):
self._reader = connection.reader
self.connection = connection
self.content = FlowControlStreamReader(
self.content = self.flow_control_class(
connection.reader, loop=connection.loop)

msg = ('ClientResponse has to be closed explicitly! {}:{}:{}'
Expand Down
23 changes: 19 additions & 4 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
__all__ = ['EofStream', 'EOF_MARKER',
'StreamReader', 'DataQueue',
'FlowControlStreamReader', 'FlowControlDataQueue']
__all__ = ['EofStream',
'StreamReader', 'DataQueue', 'ChunksQueue',
'FlowControlStreamReader', 'FlowControlDataQueue',
'FlowControlChunksQueue']

import asyncio
import asyncio.streams
import collections

EOF_MARKER = b''
Expand Down Expand Up @@ -351,3 +351,18 @@ def read(self):
return (yield from super().read())
finally:
self._stream.pause_stream()


class ChunksQueue(DataQueue):
"""Like a :class:`DataQueue`, but for binary chunked data transfer."""

@asyncio.coroutine
def read(self):
try:
return (yield from super().read())
except EofStream:
return EOF_MARKER


class FlowControlChunksQueue(FlowControlDataQueue, ChunksQueue):
"""FlowControlChunksQueue resumes and pauses an underlying stream."""
7 changes: 7 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ def second_call(*args, **kwargs):
self.assertEqual(res, {'тест': 'пройден'})
self.assertTrue(self.response.close.called)

def test_override_flow_control(self):
class MyResponse(ClientResponse):
flow_control_class = aiohttp.FlowControlDataQueue
response = MyResponse('get', 'http://python.org')
response._setup_connection(self.connection)
self.assertIsInstance(response.content, aiohttp.FlowControlDataQueue)


class ClientRequestTests(unittest.TestCase):

Expand Down
140 changes: 82 additions & 58 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,131 +465,118 @@ class DataQueueTests(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
self.buffer = streams.DataQueue(loop=self.loop)

def tearDown(self):
self.loop.close()

def test_is_eof(self):
q = streams.DataQueue(loop=self.loop)
self.assertFalse(q.is_eof())
q.feed_eof()
self.assertTrue(q.is_eof())
self.assertFalse(self.buffer.is_eof())
self.buffer.feed_eof()
self.assertTrue(self.buffer.is_eof())

def test_at_eof(self):
q = streams.DataQueue(loop=self.loop)
self.assertFalse(q.at_eof())
q.feed_eof()
self.assertTrue(q.at_eof())
q._buffer.append(object())
self.assertFalse(q.at_eof())
self.assertFalse(self.buffer.at_eof())
self.buffer.feed_eof()
self.assertTrue(self.buffer.at_eof())
self.buffer._buffer.append(object())
self.assertFalse(self.buffer.at_eof())

def test_feed_data(self):
buffer = streams.DataQueue(loop=self.loop)

item = object()
buffer.feed_data(item)
self.assertEqual([item], list(buffer._buffer))
self.buffer.feed_data(item)
self.assertEqual([item], list(self.buffer._buffer))

def test_feed_eof(self):
buffer = streams.DataQueue(loop=self.loop)
buffer.feed_eof()
self.assertTrue(buffer._eof)
self.buffer.feed_eof()
self.assertTrue(self.buffer._eof)

def test_read(self):
item = object()
buffer = streams.DataQueue(loop=self.loop)
read_task = asyncio.Task(buffer.read(), loop=self.loop)
read_task = asyncio.Task(self.buffer.read(), loop=self.loop)

def cb():
buffer.feed_data(item)
self.buffer.feed_data(item)
self.loop.call_soon(cb)

data = self.loop.run_until_complete(read_task)
self.assertIs(item, data)

def test_read_eof(self):
buffer = streams.DataQueue(loop=self.loop)
read_task = asyncio.Task(buffer.read(), loop=self.loop)
read_task = asyncio.Task(self.buffer.read(), loop=self.loop)

def cb():
buffer.feed_eof()
self.buffer.feed_eof()
self.loop.call_soon(cb)

self.assertRaises(
streams.EofStream, self.loop.run_until_complete, read_task)

def test_read_cancelled(self):
buffer = streams.DataQueue(loop=self.loop)
read_task = asyncio.Task(buffer.read(), loop=self.loop)
read_task = asyncio.Task(self.buffer.read(), loop=self.loop)
test_utils.run_briefly(self.loop)
self.assertIsInstance(buffer._waiter, asyncio.Future)
self.assertIsInstance(self.buffer._waiter, asyncio.Future)

read_task.cancel()
self.assertRaises(
asyncio.CancelledError,
self.loop.run_until_complete, read_task)
self.assertTrue(buffer._waiter.cancelled())
self.assertTrue(self.buffer._waiter.cancelled())

buffer.feed_data(b'test')
self.assertIsNone(buffer._waiter)
self.buffer.feed_data(b'test')
self.assertIsNone(self.buffer._waiter)

def test_read_until_eof(self):
item = object()
buffer = streams.DataQueue(loop=self.loop)
buffer.feed_data(item)
buffer.feed_eof()
self.buffer.feed_data(item)
self.buffer.feed_eof()

data = self.loop.run_until_complete(buffer.read())
data = self.loop.run_until_complete(self.buffer.read())
self.assertIs(data, item)

self.assertRaises(
streams.EofStream, self.loop.run_until_complete, buffer.read())
streams.EofStream,
self.loop.run_until_complete, self.buffer.read())

def test_read_exception(self):
buffer = streams.DataQueue(loop=self.loop)
buffer.set_exception(ValueError())
self.buffer.set_exception(ValueError())

self.assertRaises(
ValueError, self.loop.run_until_complete, buffer.read())
ValueError, self.loop.run_until_complete, self.buffer.read())

def test_read_exception_with_data(self):
val = object()
buffer = streams.DataQueue(loop=self.loop)
buffer.feed_data(val)
buffer.set_exception(ValueError())
self.buffer.feed_data(val)
self.buffer.set_exception(ValueError())

self.assertIs(val, self.loop.run_until_complete(buffer.read()))
self.assertIs(val, self.loop.run_until_complete(self.buffer.read()))
self.assertRaises(
ValueError, self.loop.run_until_complete, buffer.read())
ValueError, self.loop.run_until_complete, self.buffer.read())

def test_read_exception_on_wait(self):
buffer = streams.DataQueue(loop=self.loop)
read_task = asyncio.Task(buffer.read(), loop=self.loop)
read_task = asyncio.Task(self.buffer.read(), loop=self.loop)
test_utils.run_briefly(self.loop)
self.assertIsInstance(buffer._waiter, asyncio.Future)
self.assertIsInstance(self.buffer._waiter, asyncio.Future)

buffer.feed_eof()
buffer.set_exception(ValueError())
self.buffer.feed_eof()
self.buffer.set_exception(ValueError())

self.assertRaises(
ValueError, self.loop.run_until_complete, read_task)

def test_exception(self):
buffer = streams.DataQueue(loop=self.loop)
self.assertIsNone(buffer.exception())
self.assertIsNone(self.buffer.exception())

exc = ValueError()
buffer.set_exception(exc)
self.assertIs(buffer.exception(), exc)
self.buffer.set_exception(exc)
self.assertIs(self.buffer.exception(), exc)

def test_exception_waiter(self):
buffer = streams.DataQueue(loop=self.loop)

@asyncio.coroutine
def set_err():
buffer.set_exception(ValueError())
self.buffer.set_exception(ValueError())

t1 = asyncio.Task(buffer.read(), loop=self.loop)
t1 = asyncio.Task(self.buffer.read(), loop=self.loop)
t2 = asyncio.Task(set_err(), loop=self.loop)

self.loop.run_until_complete(asyncio.wait([t1, t2], loop=self.loop))
Expand All @@ -603,19 +590,56 @@ def setUp(self):
self.stream = unittest.mock.Mock()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
self.buffer = streams.FlowControlDataQueue(self.stream, loop=self.loop)

def tearDown(self):
self.loop.close()

def test_stream(self):
item = object()
buffer = streams.FlowControlDataQueue(self.stream, loop=self.loop)
read_task = asyncio.Task(buffer.read(), loop=self.loop)
read_task = asyncio.Task(self.buffer.read(), loop=self.loop)

def cb():
buffer.feed_data(item)
self.buffer.feed_data(item)
self.loop.call_soon(cb)
self.loop.run_until_complete(read_task)

self.assertTrue(self.stream.resume_stream.called)
self.assertTrue(self.stream.pause_stream.called)


class ChunksQueueTests(DataQueueTests):

def setUp(self):
super().setUp()
self.buffer = streams.ChunksQueue(loop=self.loop)

def test_read_eof(self):
read_task = asyncio.Task(self.buffer.read(), loop=self.loop)

def cb():
self.buffer.feed_eof()
self.loop.call_soon(cb)

self.loop.run_until_complete(read_task)
self.assertTrue(self.buffer.at_eof())

def test_read_until_eof(self):
item = object()
self.buffer.feed_data(item)
self.buffer.feed_eof()

data = self.loop.run_until_complete(self.buffer.read())
self.assertIs(data, item)

thing = self.loop.run_until_complete(self.buffer.read())
self.assertEqual(thing, b'')
self.assertTrue(self.buffer.at_eof())


class FlowControlChunksQueueTests(FlowControlDataQueueTests):

def setUp(self):
super().setUp()
self.buffer = streams.FlowControlChunksQueue(self.stream,
loop=self.loop)

0 comments on commit ee74c44

Please sign in to comment.