diff --git a/Project.toml b/Project.toml index 87a5969..73752c4 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "WebSockets" uuid = "104b5d7c-a370-577a-8038-80a2059c5097" -version = "1.5.10" +version = "1.6.0" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" @@ -10,8 +10,8 @@ Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" [compat] -HTTP = "0.8, 0.9, 1" -julia = "0.7, 1" +HTTP = "1.1.0, 1.5" +julia = "1.6" [extras] Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" diff --git a/appveyor.yml b/appveyor.yml index 5150cd5..cffdf7a 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,7 +1,7 @@ environment: matrix: - - julia_version: 1.0 - - julia_version: 1 + - julia_version: 1.6.3 + - julia_version: 1.8.3 - julia_version: nightly platform: diff --git a/src/HTTP.jl b/src/HTTP.jl index a3dc195..0f52fba 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -7,18 +7,14 @@ import HTTP:Response, # For upgrade setstatus, # For upgrade startwrite, # For upgrade startread, # For _openstream - handle, # For _servercoroutine - Connection, # For handshaketest - Transaction,# For handshaketest URI, # For open - Handler, # For WebSocketHandler, ServerWS, error_test - RequestHandlerFunction, # For ServerWS StatusError, # For open Servers, # For further imports Streams, # For further imports ConnectionPool # For further imports import HTTP.ConnectionPool: - getrawstream # For _openstream + getrawstream, # For _openstream + Connection # For handshaketest import HTTP.Streams: Stream # For is_upgrade, handshaketest import HTTP.Servers: MbedTLS # For further imports @@ -95,9 +91,9 @@ function open(f::Function, url; verbose=false, subprotocol = "", kw...) "GET", uri, headers; reuse_limit=0, verbose=verbose ? 2 : 0, kw...) catch err - if typeof(err) <: HTTP.IOExtras.IOError - throw(WebSocketClosedError(" while open ws|client: $(string(err.e.msg))")) - elseif typeof(err) <: HTTP.StatusError + if typeof(err) <: HTTP.Exceptions.ConnectError + throw(WebSocketClosedError(" while open ws|client: $(string(err.error))")) + elseif typeof(err) <: HTTP.StatusError return err.response else rethrow(err) @@ -270,6 +266,48 @@ target(req::Request) = req.target subprotocol(req::Request) = header(req, "Sec-WebSocket-Protocol") origin(req::Request) = header(req, "Origin") +#functions which are not present in HTTP 1.0 +function handle end + +abstract type Handler end + +""" + RequestHandler + +Abstract type representing objects that handle `HTTP.Request` and return `HTTP.Response` objects. + +See `?HTTP.RequestHandlerFunction` for an example of a concrete implementation. +""" +abstract type RequestHandler <: Handler end + +""" + StreamHandler + +Abstract type representing objects that handle `HTTP.Stream` objects directly. + +See `?HTTP.StreamHandlerFunction` for an example of a concrete implementation. +""" +abstract type StreamHandler <: Handler end + +""" + RequestHandlerFunction(f) + +A function-wrapper type that is a subtype of `RequestHandler`. Takes a single function as an argument +that should be of the form `f(::HTTP.Request) => HTTP.Response` +""" +struct RequestHandlerFunction{F} <: RequestHandler + func::F # func(req) +end + +#handle(h::RequestHandlerFunction, req::Request, args...) = h.func(req, args...) +function handle(h::RequestHandlerFunction, stream::HTTP.Streams.Stream, args...) + request = stream.message + request.response = h.func(request, args...) + request.response.request = request + write(stream, request.response.body) +end + + """ WSHandlerFunction(f::Function) <: Handler The provided argument should be one of the forms @@ -372,6 +410,7 @@ function serve(serverws::ServerWS, host, port, verbose) handle(serverws.handler, stream) end catch err + @error "WebSocket: _servercoroutine CRASH\n$(sprint(showerror, err))" put!(serverws.out, err) put!(serverws.out, stacktrace(catch_backtrace())) end diff --git a/test/client_listen_test.jl b/test/client_listen_test.jl index d8943e2..87081b8 100644 --- a/test/client_listen_test.jl +++ b/test/client_listen_test.jl @@ -13,32 +13,32 @@ @info "Listen: Open, http response, close. Repeat three times. Takes a while." for i = 1:3 let - server = startserver(url=SURL,port=PORT) + server, servertask = startserver(url=SURL,port=PORT) status = HTTP.request("GET", "http://$SURL:$PORT").status println("Status($(i)): $(status)") @test 200 == status - close(server) + close(server, servertask) end end @info "Listen: Client side initiates message exchange." let - server = startserver(url=SURL,port=PORT) + server, servertask = startserver(url=SURL,port=PORT) WebSockets.open(initiatingws, "ws://$SURL:$PORT") - close(server) + close(server, servertask) end @info "Listen: Server side initiates message exchange." let - server = startserver(url=SURL,port=PORT) + server, servertask = startserver(url=SURL,port=PORT) WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL) - close(server) + close(server, servertask) end @info "Listen: Server side initiates message exchange. Close from within server side handler." let - server = startserver(url=SURL,port=PORT) + server, servertask = startserver(url=SURL,port=PORT) WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL_CLOSE) - close(server) + close(server, servertask) end nothing diff --git a/test/client_serverWS_test.jl b/test/client_serverWS_test.jl index 567d9e0..95938d3 100644 --- a/test/client_serverWS_test.jl +++ b/test/client_serverWS_test.jl @@ -38,30 +38,31 @@ end for i = 1:3 let ip = parse(IPAddr, SURL) - server = startserver(url=ip) + server, servertask = startserver(url=ip) @test 200 == WebSockets.HTTP.request("GET", "http://$SURL:$PORT").status - close(server) + close(server, servertask) end end @info "ServerWS: Client side initiates message exchange." let - server = startserver() + server, servertask = startserver() WebSockets.open(initiatingws, "ws://$SURL:$PORT") - close(server) + close(server, servertask) end @info "ServerWS: Server side initiates message exchange." let - server = startserver() + server, servertask = startserver() WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL) - close(server) + close(server, servertask) end + @info "ServerWS: Server side initiates message exchange. Close from within server side handler." let - server = startserver() + server, servertask = startserver() WebSockets.open(echows, "ws://$SURL:$PORT", subprotocol = SUBPROTOCOL_CLOSE) - close(server) + close(server, servertask) end nothing diff --git a/test/client_server_functions.jl b/test/client_server_functions.jl index 0efe4ca..7aba8a7 100644 --- a/test/client_server_functions.jl +++ b/test/client_server_functions.jl @@ -43,6 +43,53 @@ function test_wshandler(req::HTTP.Request, ws::WebSocket) end end +""" +experiment with nonblocking reads +""" +function readguarded_nonblocking(ws; sleep = 2) + chnl= Channel{Tuple{Vector{UInt8}, Bool}}(1) + # Read, output put to Channel for type stability + function _readinterruptable(c::Channel{Tuple{Vector{UInt8}, Bool}}) + try + @error "preparing to readguarded..." + #sleep !=0 && sleep(sleep) + put!(chnl, readguarded(ws)) + @error "preparing to readguarded done" + catch err + @debug sprint(showerror, err) + errtyp = typeof(err) + ok = !(errtyp != InterruptException && + errtyp != Base.IOError && + errtyp != HTTP.IOExtras.IOError && + errtyp != Base.BoundsError && + errtyp != Base.EOFError && + errtyp != Base.ArgumentError) + # Output a dummy frame that is not a control frame. + put!(chnl, (Vector{UInt8}(), ok)) + end + end + # Start reading as a task. Will not return if there is nothing to read + rt = @async _readinterruptable(chnl) + bind(chnl, rt) + yield() + # Define a task for throwing interrupt exception to the (possibly blocked) read task. + # We don't start this task because it would never return + killta = @task try + sleep(30) + @error "will be killing _readinterruptable" + throwto(rt, InterruptException()) + catch + end + # We start the killing task. When it is scheduled the second time, + # we pass an InterruptException through the scheduler. + try + schedule(killta, InterruptException(), error = false) + catch + end + # We now have content on chnl, and no additional tasks. + take!(chnl) +end + """ `echows` is called by - `test_wshandler` (in which case ws will be a server side websocket) @@ -57,22 +104,48 @@ The tests will be captured if the function is run on client side. If started by the server side, this is called as part of a coroutine. Therefore, test results will not propagate to the enclosing test scope. """ + function echows(ws::WebSocket) + #give a chance for server to write anything + #@debug "will wait before starting echows..." + #sleep(5) + @debug "starting echows now" while isopen(ws) - data, ok = readguarded(ws) + try + @debug "try to peek..." + peek(ws.socket, UInt8) + @debug "got data" + catch err + @debug "nothing to read on the socket yet, $(sprint(showerror, err))" + sleep(1) + continue + end + @debug "reading from socket" + data, ok = readguarded_nonblocking(ws) + #data, ok = readguarded(ws) + if isempty(data) + @error("empty data ok=$(ok)") + else + @error "data\n$(String(copy(data)))" + end if ok + @debug "writing to socket $(length(copy(data))) bytes of \"$(String(copy(data)))\"" if writeguarded(ws, data) @test true else break end else + @debug "failed to read" if !isopen(ws) + @debug "socket is not open" break else + @debug "yet socket is open" break end end + sleep(0.01) end end @@ -104,9 +177,14 @@ function initiatingws(ws::WebSocket; msglengths = MSGLENGTHS, closebeforeexit = for slen in msglengths test_str = Random.randstring(slen) forcecopy_str = test_str |> collect |> copy |> join - if writeguarded(ws, test_str) + @error "server will write $(slen) bytes \"$(test_str)\"" + ok_write = writeguarded(ws, test_str) + @error "written ok = $(ok_write)" + if ok_write yield() + @error "reading on the server side..." readback, ok = readguarded(ws) + @error "reading on the server side done \"$(String(copy(readback)))\" ok = $(ok)" if ok # if run by the server side, this test won't be captured. if String(readback) == forcecopy_str @@ -134,7 +212,7 @@ function initiatingws(ws::WebSocket; msglengths = MSGLENGTHS, closebeforeexit = end test_serverws = WebSockets.ServerWS( - HTTP.RequestHandlerFunction(test_handler), + WebSockets.RequestHandlerFunction(test_handler), WebSockets.WSHandlerFunction(test_wshandler)) function startserver(serverws=test_serverws;url=SURL, port=PORT, verbose=false) @@ -144,5 +222,13 @@ function startserver(serverws=test_serverws;url=SURL, port=PORT, verbose=false) # capture errors, if any were made during the definition. @error take!(serverws.out) end - serverws + serverws, servertask end + +function Base.close(serverws::WebSockets.ServerWS, servertask::Task) + close(serverws) + @info "waiting for servertask to finish" + wait(servertask) + @info "servertask done" + return +end \ No newline at end of file diff --git a/test/client_test.jl b/test/client_test.jl index d27df44..f89703a 100644 --- a/test/client_test.jl +++ b/test/client_test.jl @@ -43,7 +43,7 @@ catch err global caughterr = err end @test typeof(caughterr) <: WebSocketClosedError -@test caughterr.message == " while open ws|client: connect: connection refused (ECONNREFUSED)" +@test caughterr.message == " while open ws|client: Base.IOError(\"connect: connection refused (ECONNREFUSED)\", -4078)" @info "Try open with unknown scheme." sleep(1) @@ -82,7 +82,7 @@ res = WebSockets.open((_)->nothing, URL); @info "Open with a ws client handler that throws a domain error." sleep(1) -@test_throws DomainError WebSockets.open((_)->sqrt(-2), URL); +@test_throws HTTP.Exceptions.RequestError WebSockets.open((_)->sqrt(-2), URL); @info "Stop the server in morse code." sleep(1) @@ -100,7 +100,7 @@ sethd(resp, "Upgrade" => "websocket") sethd(resp, "Sec-WebSocket-Accept" => WebSockets.generate_websocket_key(key)) sethd(resp, "Connection" => "Upgrade") servsock = BufferStream() -s = HTTP.Stream(resp, HTTP.Transaction(HTTP.Connection(servsock))) +s = HTTP.Stream(resp, HTTP.Connection(servsock)) write(servsock, resp) function dummywsh(dws::WebSockets.WebSocket{BufferStream}) close(dws.socket) diff --git a/test/error_test.jl b/test/error_test.jl index a754c9f..5659987 100644 --- a/test/error_test.jl +++ b/test/error_test.jl @@ -69,8 +69,8 @@ try read(ws_client) end catch err - @test typeof(err) <: ErrorException - @test err.msg == "Attempt to read from closed WebSocket|client. First isopen(ws), or use readguarded(ws)!" + @test typeof(err) <: HTTP.Exceptions.RequestError + @test err.error == ErrorException("Attempt to read from closed WebSocket|client. First isopen(ws), or use readguarded(ws)!") end sleep(1) @@ -85,8 +85,8 @@ try end catch err show(err) - @test typeof(err) <: WebSocketClosedError - @test err.message == " while open ws|client: stream is closed or unusable" + @test typeof(err) <: HTTP.Exceptions.RequestError + @test err.error == Base.IOError("stream is closed or unusable", 0) end close(s) diff --git a/test/handshaketest_functions.jl b/test/handshaketest_functions.jl index f00877f..72709c2 100644 --- a/test/handshaketest_functions.jl +++ b/test/handshaketest_functions.jl @@ -28,8 +28,7 @@ end function handshakeresponse(request::HTTP.Request) buf = BufferStream() c = HTTP.Connection(buf) - t = HTTP.Transaction(c) - s = HTTP.Stream(request, t) + s = HTTP.Stream(request, c) WebSockets.upgrade(dummywshandler, s) close(buf) takefirstline(buf) diff --git a/test/show_test.jl b/test/show_test.jl index a94d952..7da3f82 100644 --- a/test/show_test.jl +++ b/test/show_test.jl @@ -6,7 +6,7 @@ end let kws = [], msgs =[] ds = DummyStream(IOBuffer(), 0, 0) - for s = 0:9, h in [Base.C_NULL, Ptr{UInt64}(3)] + for s = 0:9, h in [Base.C_NULL, Ptr{Cvoid}(3)] ds.handle = h ds.status = s kwarg, msg = WebSockets._uv_status_tuple(ds) @@ -50,7 +50,7 @@ rm("temptemp") output = String(take!(io)) @test output == "✓" -ds = DummyStream(IOBuffer(), 0, 0x00000001) +ds = DummyStream(IOBuffer(), 0, Ptr{Cvoid}(1)) io = IOBuffer() WebSockets._show(io, ds) # The handle type depends on operating system, skip that @@ -222,7 +222,7 @@ let chnlout, sws, sws1, sws2 io = IOBuffer() show(io, sws) output = String(take!(io)) - @test output == "WebSockets.ServerWS(handler=h(r), wshandler=w(s)).out:Channel{Any}(sz_max:2,sz_curr:2) " + @test output == "WebSockets.ServerWS(handler=h(r), wshandler=w(s)).out:Channel{Any}(2) " sws1 = WebSockets.ServerWS(h, w) sws2 = WebSockets.ServerWS(h, w)