From 8cdeba0ed881fdec2b369d584012d4f95242e323 Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Fri, 17 Mar 2023 09:09:07 -0700 Subject: [PATCH] fix(rpc): ignore sigpipe on linux (#7319) Signed-off-by: Rudi Grinberg --- CHANGES.md | 4 +- src/csexp_rpc/csexp_rpc.ml | 78 +++++++++++---- src/csexp_rpc/csexp_rpc.mli | 4 + src/csexp_rpc/csexp_rpc_stubs.c | 31 ++++++ src/csexp_rpc/dune | 1 + src/csexp_rpc/io_buffer.ml | 98 +++++++++++++++++++ src/csexp_rpc/io_buffer.mli | 33 +++++++ .../expect-tests/csexp_rpc/io_buffer_tests.ml | 68 +++++++++++++ 8 files changed, 298 insertions(+), 19 deletions(-) create mode 100644 src/csexp_rpc/io_buffer.ml create mode 100644 src/csexp_rpc/io_buffer.mli create mode 100644 test/expect-tests/csexp_rpc/io_buffer_tests.ml diff --git a/CHANGES.md b/CHANGES.md index 7a9559af0fe..55a7cdcfae7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,8 +1,8 @@ Unreleased ---------- -- RPC: Ignore SIGPIPE when clients suddenly disconnect on OSX (#7299, partially - fixes #6879, @rgrinberg) +- RPC: Ignore SIGPIPE when clients suddenly disconnect (#7299, #7319, fixes + #6879, @rgrinberg) - Always clean up the UI on exit. (#7271, fixes #7142 @rgrinberg) diff --git a/src/csexp_rpc/csexp_rpc.ml b/src/csexp_rpc/csexp_rpc.ml index 6e2b4f8ccda..87937e99f4f 100644 --- a/src/csexp_rpc/csexp_rpc.ml +++ b/src/csexp_rpc/csexp_rpc.ml @@ -128,7 +128,13 @@ module Session = struct type state = | Closed | Open of - { out_channel : out_channel + { out_buf : Io_buffer.t + ; fd : Unix.file_descr + ; (* A mutex for modifying [out_buf]. + + Needed as long as we use threads for async IO. Once we switch to + event based IO, we won't need this mutex anymore *) + write_mutex : Mutex.t ; in_channel : in_channel ; writer : Worker.t ; reader : Worker.t @@ -139,13 +145,22 @@ module Session = struct ; mutable state : state } - let create in_channel out_channel = + let create fd in_channel = let id = Id.gen () in if debug then Log.info [ Pp.textf "RPC created new session %d" (Id.to_int id) ]; let* reader = Worker.create () in let+ writer = Worker.create () in - let state = Open { in_channel; out_channel; reader; writer } in + let state = + Open + { fd + ; in_channel + ; out_buf = Io_buffer.create ~size:8192 + ; write_mutex = Mutex.create () + ; writer + ; reader + } + in { id; state } let string_of_packet = function @@ -159,10 +174,11 @@ module Session = struct let close t = match t.state with | Closed -> () - | Open { in_channel = _; out_channel; reader; writer } -> + | Open { write_mutex = _; fd = _; in_channel; out_buf = _; reader; writer } + -> Worker.stop reader; Worker.stop writer; - close_out_noerr out_channel; + close_in_noerr in_channel; t.state <- Closed let read t = @@ -203,6 +219,30 @@ module Session = struct debug res; res + external send : Unix.file_descr -> Bytes.t -> int -> int -> int = "dune_send" + + let write = if Sys.linux then send else Unix.single_write + + let rec csexp_write_loop fd out_buf token write_mutex = + Mutex.lock write_mutex; + if Io_buffer.flushed out_buf token then Mutex.unlock write_mutex + else + (* We always make sure to try and write the entire buffer. + This should minimize the amount of [write] calls we need + to do *) + let written = + let bytes = Io_buffer.bytes out_buf in + let pos = Io_buffer.pos out_buf in + let len = Io_buffer.length out_buf in + try write fd bytes pos len + with exn -> + Mutex.unlock write_mutex; + reraise exn + in + Io_buffer.read out_buf written; + Mutex.unlock write_mutex; + csexp_write_loop fd out_buf token write_mutex + let write t sexps = if debug then Log.info @@ -218,21 +258,23 @@ module Session = struct | Some sexps -> Code_error.raise "attempting to write to a closed channel" [ ("sexp", Dyn.(list Sexp.to_dyn) sexps) ]) - | Open { writer; out_channel; _ } -> ( + | Open { writer; fd; out_buf; write_mutex; _ } -> ( match sexps with | None -> (try - Unix.shutdown - (Unix.descr_of_out_channel out_channel) - Unix.SHUTDOWN_ALL + (* TODO this hack is temporary until we get rid of dune rpc init *) + Unix.shutdown fd Unix.SHUTDOWN_ALL with Unix.Unix_error (_, _, _) -> ()); close t; Fiber.return () | Some sexps -> ( let+ res = + Mutex.lock write_mutex; + Io_buffer.write_csexps out_buf sexps; + let flush_token = Io_buffer.flush_token out_buf in + Mutex.unlock write_mutex; Worker.task writer ~f:(fun () -> - List.iter sexps ~f:(Csexp.to_channel out_channel); - flush out_channel) + csexp_write_loop fd out_buf flush_token write_mutex) in match res with | Ok () -> () @@ -327,8 +369,7 @@ module Server = struct Transport.accept transport |> Option.map ~f:(fun client -> let in_ = Unix.in_channel_of_descr client in - let out = Unix.out_channel_of_descr client in - (in_, out))) + (client, in_))) in let loop () = let* accept = accept () in @@ -349,8 +390,8 @@ module Server = struct accepted." ]; Fiber.return None - | Ok (Some (in_, out)) -> - let+ session = Session.create in_ out in + | Ok (Some (fd, in_)) -> + let+ session = Session.create fd in_ in Some session in Fiber.Stream.In.create loop @@ -413,9 +454,8 @@ module Client = struct let transport = Transport.create t.sockaddr in t.transport <- Some transport; let client = Transport.connect transport in - let out = Unix.out_channel_of_descr client in let in_ = Unix.in_channel_of_descr client in - (in_, out)) + (client, in_)) in Worker.stop async; match task with @@ -433,3 +473,7 @@ module Client = struct let stop t = Option.iter t.transport ~f:Transport.close end + +module Private = struct + module Io_buffer = Io_buffer +end diff --git a/src/csexp_rpc/csexp_rpc.mli b/src/csexp_rpc/csexp_rpc.mli index cf45f4856a9..792cd8f99f2 100644 --- a/src/csexp_rpc/csexp_rpc.mli +++ b/src/csexp_rpc/csexp_rpc.mli @@ -83,3 +83,7 @@ module Server : sig val listening_address : t -> Unix.sockaddr end + +module Private : sig + module Io_buffer : module type of Io_buffer +end diff --git a/src/csexp_rpc/csexp_rpc_stubs.c b/src/csexp_rpc/csexp_rpc_stubs.c index 600e64baabb..cc66f1d6d6d 100644 --- a/src/csexp_rpc/csexp_rpc_stubs.c +++ b/src/csexp_rpc/csexp_rpc_stubs.c @@ -68,3 +68,34 @@ CAMLprim value dune_pthread_chdir(value unit) { } #endif + +#if __linux__ + +#include +#include +#include +#include + +CAMLprim value dune_send(value v_fd, value v_bytes, value v_pos, value v_len) { + CAMLparam4(v_fd, v_bytes, v_pos, v_len); + int len = Long_val(v_len); + if (len > UNIX_BUFFER_SIZE) { + len = UNIX_BUFFER_SIZE; + } + int pos = Long_val(v_pos); + int fd = Int_val(v_fd); + char iobuf[UNIX_BUFFER_SIZE]; + memmove(iobuf, &Byte(v_bytes, pos), len); + caml_release_runtime_system(); + int ret = send(fd, iobuf, len, MSG_NOSIGNAL); + caml_acquire_runtime_system(); + if (ret == -1) { + uerror("send", Nothing); + }; + CAMLreturn(Val_int(ret)); +} +#else +CAMLprim value dune_send(value v_fd, value v_bytes, value v_pos, value v_len) { + caml_invalid_argument("sendmsg without sigpipe only available on linux"); +} +#endif diff --git a/src/csexp_rpc/dune b/src/csexp_rpc/dune index d770cfb71f6..3a04d57270f 100644 --- a/src/csexp_rpc/dune +++ b/src/csexp_rpc/dune @@ -7,6 +7,7 @@ dune_util csexp fiber + threads.posix (re_export unix)) (foreign_stubs (language c) diff --git a/src/csexp_rpc/io_buffer.ml b/src/csexp_rpc/io_buffer.ml new file mode 100644 index 00000000000..9f0926ace1c --- /dev/null +++ b/src/csexp_rpc/io_buffer.ml @@ -0,0 +1,98 @@ +open Stdune + +type t = + { mutable bytes : Bytes.t (* underlying bytes *) + ; (* the position we can start reading from (until [pos_w]) *) + mutable pos_r : int + ; (* the position we can start writing to (until [Bytes.length bytes - 1]) *) + mutable pos_w : int + ; (* total number of bytes written to this buffer. 2^63 bytes should be + enough for anybody *) + mutable total_written : int + } + +type flush_token = int + +(* We can't use [Out_channel] for writes on Linux because we want to disable + sigpipes. Eventually we'll move to event based IO and ditch the threads, + so we'll need this anyway *) + +let create ~size = + { bytes = Bytes.create size; pos_r = 0; pos_w = 0; total_written = 0 } + +let length t = t.pos_w - t.pos_r + +let max_buffer_size = 65536 + +let maybe_resize_to_fit t write_size = + let buf_len = Bytes.length t.bytes in + let capacity = buf_len - t.pos_w in + if capacity < write_size then ( + let bytes = + let new_size = + let needed = buf_len + write_size - capacity in + max (min max_buffer_size (buf_len * 2)) needed + in + Bytes.create new_size + in + let len = length t in + Bytes.blit ~src:t.bytes ~src_pos:t.pos_r ~dst:bytes ~dst_pos:0 ~len; + t.bytes <- bytes; + t.pos_w <- len; + t.pos_r <- 0) + +let write_char_exn t c = + assert (t.pos_w < Bytes.length t.bytes); + Bytes.set t.bytes t.pos_w c; + t.pos_w <- t.pos_w + 1 + +let write_string_exn t src = + assert (t.pos_w < Bytes.length t.bytes); + let len = String.length src in + Bytes.blit_string ~src ~src_pos:0 ~dst:t.bytes ~dst_pos:t.pos_w ~len; + t.pos_w <- t.pos_w + len + +let read t len = + let pos_r = t.pos_r + len in + if pos_r > t.pos_w then + Code_error.raise "not enough bytes in buffer" + [ ("len", Dyn.int len); ("length", Dyn.int (length t)) ]; + t.pos_r <- pos_r; + t.total_written <- t.total_written + len + +let flush_token t = t.total_written + length t + +let flushed t token = t.total_written >= token + +let write_csexps = + let rec loop t (csexp : Csexp.t) = + match csexp with + | Atom str -> + write_string_exn t (string_of_int (String.length str)); + write_char_exn t ':'; + write_string_exn t str + | List e -> + write_char_exn t '('; + List.iter ~f:(loop t) e; + write_char_exn t ')' + in + fun t csexps -> + let length = + List.fold_left csexps ~init:0 ~f:(fun acc csexp -> + acc + Csexp.serialised_length csexp) + in + maybe_resize_to_fit t length; + List.iter ~f:(loop t) csexps + +let pos t = t.pos_r + +let bytes t = t.bytes + +let to_dyn ({ bytes; pos_r; pos_w; total_written } as t) = + let open Dyn in + record + [ ("total_written", int total_written) + ; ("contents", string (Bytes.sub_string bytes ~pos:pos_r ~len:(length t))) + ; ("pos_w", int pos_w) + ; ("pos_r", int pos_r) + ] diff --git a/src/csexp_rpc/io_buffer.mli b/src/csexp_rpc/io_buffer.mli new file mode 100644 index 00000000000..4ea8d1c38d8 --- /dev/null +++ b/src/csexp_rpc/io_buffer.mli @@ -0,0 +1,33 @@ +(** A resizable IO buffer *) + +type t + +val to_dyn : t -> Dyn.t + +(** create a new io buffer *) +val create : size:int -> t + +(** [read t n] reads [n] bytes *) +val read : t -> int -> unit + +(** [write t csexps] write [csexps] to [t] while resizing [t] as necessary *) +val write_csexps : t -> Csexp.t list -> unit + +(** a flush token is used to determine when a write has been completely flushed *) +type flush_token + +(** [flush_token t] will be flushed whenever everything in [t] will be written *) +val flush_token : t -> flush_token + +(** [flushed t token] will return [true] once all the data that was present in + [t] when [token] was created will be written *) +val flushed : t -> flush_token -> bool + +(** underlying raw buffer *) +val bytes : t -> Bytes.t + +(** [pos t] in [bytes t] to read *) +val pos : t -> int + +(** [length t] the number of bytes to read [bytes t] *) +val length : t -> int diff --git a/test/expect-tests/csexp_rpc/io_buffer_tests.ml b/test/expect-tests/csexp_rpc/io_buffer_tests.ml new file mode 100644 index 00000000000..d6384d6c16a --- /dev/null +++ b/test/expect-tests/csexp_rpc/io_buffer_tests.ml @@ -0,0 +1,68 @@ +open Stdune +module Io_buffer = Csexp_rpc.Private.Io_buffer + +let () = Printexc.record_backtrace false + +let print_dyn x = Io_buffer.to_dyn x |> Dyn.to_string |> print_endline + +let%expect_test "empty buffer is empty" = + print_dyn (Io_buffer.create ~size:4); + [%expect {| { total_written = 0; contents = ""; pos_w = 0; pos_r = 0 } |}] + +let%expect_test "resize" = + let buf = Io_buffer.create ~size:2 in + Io_buffer.write_csexps buf [ Csexp.Atom "xxx" ]; + print_dyn buf; + [%expect + {| + { total_written = 0; contents = "3:xxx"; pos_w = 5; pos_r = 0 } |}]; + Io_buffer.write_csexps buf [ Csexp.Atom "xxxyyy" ]; + print_dyn buf; + [%expect + {| + { total_written = 0; contents = "3:xxx6:xxxyyy"; pos_w = 13; pos_r = 0 } |}] + +let%expect_test "reading" = + let buf = Io_buffer.create ~size:10 in + Io_buffer.write_csexps buf [ Csexp.Atom "abcde" ]; + print_dyn buf; + [%expect + {| + { total_written = 0; contents = "5:abcde"; pos_w = 7; pos_r = 0 } |}]; + Io_buffer.read buf 4; + print_dyn buf; + [%expect + {| + { total_written = 4; contents = "cde"; pos_w = 7; pos_r = 4 } |}]; + Io_buffer.read buf 2; + print_dyn buf; + [%expect + {| + { total_written = 6; contents = "e"; pos_w = 7; pos_r = 6 } |}]; + (* buffer is now empty, this should now error *) + Io_buffer.read buf 2; + print_dyn buf; + [%expect.unreachable] + [@@expect.uncaught_exn + {| + ("(\"not enough bytes in buffer\", { len = 2; length = 1 })") |}] + +let%expect_test "reading" = + let buf = Io_buffer.create ~size:1 in + Io_buffer.write_csexps buf [ Atom "abc" ]; + print_dyn buf; + [%expect + {| + { total_written = 0; contents = "3:abc"; pos_w = 5; pos_r = 0 } |}]; + let flush = Io_buffer.flush_token buf in + printfn "token: %b" (Io_buffer.flushed buf flush); + [%expect {| + token: false |}]; + Io_buffer.read buf 4; + printfn "token: %b" (Io_buffer.flushed buf flush); + [%expect {| + token: false |}]; + Io_buffer.read buf 1; + printfn "token: %b" (Io_buffer.flushed buf flush); + [%expect {| + token: true |}]