-
Notifications
You must be signed in to change notification settings - Fork 411
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(rpc): ignore sigpipe on linux (#7319)
Signed-off-by: Rudi Grinberg <me@rgrinberg.com>
- Loading branch information
Showing
8 changed files
with
298 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
dune_util | ||
csexp | ||
fiber | ||
threads.posix | ||
(re_export unix)) | ||
(foreign_stubs | ||
(language c) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
open Stdune | ||
|
||
type t = | ||
{ mutable bytes : Bytes.t (* underlying bytes *) | ||
; (* the position we can start reading from (until [pos_w]) *) | ||
mutable pos_r : int | ||
; (* the position we can start writing to (until [Bytes.length bytes - 1]) *) | ||
mutable pos_w : int | ||
; (* total number of bytes written to this buffer. 2^63 bytes should be | ||
enough for anybody *) | ||
mutable total_written : int | ||
} | ||
|
||
type flush_token = int | ||
|
||
(* We can't use [Out_channel] for writes on Linux because we want to disable | ||
sigpipes. Eventually we'll move to event based IO and ditch the threads, | ||
so we'll need this anyway *) | ||
|
||
let create ~size = | ||
{ bytes = Bytes.create size; pos_r = 0; pos_w = 0; total_written = 0 } | ||
|
||
let length t = t.pos_w - t.pos_r | ||
|
||
let max_buffer_size = 65536 | ||
|
||
let maybe_resize_to_fit t write_size = | ||
let buf_len = Bytes.length t.bytes in | ||
let capacity = buf_len - t.pos_w in | ||
if capacity < write_size then ( | ||
let bytes = | ||
let new_size = | ||
let needed = buf_len + write_size - capacity in | ||
max (min max_buffer_size (buf_len * 2)) needed | ||
in | ||
Bytes.create new_size | ||
in | ||
let len = length t in | ||
Bytes.blit ~src:t.bytes ~src_pos:t.pos_r ~dst:bytes ~dst_pos:0 ~len; | ||
t.bytes <- bytes; | ||
t.pos_w <- len; | ||
t.pos_r <- 0) | ||
|
||
let write_char_exn t c = | ||
assert (t.pos_w < Bytes.length t.bytes); | ||
Bytes.set t.bytes t.pos_w c; | ||
t.pos_w <- t.pos_w + 1 | ||
|
||
let write_string_exn t src = | ||
assert (t.pos_w < Bytes.length t.bytes); | ||
let len = String.length src in | ||
Bytes.blit_string ~src ~src_pos:0 ~dst:t.bytes ~dst_pos:t.pos_w ~len; | ||
t.pos_w <- t.pos_w + len | ||
|
||
let read t len = | ||
let pos_r = t.pos_r + len in | ||
if pos_r > t.pos_w then | ||
Code_error.raise "not enough bytes in buffer" | ||
[ ("len", Dyn.int len); ("length", Dyn.int (length t)) ]; | ||
t.pos_r <- pos_r; | ||
t.total_written <- t.total_written + len | ||
|
||
let flush_token t = t.total_written + length t | ||
|
||
let flushed t token = t.total_written >= token | ||
|
||
let write_csexps = | ||
let rec loop t (csexp : Csexp.t) = | ||
match csexp with | ||
| Atom str -> | ||
write_string_exn t (string_of_int (String.length str)); | ||
write_char_exn t ':'; | ||
write_string_exn t str | ||
| List e -> | ||
write_char_exn t '('; | ||
List.iter ~f:(loop t) e; | ||
write_char_exn t ')' | ||
in | ||
fun t csexps -> | ||
let length = | ||
List.fold_left csexps ~init:0 ~f:(fun acc csexp -> | ||
acc + Csexp.serialised_length csexp) | ||
in | ||
maybe_resize_to_fit t length; | ||
List.iter ~f:(loop t) csexps | ||
|
||
let pos t = t.pos_r | ||
|
||
let bytes t = t.bytes | ||
|
||
let to_dyn ({ bytes; pos_r; pos_w; total_written } as t) = | ||
let open Dyn in | ||
record | ||
[ ("total_written", int total_written) | ||
; ("contents", string (Bytes.sub_string bytes ~pos:pos_r ~len:(length t))) | ||
; ("pos_w", int pos_w) | ||
; ("pos_r", int pos_r) | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
(** A resizable IO buffer *) | ||
|
||
type t | ||
|
||
val to_dyn : t -> Dyn.t | ||
|
||
(** create a new io buffer *) | ||
val create : size:int -> t | ||
|
||
(** [read t n] reads [n] bytes *) | ||
val read : t -> int -> unit | ||
|
||
(** [write t csexps] write [csexps] to [t] while resizing [t] as necessary *) | ||
val write_csexps : t -> Csexp.t list -> unit | ||
|
||
(** a flush token is used to determine when a write has been completely flushed *) | ||
type flush_token | ||
|
||
(** [flush_token t] will be flushed whenever everything in [t] will be written *) | ||
val flush_token : t -> flush_token | ||
|
||
(** [flushed t token] will return [true] once all the data that was present in | ||
[t] when [token] was created will be written *) | ||
val flushed : t -> flush_token -> bool | ||
|
||
(** underlying raw buffer *) | ||
val bytes : t -> Bytes.t | ||
|
||
(** [pos t] in [bytes t] to read *) | ||
val pos : t -> int | ||
|
||
(** [length t] the number of bytes to read [bytes t] *) | ||
val length : t -> int |
Oops, something went wrong.