Trailing Data in WebSocket Upgrade Requests #871
Hello! First of all, thank you for this amazing OSS! There are two issues in this discussion:
While trying the code from the "Upgrade requests" section of the HTTPCore documentation, I noticed that the first message sent by the server never arrives. The server and client code reproducing this issue are provided below:
```python
import socket

from wsproto import ConnectionType, WSConnection
from wsproto.events import AcceptConnection, CloseConnection, Message, Request

RECEIVE_BYTES = 4096


def main():
    """
    Simple low-level WebSocket server:
    1. Completes the handshake and sends the first message in the same write.
    2. Sends a second message and a close frame.
    """
    with socket.create_server(("127.0.0.1", 8000)) as server:
        while True:
            stream, _ = server.accept()
            with stream:
                ws = WSConnection(ConnectionType.SERVER)
                while True:
                    in_data = stream.recv(RECEIVE_BYTES)
                    print("Received {} bytes".format(len(in_data)))
                    ws.receive_data(in_data)
                    out_data = b""
                    for event in ws.events():
                        if isinstance(event, Request):
                            print("Accepting WebSocket upgrade and sending first message")
                            out_data += ws.send(AcceptConnection())
                            out_data += ws.send(Message(data="first message"))
                    if out_data:
                        print("Sending {} bytes".format(len(out_data)))
                        stream.sendall(out_data)
                    print("Sending second message and closing connection")
                    out_data = b""
                    out_data += ws.send(Message(data="second message"))
                    out_data += ws.send(CloseConnection(code=1000))
                    print("Sending {} bytes".format(len(out_data)))
                    stream.sendall(out_data)
                    break


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass
```
```python
import base64
import os

import httpcore
import wsproto

url = "http://127.0.0.1:8000/"
headers = {
    b"Connection": b"Upgrade",
    b"Upgrade": b"WebSocket",
    b"Sec-WebSocket-Key": base64.b64encode(os.urandom(16)),
    b"Sec-WebSocket-Version": b"13",
}

with httpcore.ConnectionPool() as http:
    with http.stream("GET", url, headers=headers) as response:
        if response.status != 101:
            raise Exception("Failed to upgrade to websockets", response)

        # Get the raw network stream.
        network_stream = response.extensions["network_stream"]

        # Wait for a response.
        ws_connection = wsproto.Connection(wsproto.ConnectionType.CLIENT)
        incoming_data = network_stream.read(max_bytes=4096)
        ws_connection.receive_data(incoming_data)
        for event in ws_connection.events():
            if isinstance(event, wsproto.events.TextMessage):
                print("Got data:", event.data)
```

This server sends "first message" and "second message" after the handshake, then disconnects. Because it writes the handshake acceptance and the first message in a single call, it simulates network delays under which both arrive at the client at the same time. When running this client code, "first message" is never printed. Upon investigation, it seems that these unprocessed bytes remain in the h11 object used by HTTPCore as trailing data. They need to be passed to wsproto as `trailing_data`. The following code resolves this issue:
```python
import base64
import os

import httpcore
import wsproto

url = "http://127.0.0.1:8000/"
headers = {
    b"Connection": b"Upgrade",
    b"Upgrade": b"WebSocket",
    b"Sec-WebSocket-Key": base64.b64encode(os.urandom(16)),
    b"Sec-WebSocket-Version": b"13",
}

with httpcore.ConnectionPool() as http:
    with http.stream("GET", url, headers=headers) as response:
        if response.status != 101:
            raise Exception("Failed to upgrade to websockets", response)

        # Get the trailing data (bytes h11 read past the end of the 101 response).
        trailing_data, _ = response.stream._stream._connection._h11_state.trailing_data
        if trailing_data:
            print("trailing_data:", trailing_data)

        # Get the raw network stream.
        network_stream = response.extensions["network_stream"]

        # Hand the trailing data to wsproto before reading more from the network.
        ws_connection = wsproto.Connection(
            wsproto.ConnectionType.CLIENT, trailing_data=trailing_data
        )
        incoming_data = network_stream.read(max_bytes=4096)
        ws_connection.receive_data(incoming_data)
        for event in ws_connection.events():
            if isinstance(event, wsproto.events.TextMessage):
                print("Got data:", event.data)
```

While the "Upgrade requests" section of the documentation provides a simple client example, it may be worthwhile to incorporate this fix into the documentation.
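To make the failure mode concrete, here is a small stdlib-only sketch (not HTTPCore or h11 code) of what "trailing data" means here: a single read returns the end of the 101 response head plus the bytes of the server's first WebSocket frame, and anything after the blank line belongs to the upgraded protocol. The helper names `split_http_head` and `decode_unmasked_text_frame` are hypothetical, the sample response omits `Sec-WebSocket-Accept` for brevity, and the frame decoder handles only short, unmasked, final text frames (server-to-client frames are unmasked per RFC 6455).

```python
from __future__ import annotations


def split_http_head(buffer: bytes) -> tuple[bytes, bytes]:
    # An HTTP/1.1 response head ends at the first blank line (CRLF CRLF).
    end = buffer.find(b"\r\n\r\n")
    if end == -1:
        raise ValueError("incomplete HTTP response head")
    end += 4
    # Everything after the blank line is data for the upgraded protocol.
    return buffer[:end], buffer[end:]


def decode_unmasked_text_frame(frame: bytes) -> str:
    # Minimal decoder: FIN + text opcode (0x81), no mask bit, payload <= 125 bytes.
    fin_and_opcode, length = frame[0], frame[1]
    if fin_and_opcode != 0x81 or length & 0x80 or length > 125:
        raise ValueError("only short, unmasked, final text frames are supported")
    return frame[2 : 2 + length].decode("utf-8")


# One recv() that returned both the 101 response and the server's first frame.
received = (
    b"HTTP/1.1 101 Switching Protocols\r\n"
    b"Upgrade: WebSocket\r\n"
    b"Connection: Upgrade\r\n"
    b"\r\n"
) + bytes([0x81, len(b"first message")]) + b"first message"

head, trailing = split_http_head(received)
print(head.split(b"\r\n")[0])  # b'HTTP/1.1 101 Switching Protocols'
print(decode_unmasked_text_frame(trailing))  # first message
```

An HTTP parser that stops at the end of the head, as h11 does, is left holding `trailing`; if nobody forwards it to the WebSocket layer, the first message is lost.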
I am attempting to implement a WebSocket client using HTTPCore and HTTPX. Therefore, I would like to handle the trailing data mentioned above when using HTTPX as well:

```python
import base64
import os

import httpx
import wsproto

url = "http://127.0.0.1:8000/"
headers = {
    b"Connection": b"Upgrade",
    b"Upgrade": b"WebSocket",
    b"Sec-WebSocket-Key": base64.b64encode(os.urandom(16)),
    b"Sec-WebSocket-Version": b"13",
}

with httpx.Client() as client:
    with client.stream("GET", url, headers=headers) as response:
        if response.status_code != 101:
            raise Exception("Failed to upgrade to websockets", response)

        # Get the trailing data (reached through private attributes).
        h11_state = (
            response.stream._stream._httpcore_stream._status.connection._connection._h11_state
        )
        trailing_data, _ = h11_state.trailing_data
        if trailing_data:
            print("trailing_data:", trailing_data)

        # Get the raw network stream.
        network_stream = response.extensions["network_stream"]

        # Hand the trailing data to wsproto before reading more from the network.
        ws_connection = wsproto.Connection(
            wsproto.ConnectionType.CLIENT, trailing_data=trailing_data
        )
        incoming_data = network_stream.read(max_bytes=4096)
        ws_connection.receive_data(incoming_data)
        for event in ws_connection.events():
            if isinstance(event, wsproto.events.TextMessage):
                print("Got data:", event.data)
```

Given that I am accessing numerous private attributes, there is a risk that I may lose access to `trailing_data` if these internals change.
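Until a public API exists, one way to contain that risk is to keep the private lookup in a single place and fail loudly when it stops working, instead of silently dropping the first message again. The sketch below is a hypothetical helper, `get_trailing_data`; the attribute chains are the ones used in the snippets above, they are HTTPCore/HTTPX internals that may change in any release, and h11's `Connection.trailing_data` is taken to be a `(data, closed)` tuple as in the code above. The demonstration uses `SimpleNamespace` stand-ins, not real responses.

```python
from types import SimpleNamespace

# Private attribute chains from the snippets above; HTTPCore/HTTPX may change them.
HTTPX_PATH = (
    "stream", "_stream", "_httpcore_stream", "_status",
    "connection", "_connection", "_h11_state",
)
HTTPCORE_PATH = ("stream", "_stream", "_connection", "_h11_state")


def get_trailing_data(response) -> bytes:
    """Return the bytes h11 buffered past the 101 response (hypothetical helper)."""
    for path in (HTTPX_PATH, HTTPCORE_PATH):
        obj = response
        try:
            for name in path:
                obj = getattr(obj, name)
            # h11.Connection.trailing_data is a (data, closed) tuple.
            data, _closed = obj.trailing_data
        except AttributeError:
            continue  # This chain does not match; try the next one.
        return data
    # Fail loudly instead of silently losing the first WebSocket message.
    raise RuntimeError("could not locate h11 trailing data; internals may have changed")


# Demonstration with stand-in objects shaped like the HTTPCore internals.
h11_state = SimpleNamespace(trailing_data=(b"leftover bytes", False))
httpcore_response = SimpleNamespace(
    stream=SimpleNamespace(
        _stream=SimpleNamespace(_connection=SimpleNamespace(_h11_state=h11_state))
    )
)
print(get_trailing_data(httpcore_response))  # b'leftover bytes'
```

Raising rather than returning `b""` is a deliberate choice: an empty default would reintroduce exactly the silent message loss described above.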
Oh interesting, yup. The `h11` documentation on this is helpful. If we wanted to deal with this in public API we'd need to handle the `CONNECT` and `Upgrade` cases in our `HTTP11Connection`/`AsyncHTTP11Connection` code. We could then either...

1. Make `trailing_data` available through a publicly documented response extension.
2. Ensure `trailing_data` is returned by the network stream, on the first `.read()`.

The second one of these is neatest from the user-perspective. We'd probably implement that as a proxy class onto the underlying network stream, that's able to additionally deal with pushing the trailing data in the `h11` event back onto the stream...

This kinda thing...

```python
class UpgradeNetworkStream:
    def __init__(self, leading_data, network_stream):
        # We need to push any data that's already been read back onto the stream.
        self._leading_data = leading_data
        self._network_stream = network_stream

    def read(self, max_bytes, timeout=None):
        # Serve the bytes h11 already buffered before touching the network.
        if self._leading_data:
            initial = self._leading_data[:max_bytes]
            self._leading_data = self._leading_data[max_bytes:]
            return initial
        return self._network_stream.read(max_bytes, timeout)
```

If anyone is interested in this functionality I'd be v happy to help them work through a PR if needed.
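Since the same change would be needed in `AsyncHTTP11Connection`, here is a minimal async counterpart of the `UpgradeNetworkStream` sketch. It assumes the wrapped stream exposes `async read(max_bytes, timeout=None)`, as HTTPCore's async network streams do; the class name `AsyncUpgradeNetworkStream`, the `__getattr__` delegation for the remaining methods, and the `FakeAsyncStream` test double are all hypothetical additions for illustration.

```python
from __future__ import annotations

import asyncio
from typing import Any, Optional


class AsyncUpgradeNetworkStream:
    """Async proxy that replays already-buffered bytes before reading the network."""

    def __init__(self, leading_data: bytes, network_stream: Any) -> None:
        self._leading_data = leading_data
        self._network_stream = network_stream

    async def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
        # Serve the trailing data h11 already read, honouring max_bytes.
        if self._leading_data:
            chunk = self._leading_data[:max_bytes]
            self._leading_data = self._leading_data[max_bytes:]
            return chunk
        return await self._network_stream.read(max_bytes, timeout)

    def __getattr__(self, name: str) -> Any:
        # Delegate write(), aclose(), get_extra_info(), ... to the wrapped stream.
        return getattr(self._network_stream, name)


class FakeAsyncStream:
    # Stand-in for a real async network stream, for demonstration only.
    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def read(self, max_bytes, timeout=None):
        return self._chunks.pop(0) if self._chunks else b""


async def demo():
    stream = AsyncUpgradeNetworkStream(b"leading", FakeAsyncStream([b"from network"]))
    return [await stream.read(4096), await stream.read(4096), await stream.read(4096)]


print(asyncio.run(demo()))  # [b'leading', b'from network', b'']
```

Because the proxy only changes what the first reads return, code like the wsproto client above could then read from the network stream without knowing that trailing data ever existed.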