Skip to content

Commit

Permalink
net: make run_server multi-domain capable
Browse files Browse the repository at this point in the history
  • Loading branch information
bikallem committed Jan 23, 2023
1 parent b711a9b commit c0573c7
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 16 deletions.
36 changes: 30 additions & 6 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,21 @@ let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "connecting to %S:%s" host service

let run_server ?(max_connections=Int.max_int) ?shutdown ?(on_error=raise) listening_socket connection_handler =
let accept_fork_domain ~on_error (domain_mgr, domains) (t : #listening_socket) connection_handler () =
Switch.run @@ fun sw ->
let flow, addr = accept ~sw t in
Fun.protect
(fun () ->
Semaphore.acquire domains ;
Domain_manager.run domain_mgr (fun () ->
match connection_handler (flow :> stream_socket) addr with
| x -> x
| exception ex -> on_error (Exn.add_context ex "handling connection from %a" Sockaddr.pp addr)
)
)
~finally:(fun () -> Semaphore.release domains)

let run_server ?(max_connections=Int.max_int) ?shutdown ?(additional_domains) ?(on_error=raise) listening_socket connection_handler =
(if max_connections < 0 then invalid_arg "max_connections");
let connections = Semaphore.make max_connections in
let shutdown =
Expand All @@ -282,12 +296,22 @@ let run_server ?(max_connections=Int.max_int) ?shutdown ?(on_error=raise) listen
connection_handler flow addr ;
Semaphore.release connections ;
in
let rec loop sw =
Switch.run @@ fun sw ->
let acceptor : unit -> unit =
match additional_domains with
| Some (domain_mgr, domains) ->
(if domains < 1 then invalid_arg "additional_domains");
let domains = Semaphore.make domains in
accept_fork_domain (domain_mgr, domains) listening_socket ~on_error connection_handler
| None -> (fun () -> accept_fork ~sw listening_socket ~on_error connection_handler)
in
let rec loop () =
Fiber.first
(fun () ->
Semaphore.acquire connections ;
accept_fork ~sw listening_socket ~on_error connection_handler ;
loop sw )
Semaphore.acquire connections;
acceptor ();
loop ()
)
(fun () -> Promise.await shutdown )
in
Switch.run (fun sw -> loop sw)
loop ()
32 changes: 22 additions & 10 deletions lib_eio/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -210,25 +210,37 @@ val accept_sub :
val run_server :
?max_connections:int ->
?shutdown:unit Promise.t ->
?additional_domains:(#Domain_manager.t * int) ->
?on_error:(exn -> unit) ->
#listening_socket ->
connection_handler ->
unit
(** [run_server sock conn_handler] establishes a concurrent socket server [s]. [s] runs on a {e single}
OCaml {!module:Stdlib.Domain}. It listens to incoming client connections as specified by socket [sock].
On a successful establishment of client connection with [s], [conn_handler] is executed. Otherwise [on_error]
is executed.
(** [run_server sock connection_handler] establishes a concurrent socket server [s]. It listens to incoming client
connections as specified by socket [sock]. On a successful establishment of client connection with [s],
[connection_handler] is executed. Otherwise [on_error] is executed.
{b Running Parallel Server}
By default [s] runs on a {e single} OCaml {!module:Domain}. However, if [additional_domains:(domain_mgr, domains)]
parameter is given, then [s] will run [connection_handler] in parallel over the specified number of [domains]. In
such cases ensure that [connection_handler] only accesses thread-safe values. Addtionally, it is recommended that
[domains] value not exceed the value that is reported by {!val:Domain.recommended_domain_count} minus 1, i.e.
[domains < Domain.recommended_domain_count - 1]. It has been observed that doing so results in a performance
regression.
@param on_error is a connection error handler. By defailt it is set to {!val:raise}.
@param max_connections determines the maximum number of concurrent connections accepted by [s] at any time.
The default is [Int.max_int].
@param shutdown is a promise instance awaiting a [unit] value of [()]. Fulfillment of this promise notifies [s]
to stop accepting incoming client connection requests. If this parameter is not
given and/or is never fulfilled - the default setting - [s] keeps accepting client connections
indefinitely.
to stop accepting incoming client connection requests. If this parameter is not given and/or is
never fulfilled - the default setting - [s] keeps accepting client connections indefinitely. Note
however that fulfilling [shutdown] promise instance doesn't cancel currently executing fibers.
@param additional_domains is [(domain_mgr, domains)] where [domains] denotes the additional domains that [s]
will to execute [connection_handler].
@param on_error is a connection error handler. By defailt it is set to {!val:raise}.
@raise Invalid_argument if [max_connections < 0]. *)
@raise Invalid_argument if [max_connections < 0].
if [additional_domains = (domain_mgr, domains)] is used and [domains < 1]. *)

(** {2 Datagram Sockets} *)

Expand Down

0 comments on commit c0573c7

Please sign in to comment.