Feature Request: support iterating stream lines #413

Closed
Hanaasagi opened this issue Sep 30, 2019 · 4 comments · Fixed by #575

Comments

@Hanaasagi
Contributor

I think it would be better to provide an API for iterating over a stream line by line. This scenario is very common (e.g., the Kubernetes watch API). Right now we have to write a wrapper for it ourselves.

import json
import httpx


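class StreamWrapper:
    """Wrap an iterator of text chunks and iterate over it line by line."""

    def __init__(self, stream):
        self._stream = stream

    def __iter__(self):
        # Trailing partial line carried over from the previous chunk.
        pending = ""

        for chunk in self._stream:
            chunk = pending + chunk
            lines = chunk.splitlines()

            # If the chunk does not end with a line terminator, its last
            # line is incomplete: hold it back until more data arrives.
            if chunk and lines and lines[-1] and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = ""

            for line in lines:
                yield line

        # Emit whatever is left once the stream is exhausted.
        if pending:
            yield pending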


timeout = httpx.TimeoutConfig(
    connect_timeout=5, read_timeout=None, write_timeout=5
)


resp = httpx.get(
    "http://127.0.0.1:18081/api/v1/watch/namespaces/default/pods",
    stream=True,
    timeout=timeout,
)

for line in StreamWrapper(resp.stream_text()):
    print(json.loads(line))

@pquentin
Contributor

(For your immediate need, is it an option to use the official Kubernetes Python client?)

Note that your wrapping algorithm is accidentally quadratic. Say your stream gives you N bytes, one byte at a time, with only a newline at the end: you'll be calling splitlines() N times on an ever-growing buffer, which is undesirable because splitlines() is O(n) too, so the total work is O(N²).
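One way to avoid the repeated rescanning is sketched below. It is only an illustration, not code from httpx: `LineBuffer`, `feed()` and `flush()` are hypothetical names, and the sketch assumes that only "\n" and "\r\n" count as line terminators (unlike str.splitlines(), which recognises more). Pieces of an incomplete line are kept in a list and joined only when a newline actually arrives, so each character is copied a bounded number of times.

```python
class LineBuffer:
    """Hypothetical helper: split text chunks into lines in amortized linear time."""

    def __init__(self):
        # Pieces of the current, still-incomplete line.
        self._parts = []

    def feed(self, chunk):
        """Return the list of lines completed by this chunk."""
        if "\n" not in chunk:
            # No line ends here: store the piece without rescanning the buffer.
            if chunk:
                self._parts.append(chunk)
            return []
        self._parts.append(chunk)
        *lines, rest = "".join(self._parts).split("\n")
        self._parts = [rest] if rest else []
        # Assumption: a trailing "\r" belongs to a "\r\n" terminator.
        return [line[:-1] if line.endswith("\r") else line for line in lines]

    def flush(self):
        """Return the trailing unterminated line, if any, as a list."""
        rest = "".join(self._parts)
        self._parts = []
        return [rest] if rest else []


buffer = LineBuffer()
lines = []
for chunk in ["a\r", "\nb", "c\n", "d"]:
    lines.extend(buffer.feed(chunk))
lines.extend(buffer.flush())
print(lines)  # ['a', 'bc', 'd']
```

Feeding the same data one character at a time gives the same lines, and a chunk without a newline costs only an append.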

It so happens that transforming a stream into lines is tricky to get right! While the Kubernetes watch API is well behaved, your wrapper can't be included as is in httpx.
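To make "tricky" concrete, here are two edge cases, chosen for illustration and not necessarily the ones meant above. The function `split_like_wrapper` is a hypothetical name; it restates the wrapper's algorithm from the issue as a plain generator so it can be run without a server.

```python
def split_like_wrapper(chunks):
    """Same splitting logic as the StreamWrapper in the issue, as a generator."""
    pending = ""
    for chunk in chunks:
        chunk = pending + chunk
        lines = chunk.splitlines()
        if chunk and lines and lines[-1] and lines[-1][-1] == chunk[-1]:
            pending = lines.pop()
        else:
            pending = ""
        yield from lines
    if pending:
        yield pending


# Case 1: a "\r\n" terminator split across two chunks. The lone "\r" ends
# the first line, then the "\n" is seen as a second, empty line.
crlf_split = list(split_like_wrapper(["a\r", "\nb\n"]))
print(crlf_split)  # ['a', '', 'b']

# Case 2: str.splitlines() also breaks on U+2028 (LINE SEPARATOR), which may
# appear unescaped inside a JSON string, so one record is cut in two.
record = '{"msg": "x\u2028y"}\n'
json_split = list(split_like_wrapper([record]))
print(len(json_split))  # 2
```

In the second case neither piece is valid JSON on its own, so a json.loads() call on each line would fail.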

It also happens that this kind of line splitting is a generic problem that needs to be solved in different places. We're still trying to figure out as a community how to extract this kind of functionality into reusable and composable parts; see python-trio/trio#796 and https://bugs.python.org/issue38242 for details. In the meantime, although I don't speak for the httpx maintainers, I don't think adding this functionality to httpx itself would be a good move.

@florimondmanca
Member

florimondmanca commented Sep 30, 2019

Thanks for sharing this snippet @Hanaasagi. I think you’ll understand though that this particular piece of functionality is out of scope for HTTPX, as supported by the reasons @pquentin pointed out.

Edit: see below for some nuance.

@florimondmanca
Member

florimondmanca commented Sep 30, 2019

Whoops, clicked the wrong button. Responding to issues on a phone can be tricky!

I was saying: parsing the stream iterator into a stream of lines might be something our users want to do, though. Your use case may not be isolated.

As noted in python-trio/trio#796 though, most async libraries expose a way to do this. For example, StreamReader.readline() in asyncio.
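For reference, a minimal sketch of what StreamReader.readline() looks like in asyncio. The reader is filled by hand with feed_data() and feed_eof() purely so the example runs without a network connection; the JSON payloads and the helper name `read_lines` are made up for illustration.

```python
import asyncio


async def read_lines(reader):
    """Collect lines from an asyncio.StreamReader until EOF."""
    lines = []
    while True:
        line = await reader.readline()  # returns b"" once EOF is reached
        if not line:
            break
        lines.append(line.rstrip(b"\r\n"))
    return lines


async def main():
    reader = asyncio.StreamReader()
    # Simulate data arriving in two pieces, with a line split between them.
    reader.feed_data(b'{"type": "ADDED"}\n{"type": "MOD')
    reader.feed_data(b'IFIED"}\n')
    reader.feed_eof()
    return await read_lines(reader)


result = asyncio.run(main())
print(result)
```

The reader does the buffering itself, so the caller never sees the partial line.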

So we could in theory add a new .readline() method to our concurrency backends, accept the string "lines" in addition to True/False for the stream parameter, and then use that method instead of .read() so that the .stream_text() iterator yields lines instead of raw chunks.

I’m really not sure whether this would work well with the protocol layer though, and I'm also not sure this is something we want to support inside HTTPX at all.

One immediate sensible point of action out of this would be to document our handling of this use case.

@tomchristie
Member

Related to #24, #183. I'm fairly sure @sethmlarson and I had some more conversation about this too, but I can't dig it out right now.

Not sure what we want the API to look like, but I'd agree in general with the composable implementation (take a stream iterator, return a linewise iterator).
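A rough sketch of that composable shape, under stated assumptions: `aiter_lines`, `aiter_json` and `fake_stream` are hypothetical names, not httpx APIs, and only "\n" is treated as a line terminator. Each stage takes an async iterator and returns another, so stages can be chained without the HTTP client knowing about lines at all.

```python
import asyncio
import json


async def aiter_lines(chunks):
    """Take an async iterator of text chunks, yield complete lines."""
    parts = []  # pieces of the current incomplete line
    async for chunk in chunks:
        parts.append(chunk)
        if "\n" in chunk:
            *lines, rest = "".join(parts).split("\n")
            parts = [rest]
            for line in lines:
                yield line
    rest = "".join(parts)
    if rest:
        yield rest


async def aiter_json(lines):
    """Take an async iterator of lines, yield decoded JSON documents."""
    async for line in lines:
        if line.strip():
            yield json.loads(line)


async def fake_stream():
    # Stand-in for a streamed response body; chunk boundaries are arbitrary.
    for chunk in ['{"type": "ADD', 'ED"}\n{"type":', ' "DELETED"}\n']:
        yield chunk


async def main():
    return [event async for event in aiter_json(aiter_lines(fake_stream()))]


events = asyncio.run(main())
print(events)
```

The same two stages would work with any source that yields text chunks, which is the point of keeping the line splitter separate from the transport.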
