Calascibetta Romain 2025-01-19 17:12:50 +01:00
parent d86a49c637
commit 565f596b09
14 changed files with 584 additions and 64 deletions

@ -3,4 +3,4 @@
(public_name vif)
(:standard -linkall))
(libraries httpcats tyre))
(libraries hmap mirage-crypto-rng-miou-unix httpcats tyre))

lib/vif/stream.ml Normal file
@ -0,0 +1,49 @@
type 'a t = {
buf: 'a option array
; mutable rd_pos: int
; mutable wr_pos: int
; mutable closed: bool
; mutex: Miou.Mutex.t
; non_empty_or_close: Miou.Condition.t
; non_full: Miou.Condition.t
let create len =
buf= Array.make len None
; rd_pos= 0
; wr_pos= 0
; closed= false
; mutex= Miou.Mutex.create ()
; non_empty_or_close= Miou.Condition.create ()
; non_full= Miou.Condition.create ()
let put t value =
Miou.Mutex.protect t.mutex @@ fun () ->
if t.closed then invalid_arg "Stream.put: closed stream";
while (t.wr_pos + 1) mod Array.length t.buf = t.rd_pos do
Miou.Condition.wait t.non_full t.mutex
t.buf.(t.wr_pos) <- Some value;
t.wr_pos <- (t.wr_pos + 1) mod Array.length t.buf;
Miou.Condition.signal t.non_empty_or_close
let get t =
Miou.Mutex.protect t.mutex @@ fun () ->
while t.wr_pos = t.rd_pos && not t.closed do
Miou.Condition.wait t.non_empty_or_close t.mutex
if t.wr_pos = t.rd_pos && t.closed then None
else begin
let value = t.buf.(t.rd_pos) in
t.buf.(t.rd_pos) <- None;
t.rd_pos <- (t.rd_pos + 1) mod Array.length t.buf;
Miou.Condition.signal t.non_full;
let close t =
Miou.Mutex.protect t.mutex @@ fun () ->
t.closed <- true;
Miou.Condition.signal t.non_empty_or_close

lib/vif/stream.mli Normal file
@ -0,0 +1,6 @@
type 'a t
val create : int -> 'a t
val put : 'a t -> 'a -> unit
val get : 'a t -> 'a option
val close : 'a t -> unit

@ -2,47 +2,96 @@ module U = Vif_u
module R = Vif_r
module C = Vif_c
module Request = struct
type t = H1 of H1.Request.t | H2 of H2.Request.t
let rng_d, rng_s =
let initialize () =
Mirage_crypto_rng_miou_unix.(initialize (module Pfortuna))
let finally = Mirage_crypto_rng_miou_unix.kill in
Vif_d.device ~name:"rng" ~finally Vif_d.[ const () ] initialize
let of_reqd = function
| `V1 reqd -> H1 (H1.Reqd.request reqd)
| `V2 reqd -> H2 (H2.Reqd.request reqd)
module D = struct
include Vif_d
let target = function
| H1 request -> request.H1.Request.target
| H2 request -> request.H2.Request.target
let rng = rng_d
type config =
[ `HTTP_1_1 of H1.Config.t
| `H2 of H2.Config.t
| `Both of H1.Config.t * H2.Config.t ]
module S = struct
include Vif_s
let default _reqd _target = ()
let rng = rng_s
let handler routes _socket reqd =
module Stream = Stream
module Method = Vif_method
module Status = Vif_status
module Headers = Vif_headers
module Request = Vif_request
module Response = Vif_response
type stop = Httpcats.Server.stop
type config = {
[ `HTTP_1_1 of H1.Config.t
| `H2 of H2.Config.t
| `Both of H1.Config.t * H2.Config.t ]
; tls: Tls.Config.server option
; backlog: int
; stop: stop option
; sockaddr: Unix.sockaddr
type devices = [] : devices | ( :: ) : 'a Vif_d.arg * devices -> devices
let rec keval devices k = function
| [] -> k devices
| device :: rest ->
let devices, _ = Vif_d.ctor devices device in
keval devices k rest
let eval devices = keval Vif_d.(Devices Hmap.empty) Fun.id devices
let config ?http ?tls ?(backlog = 64) ?stop sockaddr =
let http =
match http with
| Some (`H1 cfg) -> Some (`HTTP_1_1 cfg)
| Some (`H2 cfg) -> Some (`H2 cfg)
| Some (`Both (h1, h2)) -> Some (`Both (h1, h2))
| None -> None
{ http; tls; backlog; stop; sockaddr }
let stop = Httpcats.Server.stop
let handler ~default routes devices user's_value socket reqd =
let request = Request.of_reqd reqd in
let target = Request.target request in
R.dispatch ~default routes ~target reqd
let server = { Vif_s.reqd; socket; devices } in
R.dispatch ~default routes ~target server request user's_value
let run ?stop ?config ?backlog ?tls_config routes sockaddr =
let run ~cfg ~devices ~default routes user's_value =
let domains = Miou.Domain.available () in
let handler = handler routes in
let fn =
match (config, tls_config) with
| _, Some tls_config ->
fun () ->
Httpcats.Server.with_tls ?stop ?config ?backlog tls_config ~handler
let handle =
fun handler ->
match (cfg.http, cfg.tls) with
| config, Some tls ->
Httpcats.Server.with_tls ?stop:cfg.stop ?config ~backlog:cfg.backlog tls
~handler cfg.sockaddr
| Some (`H2 _), None ->
failwith "Impossible to launch an h2 server without TLS."
| Some (`Both (config, _) | `HTTP_1_1 config), None ->
fun () -> Httpcats.Server.clear ?stop ~config ~handler sockaddr
| None, None -> fun () -> Httpcats.Server.clear ?stop ~handler sockaddr
Httpcats.Server.clear ?stop:cfg.stop ~config ~handler cfg.sockaddr
| None, None -> Httpcats.Server.clear ?stop:cfg.stop ~handler cfg.sockaddr
let prm = Miou.async fn in
let[@warning "-8"] (Vif_d.Devices devices) = eval devices in
let handler = handler ~default routes devices user's_value in
let prm = Miou.async @@ fun () -> handle handler in
if domains > 0 then
Miou.parallel fn (List.init domains (Fun.const ()))
Miou.parallel handle (List.init domains (Fun.const handler))
|> List.iter (function Ok () -> () | Error exn -> raise exn);
Miou.await_exn prm
Miou.await_exn prm;
let finally (Vif_d.Hmap.B (k, v)) =
(Vif_d.Hmap.Key.info k).Vif_d.Device.finally v
Vif_d.Hmap.iter finally devices

@ -69,22 +69,166 @@ module C : sig
/ "heads"
/% Tyre.string
/ "README.md"
/?? nil
let get_readme ?(branch = "main") ~org ~repository () =
C.request ~meth:`GET readme org repository branch
]} *)
type config =
[ `HTTP_1_1 of H1.Config.t
| `H2 of H2.Config.t
| `Both of H1.Config.t * H2.Config.t ]
module D : sig
type 'a arg
type 'a device
type ('f, 'r) args =
| [] : ('r, 'r) args
| ( :: ) : 'a arg * ('f, 'a -> 'r) args -> ('f, 'r) args
val const : 'a -> 'a arg
val map : ('f, 'r) args -> 'f -> 'r arg
val device :
-> finally:('r -> unit)
-> ('f, 'r) args
-> 'f
-> 'r arg * 'r device
(** Some devices. *)
val rng : Mirage_crypto_rng_miou_unix.rng arg
module S : sig
type t
type reqd = [ `V1 of H1.Reqd.t | `V2 of H2.Reqd.t ]
val reqd : t -> reqd
val device : 'a D.device -> t -> 'a
val rng : Mirage_crypto_rng_miou_unix.rng D.device
module Stream : sig
type 'a t
val create : int -> 'a t
val put : 'a t -> 'a -> unit
val get : 'a t -> 'a option
val close : 'a t -> unit
module Method : sig
type t =
| `GET
| `PUT
| `Other of string ]
module Status : sig
type t =
[ `Accepted
| `Bad_gateway
| `Bad_request
| `Code of int
| `Conflict
| `Continue
| `Created
| `Enhance_your_calm
| `Expectation_failed
| `Forbidden
| `Found
| `Gateway_timeout
| `Gone
| `Http_version_not_supported
| `I_m_a_teapot
| `Internal_server_error
| `Length_required
| `Method_not_allowed
| `Misdirected_request
| `Moved_permanently
| `Multiple_choices
| `Network_authentication_required
| `No_content
| `Non_authoritative_information
| `Not_acceptable
| `Not_found
| `Not_implemented
| `Not_modified
| `OK
| `Partial_content
| `Payload_too_large
| `Payment_required
| `Precondition_failed
| `Precondition_required
| `Proxy_authentication_required
| `Range_not_satisfiable
| `Request_header_fields_too_large
| `Request_timeout
| `Reset_content
| `See_other
| `Service_unavailable
| `Switching_protocols
| `Temporary_redirect
| `Too_many_requests
| `Unauthorized
| `Unsupported_media_type
| `Upgrade_required
| `Uri_too_long
| `Use_proxy ]
module Headers : sig
type t = (string * string) list
module Request : sig
type t
val target : t -> string
val meth : t -> Method.t
val version : t -> int
val headers : t -> Headers.t
val to_string : t -> string
val to_stream : t -> string Stream.t
module Response : sig
type t
val with_stream :
S.t -> ?headers:Headers.t -> Status.t -> (string Stream.t -> unit) -> unit
val with_string : S.t -> ?headers:Headers.t -> Status.t -> string -> unit
type config
and stop
val config :
[ `H1 of H1.Config.t
| `H2 of H2.Config.t
| `Both of H1.Config.t * H2.Config.t ]
-> ?tls:Tls.Config.server
-> ?backlog:int
-> ?stop:stop
-> Unix.sockaddr
-> config
val stop : unit -> stop
type devices = [] : devices | ( :: ) : 'a D.arg * devices -> devices
val run :
-> ?config:config
-> ?backlog:int
-> ?tls_config:Tls.Config.server
-> (Httpcats.Server.reqd -> unit) R.route list
-> Unix.sockaddr
-> devices:devices
-> default:(string -> S.t -> Request.t -> 'value -> unit)
-> (S.t -> Request.t -> 'value -> unit) R.route list
-> 'value
-> unit

@ -0,0 +1,64 @@
type t = ..
module Device = struct
type nonrec 'a t = {
name: string
; initialize: t -> t * 'a
; finally: 'a -> unit
let make ~initialize ~finally name = { name; initialize; finally }
module Hmap = Hmap.Make (Device)
type t += Devices : Hmap.t -> t
(* NOTE(dinosaure): or module-rec? *)
type 'a arg =
| Value : 'a Hmap.key -> 'a arg
| Const : 'a -> 'a arg
| Map : ('f, 'a) args * 'f -> 'a arg
and ('fu, 'return) args =
| [] : ('r, 'r) args
| ( :: ) : 'a arg * ('f, 'a -> 'r) args -> ('f, 'r) args
and 'a device = 'a Hmap.key
let map args fn = Map (args, fn)
let const value = Const value
let rec ctor : type a. t -> a arg -> t * a =
fun devices -> function
| Const v -> (devices, v)
| Map (lst, fn) -> keval_args devices (fun devices x -> (devices, x)) lst fn
| Value k -> (
let[@warning "-8"] (Devices m) = devices in
match Hmap.find k m with
| None ->
let devices, device = (Hmap.Key.info k).Device.initialize devices in
let[@warning "-8"] (Devices devices) = devices in
let devices = Hmap.add k device devices in
(Devices devices, device)
| Some device -> (devices, device))
and keval_args : type f x r. t -> (t -> x -> r) -> (f, x) args -> f -> r =
fun devices k -> function
| [] -> k devices
| x :: r ->
let devices, v = ctor devices x in
let k devices fn = k devices (fn v) in
keval_args devices k r
let device : type r.
name:string -> finally:(r -> unit) -> ('f, r) args -> 'f -> r arg * r device
fun ~name ~finally args fn ->
let initialize devices =
let k devices v = (devices, v) in
keval_args devices k args fn
let device = Device.make ~initialize ~finally name in
let key = Hmap.Key.create device in
(Value key, key)

@ -0,0 +1,4 @@
type t = (string * string) list
let add_unless_exists hdrs k v =
if List.mem_assoc k hdrs then hdrs else (k, v) :: hdrs

@ -0,0 +1,10 @@
type t =
| `GET
| `PUT
| `Other of string ]

@ -1,3 +1,7 @@
let src = Logs.Src.create "vif.r"
module Log = (val Logs.src_log src : Logs.LOG)
type 'a atom = 'a Tyre.Internal.wit
let atom re = Tyre.Internal.build re
@ -218,6 +222,8 @@ let rec find_and_trigger : type r.
fun ~original subs -> function
| [] -> assert false
| Re (f, id, ret) :: l ->
Log.debug (fun m -> m "original:%S subs:%a\n%!" original Re.Group.pp subs);
Log.debug (fun m -> m "recognized:%b" (Re.Mark.test subs id));
if Re.Mark.test subs id then extract ~original ret subs f
else find_and_trigger ~original subs l
@ -230,4 +236,7 @@ let dispatch : type r.
let subs = Re.exec re target in
find_and_trigger ~original:target subs wl
with Not_found -> default target
with Not_found ->
let bt = Printexc.get_raw_backtrace () in
Log.warn (fun m -> m "%s" (Printexc.raw_backtrace_to_string bt));
default target

@ -0,0 +1,72 @@
type t = {
request: [ `V1 of H1.Request.t | `V2 of H2.Request.t ]
; body: [ `V1 of H1.Body.Reader.t | `V2 of H2.Body.Reader.t ]
let of_reqd = function
| `V1 reqd ->
let request = `V1 (H1.Reqd.request reqd) in
let body = `V1 (H1.Reqd.request_body reqd) in
{ request; body }
| `V2 reqd ->
let request = `V2 (H2.Reqd.request reqd) in
let body = `V2 (H2.Reqd.request_body reqd) in
{ request; body }
let target { request; _ } =
match request with
| `V1 request -> request.H1.Request.target
| `V2 request -> request.H2.Request.target
let meth { request; _ } =
match request with
| `V1 request -> request.H1.Request.meth
| `V2 request -> request.H2.Request.meth
let version { request; _ } = match request with `V1 _ -> 1 | `V2 _ -> 2
let headers { request; _ } =
match request with
| `V1 request -> H1.Headers.to_list request.H1.Request.headers
| `V2 request -> H2.Headers.to_list request.H2.Request.headers
let to_string ~schedule ~close body =
let buf = Buffer.create 0x7ff in
let c = Miou.Computation.create () in
let rec on_eof () =
close body;
assert (Miou.Computation.try_return c (Buffer.contents buf))
and on_read bstr ~off ~len =
Buffer.add_string buf (Bigstringaf.substring bstr ~off ~len);
schedule body ~on_eof ~on_read
schedule body ~on_eof ~on_read;
Miou.Computation.await_exn c
let to_stream ~schedule ~close body =
let stream = Stream.create 0x7ff in
let rec on_eof () = close body; Stream.close stream
and on_read bstr ~off ~len =
Stream.put stream (Bigstringaf.substring bstr ~off ~len);
schedule body ~on_eof ~on_read
schedule body ~on_eof ~on_read;
let to_string { body; _ } =
match body with
| `V1 body ->
to_string ~schedule:H1.Body.Reader.schedule_read
~close:H1.Body.Reader.close body
| `V2 body ->
to_string ~schedule:H2.Body.Reader.schedule_read
~close:H2.Body.Reader.close body
let to_stream { body; _ } =
match body with
| `V1 body ->
to_stream ~schedule:H1.Body.Reader.schedule_read
~close:H1.Body.Reader.close body
| `V2 body ->
to_stream ~schedule:H2.Body.Reader.schedule_read
~close:H2.Body.Reader.close body

@ -0,0 +1,70 @@
type t = { response: Httpcats.response; compress: [ `GZip | `DEFLATE ] option }
let strf fmt = Format.asprintf fmt
let with_string server ?(headers = []) status str =
match Vif_s.reqd server with
| `V1 reqd ->
let headers =
Vif_headers.add_unless_exists headers "content-length"
(strf "%d" (String.length str))
let headers = H1.Headers.of_list headers in
let status =
match status with
| #H1.Status.t as status -> status
| _ -> invalid_arg "Response.with_string: invalid status"
let resp = H1.Response.create ~headers status in
H1.Reqd.respond_with_string reqd resp str
| `V2 reqd ->
let headers =
Vif_headers.add_unless_exists headers "content-length"
(strf "%d" (String.length str))
let headers = H2.Headers.of_list headers in
let resp = H2.Response.create ~headers status in
H2.Reqd.respond_with_string reqd resp str
let with_stream server ?(headers = []) status fn =
match Vif_s.reqd server with
| `V1 reqd ->
let headers =
Vif_headers.add_unless_exists headers "transfer-encoding" "chunked"
let headers = H1.Headers.of_list headers in
let status =
match status with
| #H1.Status.t as status -> status
| _ -> invalid_arg "Response.with_string: invalid status"
let resp = H1.Response.create ~headers status in
let stream = Stream.create 0x7ff in
let body = H1.Reqd.respond_with_streaming reqd resp in
let res0 =
let finally = H1.Body.Writer.close in
Miou.Ownership.create ~finally body
let res1 =
let finally = Stream.close in
Miou.Ownership.create ~finally stream
let prm0 =
Miou.async ~give:[ res0 ] @@ fun () ->
let rec go () =
match Stream.get stream with
| Some str ->
let () = H1.Body.Writer.write_string body str in
go ()
| None -> H1.Body.Writer.close body; Miou.Ownership.disown res0
go ()
let prm1 =
Miou.async ~give:[ res1 ] @@ fun () ->
let () = fn stream in
Stream.close stream; Miou.Ownership.disown res1
Miou.await_all [ prm0; prm1 ]
|> List.iter (function Ok () -> () | Error exn -> raise exn)
| `V2 _ -> assert false

@ -0,0 +1,12 @@
type t = { reqd: reqd; socket: socket; devices: Vif_d.Hmap.t }
and reqd = Httpcats.Server.reqd
and socket = [ `Tcp of Miou_unix.file_descr | `Tls of Tls_miou_unix.t ]
and 'a device = 'a Vif_d.device
let reqd { reqd; _ } = reqd
let device : type a. a Vif_d.device -> t -> a =
fun k { devices; _ } ->
match Vif_d.Hmap.find k devices with
| Some value -> value
| None -> failwith "Device not found"

@ -0,0 +1 @@
type t = H2.Status.t

@ -1,33 +1,63 @@
#require "miou.unix" ;;
#require "mirage-crypto-rng-miou-unix" ;;
#require "vif" ;;
#require "miou.unix"
open Vif
#require "mirage-crypto-rng-miou-unix"
let[@warning "-8"] index (`V1 reqd : Httpcats.Server.reqd) =
let open H1 in
let text = "Hello from an OCaml script!" in
let headers =
("content-type", "text/plain; charset=utf-8")
; ("content-length", string_of_int (String.length text))
#require "vif"
#require "digestif.c"
#require "base64"
let index server _req () =
Vif.Response.with_string server `OK "Hello from an OCaml script!"
let test arg server _req () =
Vif.Response.with_string server `OK (Fmt.str "%02x\n%!" arg)
let digest server req () =
let ic = Vif.Request.to_stream req in
let rec go ctx =
match Vif.Stream.get ic with
| Some str -> go (Digestif.SHA1.feed_string ctx str)
| None -> Digestif.SHA1.get ctx
let resp = Response.create ~headers `OK in
Reqd.respond_with_string reqd resp text
let hash = go Digestif.SHA1.empty in
let hash = Digestif.SHA1.to_hex hash in
Vif.Response.with_string server `OK hash
let random len server req () =
let buf = Bytes.create 0x7ff in
Vif.Response.with_stream server `OK @@ fun oc ->
let rec go rem =
Format.printf ">>> %d\n%!" rem;
if rem > 0 then begin
let len = Int.min rem (Bytes.length buf) in
Mirage_crypto_rng.generate_into buf len;
let str = Bytes.sub_string buf 0 len in
let str = Base64.encode_exn str in
Vif.Stream.put oc str;
go (rem - len)
go len
let routes =
let open U in
let open R in
[ (rel /?? nil) --> index ]
let open Vif.U in
let open Vif.R in
(rel /?? nil) --> index; (rel / "random" /% Tyre.int /?? nil) --> random
; (rel / "test" /% Tyre.int /?? nil) --> test
; (rel / "digest" /?? nil) --> digest
let default target server req () =
Vif.Response.with_string server `Not_found (Fmt.str "%S not found\n%!" target)
let my_device_as_arg, my_device =
Vif.D.device ~name:"my-device" ~finally:ignore [] ()
let () =
Miou_unix.run @@ fun () ->
let rng = Mirage_crypto_rng_miou_unix.(initialize (module Pfortuna)) in
let sockaddr = Unix.(ADDR_INET (inet_addr_loopback, 8080)) in
Vif.run routes sockaddr;
Mirage_crypto_rng_miou_unix.kill rng
let cfg = Vif.config sockaddr in
Vif.run ~cfg ~default ~devices:Vif.[ D.rng; my_device_as_arg ] routes ()