From 22973a7420864bc9ac40b3a58944ed2366ee681c Mon Sep 17 00:00:00 2001 From: Arlo White Date: Fri, 27 Sep 2024 12:45:03 +1000 Subject: [PATCH 1/2] prioritized route proof of concept --- serve-test/Project.toml | 4 ++++ serve-test/blocked-serve.jl | 43 +++++++++++++++++++++++++++++++++++++ src/core.jl | 28 ++++++++++++++++++++---- 3 files changed, 71 insertions(+), 4 deletions(-) create mode 100644 serve-test/Project.toml create mode 100644 serve-test/blocked-serve.jl diff --git a/serve-test/Project.toml b/serve-test/Project.toml new file mode 100644 index 00000000..700bde28 --- /dev/null +++ b/serve-test/Project.toml @@ -0,0 +1,4 @@ +[deps] +Infiltrator = "5903a43b-9cc3-4c30-8d17-598619ec4e9b" +Oxygen = "df9a0d86-3283-4920-82dc-4555fc0d1d8b" +Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" diff --git a/serve-test/blocked-serve.jl b/serve-test/blocked-serve.jl new file mode 100644 index 00000000..c0dc2736 --- /dev/null +++ b/serve-test/blocked-serve.jl @@ -0,0 +1,43 @@ +using Base.Threads +using Infiltrator +using Revise +using Oxygen +ENV["JULIA_DEBUG"] = "Oxygen" + +# Run this with -t 1,1 + +println("$(nthreadpools()) threadpools. default=$(nthreads(:default)) interactive=$(nthreads(:interactive))") + +function block(n::Int) + start_time = time() + while (time() - start_time) < n + end + return "Blocking complete after $n seconds" +end + + +@get "/health" function() + @info "entered /health threadid=$(Threads.threadid())" + json(Dict(:status => "healthy")) +end + +@get "/block" function () + block_time = 10 + @info "blocking for $block_time seconds. threadid=$(Threads.threadid())" + # using block because sleep() would yield + block(block_time) + @info "done blocking threadid=$(Threads.threadid())" + text("done") +end + +@get "/throw" function() + throw( + ErrorException("error from route impl") + ) +end + +function start() + serveparallel() +end + +start() diff --git a/src/core.jl b/src/core.jl index 1cc0195d..bacde660 100644 --- a/src/core.jl +++ b/src/core.jl @@ -197,6 +197,8 @@ function stream_handler(middleware::Function) # extract the caller's ip address ip, _ = Sockets.getpeername(stream) # build up a streamhandler to handle our incoming requests + @debug "decorating with $ip" + handle_stream = HTTP.streamhandler(middleware |> decorate_request(ip, stream)) # handle the incoming request return handle_stream(stream) @@ -212,11 +214,27 @@ Inside this task, `@async` is used for cooperative multitasking, allowing the ta """ function parallel_stream_handler(handle_stream::Function) function (stream::HTTP.Stream) - task = Threads.@spawn begin - handle = @async handle_stream(stream) - wait(handle) + # parse request before spawning handler + @debug "parallel_stream_handler threadid=$(Threads.threadid())" + + req::HTTP.Request = stream.message + + # if prioritized path + if (req.target == "/health") + @debug "prioritized request to $(req.target), running in Main interactive thread" + handle_stream(stream) + else + task = Threads.@spawn begin + @debug "within @spawn begin threadid=$(Threads.threadid())" + handle = @async handle_stream(stream) + @debug "wait on @async handle_stream threadid=$(Threads.threadid())" + wait(handle) + @debug "after wait threadid=$(Threads.threadid())" + end + @debug "root wait threadid=$(Threads.threadid())" + wait(task) + @debug "after root wait threadid=$(Threads.threadid())" end - wait(task) end end @@ -227,6 +245,7 @@ users to 'chain' middleware functions like `serve(handler1, handler2, handler3)` application and have them execute in the order they were passed (left to right) for each incoming request """ function setupmiddleware(ctx::Context; middleware::Vector=[], docs::Bool=true, metrics::Bool=true, serialize::Bool=true, catch_errors::Bool=true, show_errors=true)::Function + @debug "setupmiddleware" # determine if we have any special router or route-specific middleware custom_middleware = !isempty(ctx.service.custommiddleware) ? [compose(ctx.service.router, middleware, ctx.service.custommiddleware, ctx.service.middleware_cache)] : reverse(middleware) @@ -337,6 +356,7 @@ Create a default serializer function that handles HTTP requests and formats the function DefaultSerializer(catch_errors::Bool; show_errors::Bool) return function (handle) return function (req::HTTP.Request) + @debug "default serializer impl $(string(req.target)) tid=$(Threads.threadid())" return handlerequest(catch_errors; show_errors) do response = handle(req) format_response!(req, response) From d4bb50194ccda60e406017b7e7efc237d3f102cc Mon Sep 17 00:00:00 2001 From: Arlo White Date: Fri, 27 Sep 2024 12:57:11 +1000 Subject: [PATCH 2/2] serve(is_prioritized) idea --- serve-test/Project.toml | 1 + serve-test/blocked-serve.jl | 5 ++++- src/core.jl | 7 ++++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/serve-test/Project.toml b/serve-test/Project.toml index 700bde28..e17812af 100644 --- a/serve-test/Project.toml +++ b/serve-test/Project.toml @@ -1,4 +1,5 @@ [deps] +HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" Infiltrator = "5903a43b-9cc3-4c30-8d17-598619ec4e9b" Oxygen = "df9a0d86-3283-4920-82dc-4555fc0d1d8b" Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" diff --git a/serve-test/blocked-serve.jl b/serve-test/blocked-serve.jl index c0dc2736..b7dfc41e 100644 --- a/serve-test/blocked-serve.jl +++ b/serve-test/blocked-serve.jl @@ -2,6 +2,7 @@ using Base.Threads using Infiltrator using Revise using Oxygen +using HTTP ENV["JULIA_DEBUG"] = "Oxygen" # Run this with -t 1,1 @@ -37,7 +38,9 @@ end end function start() - serveparallel() + serveparallel( + is_prioritized = (req::HTTP.Request) -> req.target == "/health" + ) end start() diff --git a/src/core.jl b/src/core.jl index bacde660..df87650a 100644 --- a/src/core.jl +++ b/src/core.jl @@ -87,6 +87,7 @@ function serve(ctx::Context; docs_path = "/docs", schema_path = "/schema", external_url = nothing, + is_prioritized = nothing, kwargs...) :: Server # set the external url if it's passed @@ -121,7 +122,7 @@ function serve(ctx::Context; end # wrap top level handler with parallel handler - handle_stream = parallel_stream_handler(handle_stream) + handle_stream = parallel_stream_handler(handle_stream, is_prioritized) end # The cleanup of resources are put at the topmost level in `methods.jl` @@ -212,7 +213,7 @@ end This function uses `Threads.@spawn` to schedule a new task on any available thread. Inside this task, `@async` is used for cooperative multitasking, allowing the task to yield during I/O operations. """ -function parallel_stream_handler(handle_stream::Function) +function parallel_stream_handler(handle_stream::Function, is_prioritized::Function) function (stream::HTTP.Stream) # parse request before spawning handler @debug "parallel_stream_handler threadid=$(Threads.threadid())" @@ -220,7 +221,7 @@ function parallel_stream_handler(handle_stream::Function) req::HTTP.Request = stream.message # if prioritized path - if (req.target == "/health") + if (is_prioritized(req)) @debug "prioritized request to $(req.target), running in Main interactive thread" handle_stream(stream) else