Skip to content

Commit

Permalink
net: make run_server multi-domain capable
Browse files Browse the repository at this point in the history
  • Loading branch information
bikallem committed Jan 25, 2023
1 parent b711a9b commit c3c41f0
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 95 deletions.
36 changes: 18 additions & 18 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -270,24 +270,24 @@ let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "connecting to %S:%s" host service

let run_server ?(max_connections=Int.max_int) ?shutdown ?(on_error=raise) listening_socket connection_handler =
(if max_connections < 0 then invalid_arg "max_connections");
let run_server ?(max_connections=Int.max_int) ?(additional_domains) ~sw ~on_error listening_socket connection_handler =
if max_connections <= 0 then invalid_arg "max_connections";
let connections = Semaphore.make max_connections in
let shutdown =
match shutdown with
| Some p -> p
| None -> fst (Promise.create ())
in
let connection_handler flow addr =
connection_handler flow addr ;
Semaphore.release connections ;
in
let rec loop sw =
Fiber.first
let rec accept sw : unit =
Fun.protect
(fun () ->
Semaphore.acquire connections ;
accept_fork ~sw listening_socket ~on_error connection_handler ;
loop sw )
(fun () -> Promise.await shutdown )
Semaphore.acquire connections;
accept_fork ~sw ~on_error listening_socket connection_handler;
)
~finally:(fun () -> Semaphore.release connections);
accept sw
in
Switch.run (fun sw -> loop sw)
(match additional_domains with
| Some (domain_mgr, domains) ->
if domains < 0 then invalid_arg "additional_domains";
for _ = 2 to domains do
Fiber.fork ~sw @@ fun () ->
Domain_manager.run domain_mgr (fun () -> Switch.run @@ fun sw -> accept sw);
done;
| None -> ());
Switch.run @@ fun sw -> accept sw
34 changes: 20 additions & 14 deletions lib_eio/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -209,26 +209,32 @@ val accept_sub :

val run_server :
?max_connections:int ->
?shutdown:unit Promise.t ->
?on_error:(exn -> unit) ->
?additional_domains:(#Domain_manager.t * int) ->
sw:Switch.t ->
on_error:(exn -> unit) ->
#listening_socket ->
connection_handler ->
unit
(** [run_server sock conn_handler] establishes a concurrent socket server [s]. [s] runs on a {e single}
OCaml {!module:Stdlib.Domain}. It listens to incoming client connections as specified by socket [sock].
On a successful establishment of client connection with [s], [conn_handler] is executed. Otherwise [on_error]
is executed.
(** [run_server ~sw ~on_error sock connection_handler] establishes a concurrent socket server [s]. It listens to
incoming client connections as specified by socket [sock]. On a successful establishment of client connection
with [s], [connection_handler] is executed. Otherwise [on_error] is executed.
@param on_error is a connection error handler. By defailt it is set to {!val:raise}.
@param max_connections determines the maximum number of concurrent connections accepted by [s] at any time.
The default is [Int.max_int].
{b Running Parallel Server}
@param shutdown is a promise instance awaiting a [unit] value of [()]. Fulfillment of this promise notifies [s]
to stop accepting incoming client connection requests. If this parameter is not
given and/or is never fulfilled - the default setting - [s] keeps accepting client connections
indefinitely.
By default [s] runs on a {e single} OCaml {!module:Domain}. However, if [additional_domains:(domain_mgr, domains)]
parameter is given, then [s] will run [connection_handler] in parallel over the specified number of [domains]. In
such cases ensure that [connection_handler] only accesses thread-safe values. Addtionally, it is recommended that
[domains] value not exceed the value that is reported by {!val:Domain.recommended_domain_count} minus 1, i.e.
[domains < Domain.recommended_domain_count - 1]. It has been observed that doing so results in a performance
regression.
@raise Invalid_argument if [max_connections < 0]. *)
@param max_connections determines the maximum number of concurrent connections accepted by [s] at any time.
The default is [Int.max_int].
@param additional_domains is [(domain_mgr, domains)] where [domains] denotes the additional domains that [s]
will use to execute [connection_handler].
@param on_error is a connection error handler.
@raise Invalid_argument if [max_connections <= 0].
if [additional_domains = (domain_mgr, domains)] is used and [domains < 0]. *)

(** {2 Datagram Sockets} *)

Expand Down
121 changes: 58 additions & 63 deletions tests/network.md
Original file line number Diff line number Diff line change
Expand Up @@ -406,79 +406,74 @@ Exception: Failure "Simulated error".
## run_server

```ocaml
# let run_eio_server ~max_conn ~clients env sw =
let shutdown, set_shutdown = Promise.create () in
let run_client id () =
traceln "client: Connecting to server ...";
let flow = Eio.Net.connect ~sw env#net addr in
Eio.Flow.copy_string "Hello from client" flow;
Eio.Flow.shutdown flow `Send;
let msg = read_all flow in
traceln "client received: %S" msg;
if id < clients then () else Promise.resolve set_shutdown ()
in
let connection_handler clock flow _addr =
Fun.protect (fun () ->
let msg = read_all flow in
traceln "Server received: %S" msg;
Eio.Time.sleep clock 0.01
) ~finally:(fun () -> Eio.Flow.copy_string "Bye" flow)
# Eio_mock.Backend.run @@ fun () ->
let mock_env =
let fake_domain_mgr =
object (_ : #Eio.Domain_manager.t)
method run fn =
let cancelled,_ = Eio.Promise.create () in
fn ~cancelled
method run_raw fn = fn ()
end
in
let server_sock = Eio.Net.listen ~reuse_addr:true ~backlog:5 ~sw env#net addr in
let connection_handler = connection_handler env#clock in
let clients = List.init clients (fun id -> run_client (id+1)) in
let server () = Eio.Net.run_server ~max_connections:max_conn ~shutdown server_sock connection_handler in
Fiber.all (server :: clients)
;;
val run_eio_server :
max_conn:int ->
clients:int ->
< clock : #Eio.Time.clock; net : #Eio.Net.t; .. > -> Switch.t -> unit =
<fun>
object
method net = (Eio_mock.Net.make "mock net" :> Eio.Net.t)
method domain_mgr = fake_domain_mgr
end
in
let socket = Eio_mock.Net.listening_socket "tcp/80" in
let flow = Eio_mock.Flow.make "connection" in
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 1234) in
Switch.run @@ fun sw ->
Eio_mock.Net.on_accept socket [`Return (flow, addr)];
Eio.Net.run_server ~max_connections:10 ~additional_domains:(mock_env#domain_mgr, 10) ~on_error:raise ~sw
socket (fun _flow _addr -> ());;
+tcp/80: accepted connection from tcp:127.0.0.1:1234
+connection: closed
Exception: Failure "Mock accept handler not configured".
```
max_connections = 1

```ocaml
# Eio_main.run @@ fun env ->
Switch.run @@ fun sw ->
run_eio_server ~max_conn:1 ~clients:4 env sw
;;
+client: Connecting to server ...
+client: Connecting to server ...
+client: Connecting to server ...
+client: Connecting to server ...
+Server received: "Hello from client"
+client received: "Bye"
+Server received: "Hello from client"
+client received: "Bye"
+Server received: "Hello from client"
+client received: "Bye"
+Server received: "Hello from client"
+client received: "Bye"
- : unit = ()
```
# let test_run_server env sw =
let connection_handler flow _addr =
traceln "Server accepted connection from client";
Fun.protect
(fun () ->
let msg = read_all flow in
traceln "Server received: %S" msg;
)
~finally:(fun () -> Eio.Flow.copy_string "Bye" flow)
in
let server_sock = Eio.Net.listen ~reuse_addr:true ~backlog:128 ~sw env#net addr in
let server () =
Eio.Net.run_server ~max_connections:10 ~additional_domains:(env#domain_mgr, 10) ~on_error:raise ~sw
server_sock connection_handler
in
server ();;
val test_run_server :
< domain_mgr : #Eio__.Domain_manager.t; net : #Eio.Net.t; .. > ->
Switch.t -> unit = <fun>
max_connections = 5
# let run_client env sw =
let flow = Eio.Net.connect ~sw env#net addr in
Eio.Flow.copy_string "Hello from client" flow;
Eio.Flow.shutdown flow `Send;
let msg = read_all flow in
traceln "client received: %S" msg;;
val run_client : < net : #Eio.Net.t; .. > -> Switch.t -> unit = <fun>
```

```ocaml
# Eio_main.run @@ fun env ->
Switch.run @@ fun sw ->
run_eio_server ~max_conn:5 ~clients:5 env sw ;;
+client: Connecting to server ...
+client: Connecting to server ...
+client: Connecting to server ...
+client: Connecting to server ...
+client: Connecting to server ...
+Server received: "Hello from client"
+Server received: "Hello from client"
+Server received: "Hello from client"
+Server received: "Hello from client"
try
Switch.run @@ fun sw ->
Fiber.both
(fun () -> test_run_server env sw)
(fun () -> run_client env sw; raise Graceful_shutdown)
with Graceful_shutdown -> () ;;
+Server accepted connection from client
+Server received: "Hello from client"
+client received: "Bye"
+client received: "Bye"
+client received: "Bye"
+client received: "Bye"
+client received: "Bye"
- : unit = ()
```

Expand Down

0 comments on commit c3c41f0

Please sign in to comment.