diff --git a/lib_eio/net.ml b/lib_eio/net.ml index 4be610ad3..03e98713c 100644 --- a/lib_eio/net.ml +++ b/lib_eio/net.ml @@ -270,7 +270,21 @@ let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f = let bt = Printexc.get_raw_backtrace () in Exn.reraise_with_context ex bt "connecting to %S:%s" host service -let run_server ?(max_connections=Int.max_int) ?shutdown ?(on_error=raise) listening_socket connection_handler = + let accept_fork_domain ~on_error (domain_mgr, domains) (t : #listening_socket) connection_handler () = + Switch.run @@ fun sw -> + let flow, addr = accept ~sw t in + Fun.protect + (fun () -> + Semaphore.acquire domains ; + Domain_manager.run domain_mgr (fun () -> + match connection_handler (flow :> stream_socket) addr with + | x -> x + | exception ex -> on_error (Exn.add_context ex "handling connection from %a" Sockaddr.pp addr) + ) + ) + ~finally:(fun () -> Semaphore.release domains) + +let run_server ?(max_connections=Int.max_int) ?shutdown ?(additional_domains) ?(on_error=raise) listening_socket connection_handler = (if max_connections < 0 then invalid_arg "max_connections"); let connections = Semaphore.make max_connections in let shutdown = @@ -282,12 +296,22 @@ let run_server ?(max_connections=Int.max_int) ?shutdown ?(on_error=raise) listen connection_handler flow addr ; Semaphore.release connections ; in - let rec loop sw = + Switch.run @@ fun sw -> + let acceptor : unit -> unit = + match additional_domains with + | Some (domain_mgr, domains) -> + (if domains < 1 then invalid_arg "additional_domains"); + let domains = Semaphore.make domains in + accept_fork_domain (domain_mgr, domains) listening_socket ~on_error connection_handler + | None -> (fun () -> accept_fork ~sw listening_socket ~on_error connection_handler) + in + let rec loop () = Fiber.first (fun () -> - Semaphore.acquire connections ; - accept_fork ~sw listening_socket ~on_error connection_handler ; - loop sw ) + Semaphore.acquire connections; + acceptor (); + loop () + ) (fun () -> Promise.await shutdown ) in - Switch.run (fun sw -> loop sw) + loop () diff --git a/lib_eio/net.mli b/lib_eio/net.mli index 59e10dfb9..0b17578f3 100644 --- a/lib_eio/net.mli +++ b/lib_eio/net.mli @@ -210,25 +210,37 @@ val accept_sub : val run_server : ?max_connections:int -> ?shutdown:unit Promise.t -> + ?additional_domains:(#Domain_manager.t * int) -> ?on_error:(exn -> unit) -> #listening_socket -> connection_handler -> unit -(** [run_server sock conn_handler] establishes a concurrent socket server [s]. [s] runs on a {e single} - OCaml {!module:Stdlib.Domain}. It listens to incoming client connections as specified by socket [sock]. - On a successful establishment of client connection with [s], [conn_handler] is executed. Otherwise [on_error] - is executed. +(** [run_server sock connection_handler] establishes a concurrent socket server [s]. It listens to incoming client + connections as specified by socket [sock]. On a successful establishment of client connection with [s], + [connection_handler] is executed. Otherwise [on_error] is executed. + + {b Running Parallel Server} + + By default [s] runs on a {e single} OCaml {!module:Domain}. However, if [additional_domains:(domain_mgr, domains)] + parameter is given, then [s] will run [connection_handler] in parallel over the specified number of [domains]. In + such cases ensure that [connection_handler] only accesses thread-safe values. Addtionally, it is recommended that + [domains] value not exceed the value that is reported by {!val:Domain.recommended_domain_count} minus 1, i.e. + [domains < Domain.recommended_domain_count - 1]. It has been observed that doing so results in a performance + regression. - @param on_error is a connection error handler. By defailt it is set to {!val:raise}. @param max_connections determines the maximum number of concurrent connections accepted by [s] at any time. The default is [Int.max_int]. - @param shutdown is a promise instance awaiting a [unit] value of [()]. Fulfillment of this promise notifies [s] - to stop accepting incoming client connection requests. If this parameter is not - given and/or is never fulfilled - the default setting - [s] keeps accepting client connections - indefinitely. + to stop accepting incoming client connection requests. If this parameter is not given and/or is + never fulfilled - the default setting - [s] keeps accepting client connections indefinitely. Note + however that fulfilling [shutdown] promise instance doesn't cancel currently executing fibers. + + @param additional_domains is [(domain_mgr, domains)] where [domains] denotes the additional domains that [s] + will to execute [connection_handler]. + @param on_error is a connection error handler. By defailt it is set to {!val:raise}. - @raise Invalid_argument if [max_connections < 0]. *) + @raise Invalid_argument if [max_connections < 0]. + if [additional_domains = (domain_mgr, domains)] is used and [domains < 1]. *) (** {2 Datagram Sockets} *)