Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compatible with HTTP [1.1.0 - 1.6.0) #182

Merged
merged 13 commits into from
Nov 25, 2022
6 changes: 3 additions & 3 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "WebSockets"
uuid = "104b5d7c-a370-577a-8038-80a2059c5097"
version = "1.5.10"
version = "1.6.0"

[deps]
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
Expand All @@ -10,8 +10,8 @@ Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
HTTP = "0.8, 0.9, 1"
julia = "0.7, 1"
HTTP = "1.1.0, 1.5"
julia = "1.6"
hustf marked this conversation as resolved.
Show resolved Hide resolved

[extras]
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Expand Down
4 changes: 2 additions & 2 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
environment:
matrix:
- julia_version: 1.0
- julia_version: 1
- julia_version: 1.6.3
- julia_version: 1.8.3
- julia_version: nightly

platform:
Expand Down
51 changes: 42 additions & 9 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@ import HTTP:Response, # For upgrade
setstatus, # For upgrade
startwrite, # For upgrade
startread, # For _openstream
handle, # For _servercoroutine
Connection, # For handshaketest
Transaction,# For handshaketest
URI, # For open
Handler, # For WebSocketHandler, ServerWS, error_test
RequestHandlerFunction, # For ServerWS
StatusError, # For open
Servers, # For further imports
Streams, # For further imports
ConnectionPool # For further imports
import HTTP.ConnectionPool:
getrawstream # For _openstream
getrawstream, # For _openstream
Connection # For handshaketest
import HTTP.Streams:
Stream # For is_upgrade, handshaketest
import HTTP.Servers: MbedTLS # For further imports
Expand Down Expand Up @@ -95,9 +91,9 @@ function open(f::Function, url; verbose=false, subprotocol = "", kw...)
"GET", uri, headers;
reuse_limit=0, verbose=verbose ? 2 : 0, kw...)
catch err
if typeof(err) <: HTTP.IOExtras.IOError
throw(WebSocketClosedError(" while open ws|client: $(string(err.e.msg))"))
elseif typeof(err) <: HTTP.StatusError
if typeof(err) <: HTTP.Exceptions.ConnectError
throw(WebSocketClosedError(" while open ws|client: $(string(err.error))"))
elseif typeof(err) <: HTTP.StatusError
return err.response
else
rethrow(err)
Expand Down Expand Up @@ -270,6 +266,43 @@ target(req::Request) = req.target
subprotocol(req::Request) = header(req, "Sec-WebSocket-Protocol")
origin(req::Request) = header(req, "Origin")

#functions which are not present in HTTP 1.0
function handle end

abstract type Handler end

"""
RequestHandler

Abstract type representing objects that handle `HTTP.Request` and return `HTTP.Response` objects.

See `?HTTP.RequestHandlerFunction` for an example of a concrete implementation.
"""
abstract type RequestHandler <: Handler end

"""
StreamHandler

Abstract type representing objects that handle `HTTP.Stream` objects directly.

See `?HTTP.StreamHandlerFunction` for an example of a concrete implementation.
"""
abstract type StreamHandler <: Handler end

"""
RequestHandlerFunction(f)

A function-wrapper type that is a subtype of `RequestHandler`. Takes a single function as an argument
that should be of the form `f(::HTTP.Request) => HTTP.Response`
"""
struct RequestHandlerFunction{F} <: RequestHandler
func::F # func(req)
end

#handle(h::RequestHandlerFunction, req::Request, args...) = h.func(req, args...)
handle(h::RequestHandlerFunction, stream::HTTP.Streams.Stream, args...) = h.func(stream, args...)


"""
WSHandlerFunction(f::Function) <: Handler
The provided argument should be one of the forms
Expand Down
16 changes: 8 additions & 8 deletions test/client_listen_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,32 @@
@info "Listen: Open, http response, close. Repeat three times. Takes a while."
for i = 1:3
let
server = startserver(url=SURL,port=PORT)
server, servertask = startserver(url=SURL,port=PORT)
status = HTTP.request("GET", "http://$SURL:$PORT").status
println("Status($(i)): $(status)")
@test 200 == status
close(server)
close(server, servertask)
end
end

@info "Listen: Client side initiates message exchange."
let
server = startserver(url=SURL,port=PORT)
server, servertask = startserver(url=SURL,port=PORT)
WebSockets.open(initiatingws, "ws://$SURL:$PORT")
close(server)
close(server, servertask)
end

@info "Listen: Server side initiates message exchange."
let
server = startserver(url=SURL,port=PORT)
server, servertask = startserver(url=SURL,port=PORT)
WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL)
close(server)
close(server, servertask)
end

@info "Listen: Server side initiates message exchange. Close from within server side handler."
let
server = startserver(url=SURL,port=PORT)
server, servertask = startserver(url=SURL,port=PORT)
WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL_CLOSE)
close(server)
close(server, servertask)
end
nothing
17 changes: 9 additions & 8 deletions test/client_serverWS_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,31 @@ end
for i = 1:3
let
ip = parse(IPAddr, SURL)
server = startserver(url=ip)
server, servertask = startserver(url=ip)
@test 200 == WebSockets.HTTP.request("GET", "http://$SURL:$PORT").status
close(server)
close(server, servertask)
end
end

@info "ServerWS: Client side initiates message exchange."
let
server = startserver()
server, servertask = startserver()
WebSockets.open(initiatingws, "ws://$SURL:$PORT")
close(server)
close(server, servertask)
end

@info "ServerWS: Server side initiates message exchange."
let
server = startserver()
server, servertask = startserver()
WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL)
close(server)
close(server, servertask)
end


@info "ServerWS: Server side initiates message exchange. Close from within server side handler."
let
server = startserver()
server, servertask = startserver()
WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL_CLOSE)
close(server)
close(server, servertask)
end
nothing
101 changes: 96 additions & 5 deletions test/client_server_functions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ end
`test_handler` is called by WebSockets inner function `_servercoroutine` for all accepted http requests
that are not upgrades. We don't check what's actually requested.
"""
test_handler(req::HTTP.Request) = HTTP.Response(200, "OK")
function test_handler(stream::HTTP.Streams.Stream)
request = stream.message
request.response = HTTP.Response(200, "OK")
request.response.request = request
write(stream, request.response.body)
end

"""
`test_wshandler` is called by WebSockets inner function
Expand All @@ -43,6 +48,53 @@ function test_wshandler(req::HTTP.Request, ws::WebSocket)
end
end

"""
experiment with nonblocking reads
"""
function readguarded_nonblocking(ws; sleep = 2)
chnl= Channel{Tuple{Vector{UInt8}, Bool}}(1)
# Read, output put to Channel for type stability
function _readinterruptable(c::Channel{Tuple{Vector{UInt8}, Bool}})
try
@error "preparing to readguarded..."
#sleep !=0 && sleep(sleep)
put!(chnl, readguarded(ws))
@error "preparing to readguarded done"
catch err
@debug sprint(showerror, err)
errtyp = typeof(err)
ok = !(errtyp != InterruptException &&
errtyp != Base.IOError &&
errtyp != HTTP.IOExtras.IOError &&
errtyp != Base.BoundsError &&
errtyp != Base.EOFError &&
errtyp != Base.ArgumentError)
# Output a dummy frame that is not a control frame.
put!(chnl, (Vector{UInt8}(), ok))
end
end
# Start reading as a task. Will not return if there is nothing to read
rt = @async _readinterruptable(chnl)
bind(chnl, rt)
yield()
# Define a task for throwing interrupt exception to the (possibly blocked) read task.
# We don't start this task because it would never return
killta = @task try
sleep(30)
@error "will be killing _readinterruptable"
throwto(rt, InterruptException())
catch
end
# We start the killing task. When it is scheduled the second time,
# we pass an InterruptException through the scheduler.
try
schedule(killta, InterruptException(), error = false)
catch
end
# We now have content on chnl, and no additional tasks.
take!(chnl)
end

"""
`echows` is called by
- `test_wshandler` (in which case ws will be a server side websocket)
Expand All @@ -57,22 +109,48 @@ The tests will be captured if the function is run on client side.
If started by the server side, this is called as part of a coroutine.
Therefore, test results will not propagate to the enclosing test scope.
"""

function echows(ws::WebSocket)
#give a chance for server to write anything
#@debug "will wait before starting echows..."
#sleep(5)
@debug "starting echows now"
while isopen(ws)
data, ok = readguarded(ws)
try
@debug "try to peek..."
peek(ws.socket, UInt8)
@debug "got data"
catch err
@debug "nothing to read on the socket yet, $(sprint(showerror, err))"
sleep(1)
continue
end
@debug "reading from socket"
data, ok = readguarded_nonblocking(ws)
#data, ok = readguarded(ws)
if isempty(data)
@error("empty data ok=$(ok)")
else
@error "data\n$(String(copy(data)))"
end
if ok
@debug "writing to socket $(length(copy(data))) bytes of \"$(String(copy(data)))\""
if writeguarded(ws, data)
@test true
else
break
end
else
@debug "failed to read"
if !isopen(ws)
@debug "socket is not open"
break
else
@debug "yet socket is open"
break
end
end
sleep(0.01)
end
end

Expand Down Expand Up @@ -104,9 +182,14 @@ function initiatingws(ws::WebSocket; msglengths = MSGLENGTHS, closebeforeexit =
for slen in msglengths
test_str = Random.randstring(slen)
forcecopy_str = test_str |> collect |> copy |> join
if writeguarded(ws, test_str)
@error "server will write $(slen) bytes \"$(test_str)\""
ok_write = writeguarded(ws, test_str)
@error "written ok = $(ok_write)"
if ok_write
yield()
@error "reading on the server side..."
readback, ok = readguarded(ws)
@error "reading on the server side done \"$(String(copy(readback)))\" ok = $(ok)"
if ok
# if run by the server side, this test won't be captured.
if String(readback) == forcecopy_str
Expand Down Expand Up @@ -134,7 +217,7 @@ function initiatingws(ws::WebSocket; msglengths = MSGLENGTHS, closebeforeexit =
end

test_serverws = WebSockets.ServerWS(
HTTP.RequestHandlerFunction(test_handler),
WebSockets.RequestHandlerFunction(test_handler),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the change. Too many things were called prefixed HTTP. I would also think it's OK to export RequestHandlerFunction and similar, and refer to them wihout module prefix here in the tests.

WebSockets.WSHandlerFunction(test_wshandler))

function startserver(serverws=test_serverws;url=SURL, port=PORT, verbose=false)
Expand All @@ -144,5 +227,13 @@ function startserver(serverws=test_serverws;url=SURL, port=PORT, verbose=false)
# capture errors, if any were made during the definition.
@error take!(serverws.out)
end
serverws
serverws, servertask
end

function Base.close(serverws::WebSockets.ServerWS, servertask::Task)
close(serverws)
@info "waiting for servertask to finish"
wait(servertask)
@info "servertask done"
return
end
6 changes: 3 additions & 3 deletions test/client_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ catch err
global caughterr = err
end
@test typeof(caughterr) <: WebSocketClosedError
@test caughterr.message == " while open ws|client: connect: connection refused (ECONNREFUSED)"
@test caughterr.message == " while open ws|client: Base.IOError(\"connect: connection refused (ECONNREFUSED)\", -4078)"

@info "Try open with unknown scheme."
sleep(1)
Expand Down Expand Up @@ -82,7 +82,7 @@ res = WebSockets.open((_)->nothing, URL);

@info "Open with a ws client handler that throws a domain error."
sleep(1)
@test_throws DomainError WebSockets.open((_)->sqrt(-2), URL);
@test_throws HTTP.Exceptions.RequestError WebSockets.open((_)->sqrt(-2), URL);

@info "Stop the server in morse code."
sleep(1)
Expand All @@ -100,7 +100,7 @@ sethd(resp, "Upgrade" => "websocket")
sethd(resp, "Sec-WebSocket-Accept" => WebSockets.generate_websocket_key(key))
sethd(resp, "Connection" => "Upgrade")
servsock = BufferStream()
s = HTTP.Stream(resp, HTTP.Transaction(HTTP.Connection(servsock)))
s = HTTP.Stream(resp, HTTP.Connection(servsock))
write(servsock, resp)
function dummywsh(dws::WebSockets.WebSocket{BufferStream})
close(dws.socket)
Expand Down
8 changes: 4 additions & 4 deletions test/error_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ try
read(ws_client)
end
catch err
@test typeof(err) <: ErrorException
@test err.msg == "Attempt to read from closed WebSocket|client. First isopen(ws), or use readguarded(ws)!"
@test typeof(err) <: HTTP.Exceptions.RequestError
@test err.error == ErrorException("Attempt to read from closed WebSocket|client. First isopen(ws), or use readguarded(ws)!")
end
sleep(1)

Expand All @@ -85,8 +85,8 @@ try
end
catch err
show(err)
@test typeof(err) <: WebSocketClosedError
@test err.message == " while open ws|client: stream is closed or unusable"
@test typeof(err) <: HTTP.Exceptions.RequestError
@test err.error == Base.IOError("stream is closed or unusable", 0)
end

close(s)
Expand Down
Loading