Skip to content

Commit

Permalink
Work around line length limit of asyncio.StreamReader (ansible-commun…
Browse files Browse the repository at this point in the history
…ity#53)

* Work around line length limit of asyncio.StreamReader.

* Add limit.

* In case part ends with '\n', already process that.

* Improve tests.

* Revert "In case part ends with '\n', already process that."

This reverts commit f5536e8.
  • Loading branch information
felixfontein authored Apr 14, 2023
1 parent 886bcc7 commit d910be2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
22 changes: 20 additions & 2 deletions src/antsibull_core/subprocess_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import asyncio
import subprocess
from asyncio.exceptions import IncompleteReadError, LimitOverrunError
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any, cast

Expand All @@ -29,16 +30,32 @@ async def _stream_log(
name: str, callback: Callable[[str], Any] | None, stream: asyncio.StreamReader,
errors: str,
) -> str:
line = await stream.readline()
# We do not simply use stream.readline() since it has a line length limit.
# While we set this limit already to 8 MB (the default is 64 KB), we still
# want to cover longer lines as well, so we use stream.readuntil('\n')
# and manually handle the case of longer lines.
lines = []
line_parts = []
sep = b'\n'
while True:
try:
line_parts.append(await stream.readuntil(sep))
except IncompleteReadError as e:
line_parts.append(e.partial)
except LimitOverrunError as e:
part = await stream.read(e.consumed)
line_parts.append(part)
if part:
continue

line = b''.join(line_parts)
line_parts.clear()
if not line:
break
text = line.decode('utf-8', errors=errors)
if callback:
callback(f'{name}: {text.strip()}')
lines.append(text)
line = await stream.readline()
return ''.join(lines)


Expand Down Expand Up @@ -81,6 +98,7 @@ async def async_log_run(
logger.debug(f'Running subprocess: {args!r}')
kwargs['stdout'] = asyncio.subprocess.PIPE
kwargs['stderr'] = asyncio.subprocess.PIPE
kwargs['limit'] = 2 ** 23 # Increase line length limit to 8 MB (the default is 64k)
proc = await asyncio.create_subprocess_exec(*args, **kwargs)
stdout, stderr = await asyncio.gather(
# proc.stdout and proc.stderr won't be None with PIPE, hence the cast()
Expand Down
17 changes: 17 additions & 0 deletions tests/units/test_subprocess_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from unittest.mock import MagicMock, call

import pytest

import antsibull_core.subprocess_util


Expand Down Expand Up @@ -48,3 +50,18 @@ def test_log_run_multi() -> None:
assert inp == call('stderr: ' + msg)
assert proc.stdout == '\n'.join(expected_out) + '\n'
assert proc.stderr == '\n'.join(expected_err) + '\n'


@pytest.mark.parametrize('count', [
8 * 1024 * 1024 - 1, # should not trigger long line code
8 * 1024 * 1024, # should not trigger long line code
8 * 1024 * 1024 + 1,
8 * 1024 * 1024 + 10,
9 * 1024 * 1024,
])
def test_log_run_long_line(count: int) -> None:
args = ('sh', '-c', f'dd if=/dev/zero of=/dev/stdout bs={count} count=1 ; echo ; echo foo')
proc = antsibull_core.subprocess_util.log_run(args)
assert proc.args == args
assert proc.returncode == 0
assert proc.stdout == ('\u0000' * count) + '\nfoo\n'

0 comments on commit d910be2

Please sign in to comment.