cohttp-eio: new modules request, response, chunk
Adds Request abstraction in
Adds Response abstraction in
Adds so that the type can be reused in both and
Adds cohttp_eio.mli and start documenting functions and types.
Adds so that it can be used in both and
Updates with the new types
bikallem committed Feb 2, 2022
1 parent 16872bf commit 82197b3
Showing 12 changed files with 294 additions and 230 deletions.
20 changes: 10 additions & 10 deletions cohttp-eio/examples/
Expand Up @@ -31,23 +31,23 @@ let text =
open Cohttp_eio

let root : Server.handler =
fun (req, _) ->
let uri = Http.Request.resource req |> Uri.of_string in
match Uri.path uri with "/" -> Server.text text | _ -> None
fun req ->
let uri = Request.resource req |> Uri.of_string in
match Uri.path uri with "/" -> Some (Response.text text) | _ -> None

let exit : Server.handler =
fun (req, _) ->
let uri = Http.Request.resource req |> Uri.of_string in
fun req ->
let uri = Request.resource req |> Uri.of_string in
match Uri.path uri with "/exit" -> exit 0 | _ -> None

let html : Server.handler =
fun (req, _) ->
let uri = Http.Request.resource req |> Uri.of_string in
match Uri.path uri with "/html" -> Server.html text | _ -> None
fun req ->
let uri = Request.resource req |> Uri.of_string in
match Uri.path uri with "/html" -> Some (Response.html text) | _ -> None

let app =
let open Cohttp_eio.Server.Infix in
root >>? exit >>? html >>? Cohttp_eio.Server.not_found
let open Server.Infix in
root >>? exit >>? html >>? Server.not_found

let () =
let port = ref 8080 in
5 changes: 1 addition & 4 deletions cohttp-eio/src/ → cohttp-eio/src/
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
type 'a t = [ `String of Cstruct.t | `Chunked of 'a read_chunk | `None ]
and 'a read_chunk = (chunk -> unit) -> [ `Ok of 'a | `Eof ]

and chunk =
type t =
| Chunk of {
data : Cstruct.t;
length : int;
9 changes: 9 additions & 0 deletions cohttp-eio/src/
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type t = {
flow : < Eio.Flow.two_way ; Eio.Flow.close >;
switch : Eio.Std.Switch.t;
addr : Eio.Net.Sockaddr.t;
reader : Reader.t;
oc : Eio.Flow.write;

let close t = Eio.Flow.close t.flow
4 changes: 3 additions & 1 deletion cohttp-eio/src/
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module Body = Body
module Reader = Reader
module Chunk = Chunk
module Request = Request
module Response = Response
module Server = Server
158 changes: 158 additions & 0 deletions cohttp-eio/src/cohttp_eio.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
(** [Reader] is a buffered reader. *)
module Reader : sig
type t

val create : ?buffer_size:int -> -> t
(** [create ?buffer_size reader] creates [t]. [buffer_size] is the maximum
number of bytes [reader] attempts to read in one call. If [buffer_size] is
not given then [default_io_buffer_size] is used. *)

val reader : t ->
(** [reader t] returns the reader used by [t]. *)

val buffer_size : t -> int
(** [bufer_size t] returns the current [reader t] read buffer size. *)

val default_io_buffer_size : int
(** [default_io_buffer_size] is [4096]. *)

(** {1 Low Level API} *)

val consume : t -> int -> unit
(** [consume t n] marks [n] bytes of data as consumed in [t]. *)

val feed_input : t -> Cstruct.t
(** [feed_input t] is [buf]. Attempts to read at most [buffer_size t] bytes
into [t] and returns a view into unconsumed buffer represented by [buf].
[buf.len = 0] if [reader t] has reached end of file. *)

(** {2 High Level API} *)

val read_into : t -> off:int -> len:int -> Cstruct.buffer -> int
(** [read_into t ~off ~len buf] fills [buf] from [off] to length [len] with
data from [t]. *)

val read_char : t -> char
(** [read_char t] reads and returns [char] from [t] or raises [End_of_file] if
[t] has reached end of input.*)

(** [Chunk] encapsulates HTTP/1.1 chunk transfer encoding data structures. *)
module Chunk : sig
type t =
| Chunk of {
data : Cstruct.t;
length : int;
extensions : chunk_extension list;
| Last_chunk of chunk_extension list

and chunk_extension = { name : string; value : string option }

(** [Request] is a HTTP/1.1 request. *)
module Request : sig
type t

(** {1 Request Details} *)

val has_body : t -> [ `No | `Unknown | `Yes ]
val headers : t -> Http.Header.t
val meth : t -> Http.Method.t
val scheme : t -> string option
val resource : t -> string
val version : t -> Http.Version.t
val is_keep_alive : t -> bool

(** {1 Builtin Request Body Readers} *)

val read_fixed : t -> (Cstruct.t, string) result
(** [read_fixed t] is [Ok buf] if "Content-Length" header value exists in [t]
and is a valid integer value. Otherwise it is [Error err] where [err] is
the error text. *)

val read_chunk : t -> (Chunk.t -> unit) -> (t, string) result
(** [read_chunk t f] is [Ok req] if "Transfer-Encoding" header value exists,
is "chunked" in [t] and all chunks in a request are read successfully.
[req] is the updated request as specified by the chunked encoding
algorithm in
Otherwise it is [Error err] where [err] is the error text. *)

(** {1 Custom Request Body Readers} *)

val reader : t -> Reader.t
(** [reader t] returns a [Reader.t] instance. This can be used to create a
custom request body reader. *)

val set_read_complete : t -> unit
(** [set_read_complet t] indicates that request [t] body has been read. *)

(** [Response] is a HTTP/1.1 response. *)
module Response : sig
type t

and body =
[ `String of Cstruct.t
| `Chunked of write_chunk
| `Custom of Faraday.t -> unit
| `None ]

and write_chunk = (Chunk.t -> unit) -> unit

val create : ?headers:Http.Header.t -> ?status:Http.Status.t -> body -> t

(** {1 Response Details} *)

val headers : t -> Http.Header.t
val status : t -> Http.Status.t
val body : t -> body

(** {1 Basic Response} *)

val text : string -> t
(** [text s] is a HTTP/1.1, 200 status response with header "Content-Type:
text/plain". *)

val html : string -> t
(** [html s] is a HTTP/1.1, 200 status response with header "Content-Type:
text/html". *)

val not_found : t
(** [not_found] is a HTTP/1.1, 404 status response. *)

val internal_server_error : t
(** [internal_server_error] is a HTTP/1.1, 500 status response. *)

val bad_request : t
(** [bad_request] is a HTTP/1.1, 400 status response. *)

(** [Server] is a HTTP 1.1 server. *)
module Server : sig
type t
type handler = Request.t -> Response.t option
type middleware = handler -> handler

(** {1 Run Server} *)

val create : ?socket_backlog:int -> ?domains:int -> port:int -> handler -> t
val run : t -> unit
val close : t -> unit

(** {1 Basic Handlers} *)

val not_found : handler

(** {1 Handler Combinators} *)

val join : handler -> handler -> handler
(** [join h1 h2] executes handler [h1]. If response is [None] then it executes
handler [h2]. *)

module Infix : sig
val ( >>? ) : handler -> handler -> handler
(** [h1 >>? h2] is [join h1 h2] *)
8 changes: 4 additions & 4 deletions cohttp-eio/src/
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ let chunk_exts =
let chunk_ext_val = quoted_string <|> token in
(fun name value : Body.chunk_extension -> { name; value })
(fun name value : Chunk.chunk_extension -> { name; value })
(char ';' *> chunk_ext_name)
(optional (char '=' *> chunk_ext_val)))

Expand Down Expand Up @@ -213,11 +213,11 @@ let parse : 'a Angstrom.t -> Reader.t -> 'a =
let rec loop = function
| Unbuffered.Partial k ->
Reader.consume reader k.committed;
let buf, off, len = Reader.feed_input reader in
let buf = Reader.feed_input reader in
let more =
if len = 0 then Unbuffered.Complete else Unbuffered.Incomplete
if buf.len = 0 then Unbuffered.Complete else Unbuffered.Incomplete
loop (k.continue buf ~off ~len more)
loop (k.continue buf.buffer ~len:buf.len more)
| Unbuffered.Done (len, a) ->
Reader.consume reader len;
8 changes: 4 additions & 4 deletions cohttp-eio/src/
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ let feed_input t =
match Eio.Flow.read_into t.reader buf with
| got ->
t.len <- t.len + got;
(t.buf,, t.len)
| exception End_of_file -> (t.buf,, 0)
Cstruct.of_bigarray ~len:t.len t.buf
| exception End_of_file -> Cstruct.empty

let ensure_input t len =
if t.len < len then
let continue = ref true in
while !continue do
let _, _, got = feed_input t in
continue := if got > 0 && t.len < len then true else false
let buf = feed_input t in
continue := if buf.len > 0 && t.len < len then true else false

let read_into t ~off ~len buf =
47 changes: 47 additions & 0 deletions cohttp-eio/src/
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
type t = {
req : Http.Request.t;
reader : Reader.t;
mutable read_complete : bool;

let reader t = t.reader
let has_body t = Http.Request.has_body t.req
let headers t = t.req.headers
let meth t = t.req.meth
let scheme t = t.req.scheme
let resource t = t.req.resource
let version t = t.req.version
let is_keep_alive t = Http.Request.is_keep_alive t.req

let read_fixed t =
match Http.Header.get_transfer_encoding t.req.headers with
| Http.Transfer.Fixed content_length -> (
if t.read_complete then Error "End of file"
let content_length = Int64.to_int content_length in
try Ok Parser.(parse (fixed_body content_length) t.reader)
with e -> Error (Printexc.to_string e))
| _ -> Error "Request is not a fixed content body"

let read_chunk t =
match Http.Header.get_transfer_encoding t.req.headers with
| Http.Transfer.Chunked ->
let total_read = ref 0 in
let rec chunk_loop f =
if t.read_complete then Error "End of file"
let chunk = Parser.(parse (chunk !total_read t.req) t.reader) in
match chunk with
| `Chunk (data, length, extensions) ->
f (Chunk.Chunk { data; length; extensions });
total_read := !total_read + length;
(chunk_loop [@tailcall]) f
| `Last_chunk (extensions, updated_request) ->
t.read_complete <- true;
f (Chunk.Last_chunk extensions);
Ok { t with req = updated_request }
| _ -> fun _ -> Error "Request is not a chunked request"

let set_read_complete t = t.read_complete <- true
39 changes: 39 additions & 0 deletions cohttp-eio/src/
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
type t = { res : Http.Response.t; body : body }

and body =
[ `String of Cstruct.t
| `Chunked of write_chunk
| `Custom of Faraday.t -> unit
| `None ]

and write_chunk = (Chunk.t -> unit) -> unit

let create ?headers ?(status = `OK) body =
let res = Http.Response.make ?headers ~version:`HTTP_1_1 ~status () in
{ res; body }

(* Response Details *)

let headers t = t.res.headers
let status t = t.res.status
let body t = t.body

(* Basic Response *)

let text body =
let headers =
Http.Header.init_with "content-type" "text/plain; charset=UTF-8"
let body = Cstruct.of_string body in
create ~headers (`String body)

let html body =
let headers =
Http.Header.init_with "content-type" "text/html; charset=UTF-8"
let body = Cstruct.of_string body in
create ~headers (`String body)

let not_found = create ~status:`Not_found `None
let internal_server_error = create ~status:`Internal_server_error `None
let bad_request = create ~status:`Bad_request `None

