@ -4,4 +4,5 @@ let default req target server () =
Vif.Response.with_string server `OK "Hello World!\n"
let () = @@ fun () -> ~default [] () ;;
let () = @@ fun () ->
|||| ~default [] () ;;
@ -24,7 +24,7 @@ let foo =
let deserialize req server () =
match Vif.Request.to_json req with
match Vif.Request.of_json req with
| Ok (foo : foo) ->
let str =
Fmt.str "username: %s, password: %s, age: %a, address: %a\n"
Normal file
Normal file
@ -0,0 +1,9 @@
#require "vif" ;;
let default req target server () =
let stream = Vif.Stream.Stream.singleton "Hello World!\n" in
Vif.Response.with_stream ~compression:`DEFLATE server `OK stream
let () = @@ fun () ->
|||| ~default [] () ;;
@ -9,6 +9,8 @@
@ -6,7 +6,7 @@ type error = [ `Error of Jsonm.error ]
type eoi = [ `End ]
let error_msgf fmt = Fmt.kstr (fun msg -> Error (`Msg msg)) fmt
let _max_young_size = Sys.word_size / 8 * 256
let _max_young_size = 0x7ff
let rec pp ppf = function
| `Null -> Fmt.const Fmt.string "()" ppf ()
@ -1,4 +1,5 @@
type 'a t = {
module Bqueue = struct
type 'a t = {
buf: 'a option array
; mutable rd_pos: int
; mutable wr_pos: int
@ -6,9 +7,9 @@ type 'a t = {
; mutex: Miou.Mutex.t
; non_empty_or_close: Miou.Condition.t
; non_full: Miou.Condition.t
let create len =
let create len =
buf= Array.make len None
; rd_pos= 0
@ -19,7 +20,7 @@ let create len =
; non_full= Miou.Condition.create ()
let put t value =
let put t value =
Miou.Mutex.protect t.mutex @@ fun () ->
if t.closed then invalid_arg "Stream.put: closed stream";
while (t.wr_pos + 1) mod Array.length t.buf = t.rd_pos do
@ -29,7 +30,7 @@ let put t value =
t.wr_pos <- (t.wr_pos + 1) mod Array.length t.buf;
Miou.Condition.signal t.non_empty_or_close
let get t =
let get t =
Miou.Mutex.protect t.mutex @@ fun () ->
while t.wr_pos = t.rd_pos && not t.closed do
Miou.Condition.wait t.non_empty_or_close t.mutex
@ -43,7 +44,369 @@ let get t =
let close t =
let close t =
Miou.Mutex.protect t.mutex @@ fun () ->
t.closed <- true;
Miou.Condition.signal t.non_empty_or_close
type 'a source =
| Source : {
init: unit -> 's
; pull: 's -> ('a * 's) option
; stop: 's -> unit
-> 'a source
module Source = struct
let file ?offset path =
let buf = Bytes.create 0x7ff in
let init () =
let fd = Unix.openfile path Unix.[ O_RDONLY ] 0o644 in
match offset with
| Some offset ->
let _ = Unix.lseek fd offset Unix.SEEK_SET in
| None -> fd
let stop fd = Unix.close fd in
let pull fd =
let len = fd buf 0 (Bytes.length buf) in
if len == 0 then None else Some (Bytes.sub_string buf 0 len, fd)
(Source { init; stop; pull } : string source)
let dispose (Source src) = src.stop (src.init ())
type ('a, 'r) sink =
| Sink : {
init: unit -> 's
; push: 's -> 'a -> 's
; full: 's -> bool
; stop: 's -> 'r
-> ('a, 'r) sink
module Bstr = struct
type bigstring =
(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
external get_uint8 : bigstring -> int -> int = "%caml_ba_ref_1"
external get_int32_ne : bigstring -> int -> int32 = "%caml_bigstring_get32"
external unsafe_set_int32_ne : bigstring -> int -> int32 -> unit
= "%caml_bigstring_set32u"
external unsafe_set_uint8 : bigstring -> int -> int -> unit
= "%caml_ba_unsafe_set_1"
let blit_to_bytes bstr ~src_off dst ~dst_off ~len =
len < 0
|| src_off < 0
|| src_off > Bigarray.Array1.dim bstr - len
|| dst_off < 0
|| dst_off > Bytes.length dst - len
then invalid_arg "Bstr.blit_to_bytes";
let len0 = len land 3 in
let len1 = len lsr 2 in
for i = 0 to len1 - 1 do
let i = i * 4 in
let v = get_int32_ne bstr (src_off + i) in
Bytes.set_int32_ne dst (dst_off + i) v
for i = 0 to len0 - 1 do
let i = (len1 * 4) + i in
let v = get_uint8 bstr (src_off + i) in
Bytes.set_uint8 dst (dst_off + i) v
let blit_from_bytes src ~src_off bstr ~dst_off ~len =
len < 0
|| src_off < 0
|| src_off > Bytes.length src - len
|| dst_off < 0
|| dst_off > Bigarray.Array1.dim bstr - len
then invalid_arg "Bstr.blit_from_bytes";
let len0 = len land 3 in
let len1 = len lsr 2 in
for i = 0 to len1 - 1 do
let i = i * 4 in
let v = Bytes.get_int32_ne src (src_off + i) in
unsafe_set_int32_ne bstr (dst_off + i) v
for i = 0 to len0 - 1 do
let i = (len1 * 4) + i in
let v = Bytes.get_uint8 src (src_off + i) in
unsafe_set_uint8 bstr (dst_off + i) v
let sub_string bstr ~off ~len =
let buf = Bytes.create len in
blit_to_bytes bstr ~src_off:off buf ~dst_off:0 ~len;
Bytes.unsafe_to_string buf
let of_string str =
let len = String.length str in
let bstr = Bigarray.(Array1.create char c_layout len) in
blit_from_bytes (Bytes.unsafe_of_string str) ~src_off:0 bstr ~dst_off:0 ~len;
module Sink = struct
module Hdrs = Vif_headers
let response ?headers:(hdrs = []) status server =
match Vif_s.reqd server with
| `V1 reqd ->
let hdrs = Hdrs.add_unless_exists hdrs "transfer-encoding" "chunked" in
let hdrs = H1.Headers.of_list hdrs in
let status =
match status with
| #H1.Status.t as status -> status
| _ -> invalid_arg "Sink.response: invalid status"
let resp = H1.Response.create ~headers:hdrs status in
let init () = H1.Reqd.respond_with_streaming reqd resp in
let push body str =
H1.Body.Writer.write_string body str;
let full _ = false in
(* TODO(dinosaure): content-length? *)
let stop = H1.Body.Writer.close in
(Sink { init; push; full; stop } : (string, unit) sink)
| `V2 reqd ->
let hdrs = Hdrs.add_unless_exists hdrs "transfer-encoding" "chunked" in
let hdrs = H2.Headers.of_list hdrs in
let resp = H2.Response.create ~headers:hdrs status in
let init () = H2.Reqd.respond_with_streaming reqd resp in
let push body str =
H2.Body.Writer.write_string body str;
let full _ = false in
(* TODO(dinosaure): content-length? *)
let stop = H2.Body.Writer.close in
(Sink { init; push; full; stop } : (string, unit) sink)
type value = [ `Null | `Bool of bool | `String of string | `Float of float ]
type await = [ `Await ]
type error = [ `Error of Jsonm.error ]
type eoi = [ `End ]
let errorf fmt = Fmt.kstr (fun msg -> `Error msg) fmt
let json =
let decoder = Jsonm.decoder `Manual in
let rec error (`Error err) =
errorf "Invalid JSON input: %a" Jsonm.pp_error err
and end_of_input `End = errorf "Unexpected end of input"
and arr acc k =
match Jsonm.decode decoder with
| #await -> `Await (fun () -> arr acc k)
| #error as v -> error v
| #eoi as v -> end_of_input v
| `Lexeme `Ae -> k (`A (List.rev acc))
| `Lexeme v -> core (fun v -> arr (v :: acc) k) v
and name n k =
match Jsonm.decode decoder with
| #await -> `Await (fun () -> name n k)
| #error as v -> error v
| #eoi as v -> end_of_input v
| `Lexeme v -> core (fun v -> k (n, v)) v
and obj acc k =
match Jsonm.decode decoder with
| #await -> `Await (fun () -> obj acc k)
| #error as v -> error v
| #eoi as v -> end_of_input v
| `Lexeme `Oe -> k (`O (List.rev acc))
| `Lexeme (`Name n) -> name n (fun v -> obj (v :: acc) k)
| `Lexeme v ->
errorf "Unexpected lexeme: %a (expected key)" Jsonm.pp_lexeme v
and core k = function
| #value as v -> k v
| `Os -> obj [] k
| `As -> arr [] k
| `Ae | `Oe -> errorf "Retrieve invalid end of JSON array/object"
| `Name _ -> errorf "Retrieve invalid JSON key value"
and init () =
match Jsonm.decode decoder with
| #await -> `Await init
| #error as v -> error v
| #eoi -> `Json `Null
| `Lexeme (#Jsonm.lexeme as lexeme) -> core (fun v -> `Json v) lexeme
let push v str =
match v with
| `Await k ->
Jsonm.Manual.src decoder
(Bytes.unsafe_of_string str)
0 (String.length str);
k ()
| `Error _ as err -> err
| `Json _ as value -> value
let full = function `Error _ | `Json _ -> true | _ -> false in
let rec stop = function
| `Await k ->
Jsonm.Manual.src decoder Bytes.empty 0 0;
stop (k ())
| `Error msg -> Error (`Msg msg)
| `Json value -> Ok value
Sink { init; push; full; stop }
type ('a, 'b) flow = { flow: 'r. ('b, 'r) sink -> ('a, 'r) sink } [@@unboxed]
module Flow = struct
let identity = { flow= }
let compose { flow= f } { flow= g } = { flow= (fun sink -> f (g sink)) }
let ( << ) a b = compose a b
let ( >> ) b a = compose a b
let rec deflate_until_end ~push ~acc encoder o =
match Zl.Def.encode encoder with
| `Await _ -> assert false
| `Flush encoder ->
let len = Bigarray.Array1.dim o - Zl.Def.dst_rem encoder in
let encoder = Zl.Def.dst encoder o 0 (Bigarray.Array1.dim o) in
let acc = push acc (Bstr.sub_string o ~off:0 ~len) in
deflate_until_end ~push ~acc encoder o
| `End encoder ->
let len = Bigarray.Array1.dim o - Zl.Def.dst_rem encoder in
push acc (Bstr.sub_string o ~off:0 ~len)
let rec deflate_until_await ~push ~acc encoder o =
match Zl.Def.encode encoder with
| `Await encoder -> (encoder, o, acc)
| `Flush encoder ->
let len = Bigarray.Array1.dim o - Zl.Def.dst_rem encoder in
let encoder = Zl.Def.dst encoder o 0 (Bigarray.Array1.dim o) in
let acc = push acc (Bstr.sub_string o ~off:0 ~len) in
deflate_until_await ~push ~acc encoder o
| `End _ -> assert false
let deflate ?(q = De.Queue.create 0x100) ?(w = De.Lz77.make_window ~bits:15)
?(level = 4) () =
let flow (Sink k) =
let init () =
let encoder = Zl.Def.encoder ~q ~w ~level `Manual `Manual in
let o = De.bigstring_create 0x7ff in
let encoder = Zl.Def.dst encoder o 0 0x7ff in
let acc = k.init () in
(encoder, o, acc)
let push (encoder, o, acc) = function
| "" -> (encoder, o, acc)
| str ->
let bstr = Bstr.of_string str in
let encoder = Zl.Def.src encoder bstr 0 (String.length str) in
deflate_until_await ~push:k.push ~acc encoder o
let full (_, _, acc) = k.full acc in
let stop (encoder, o, acc) =
let encoder = Zl.Def.src encoder De.bigstring_empty 0 0 in
let acc = deflate_until_end ~push:k.push ~acc encoder o in
k.stop acc
Sink { init; stop; full; push }
{ flow }
external reraise : exn -> 'a = "%reraise"
type 'a stream = { stream: 'r. ('a, 'r) sink -> 'r } [@@unboxed]
module Stream = struct
let run ~from:(Source src) ~via:{ flow } ~into:snk =
let (Sink snk) = flow snk in
let rec loop r s =
match snk.full r with
| true ->
let r' = snk.stop r in
let leftover = Source { src with init= Fun.const s } in
(r', Some leftover)
| false -> begin
match src.pull s with
| Some (x, s') -> loop (snk.push r x) s'
| None ->
src.stop s;
let r' = snk.stop r in
(r', None)
let r0 = snk.init () in
match snk.full r0 with
| true ->
let r' = snk.stop r0 in
(r', Some (Source src))
| false -> (
let s0' = ref None in
let s0 = src.init () in
s0' := Some s0;
loop r0 s0
with exn ->
Option.iter src.stop !s0';
let _ = snk.stop r0 in
reraise exn)
let into sink t = sink
let via { flow } t =
let stream sink = into (flow sink) t in
{ stream }
let from (Source src) =
let stream (Sink k) =
let rec go r s =
if k.full r then k.stop r
match src.pull s with
| None -> src.stop s; k.stop r
| Some (x, s') -> go (k.push r x) s'
let r0 = k.init () in
if k.full r0 then k.stop r0
let s0' = ref None in
let s0 = src.init () in
s0' := Some s0;
go r0 s0
with exn ->
Option.iter src.stop !s0';
let _ = k.stop r0 in
reraise exn
{ stream }
let bracket : init:(unit -> 's) -> stop:('s -> 'r) -> ('s -> 's) -> 'r =
fun ~init ~stop fn ->
let acc = init () in
try stop (fn acc)
with exn ->
let _ = stop acc in
reraise exn
let of_bqueue bq =
let stream (Sink k) =
let rec go r =
if k.full r then r
match Bqueue.get bq with None -> r | Some str -> go (k.push r str)
let stop r = Bqueue.close bq; k.stop r in
bracket go ~init:k.init ~stop
{ stream }
let singleton v =
let stream (Sink k) = k.stop (k.push (k.init ()) v) in
{ stream }
@ -1,6 +1,69 @@
type 'a t
module Bqueue : sig
type 'a t
val create : int -> 'a t
val put : 'a t -> 'a -> unit
val get : 'a t -> 'a option
val close : 'a t -> unit
val create : int -> 'a t
val put : 'a t -> 'a -> unit
val get : 'a t -> 'a option
val close : 'a t -> unit
type 'a source =
| Source : {
init: unit -> 's
; pull: 's -> ('a * 's) option
; stop: 's -> unit
-> 'a source
module Source : sig
val file : ?offset:int -> string -> string source
val dispose : 'a source -> unit
type ('a, 'r) sink =
| Sink : {
init: unit -> 's
; push: 's -> 'a -> 's
; full: 's -> bool
; stop: 's -> 'r
-> ('a, 'r) sink
module Sink : sig
val response :
?headers:Vif_headers.t -> Vif_status.t -> Vif_s.t -> (string, unit) sink
val json : (string, (Json.t, [ `Msg of string ]) result) sink
type ('a, 'b) flow = { flow: 'r. ('b, 'r) sink -> ('a, 'r) sink } [@@unboxed]
module Flow : sig
val identity : ('a, 'a) flow
val compose : ('a, 'b) flow -> ('b, 'c) flow -> ('a, 'c) flow
val ( >> ) : ('a, 'b) flow -> ('c, 'a) flow -> ('c, 'b) flow
val ( << ) : ('a, 'b) flow -> ('b, 'c) flow -> ('a, 'c) flow
val deflate :
-> ?w:De.Lz77.window
-> ?level:int
-> unit
-> (string, string) flow
type 'a stream = { stream: 'r. ('a, 'r) sink -> 'r } [@@unboxed]
module Stream : sig
val run :
from:'a source
-> via:('a, 'b) flow
-> into:('b, 'c) sink
-> 'c * 'a source option
val into : ('a, 'b) sink -> 'a stream -> 'b
val via : ('a, 'b) flow -> 'a stream -> 'b stream
val from : 'a source -> 'a stream
val of_bqueue : 'a Bqueue.t -> 'a stream
val singleton : 'a -> 'a stream
@ -22,14 +22,7 @@ module U : sig
val eval : ('f, string) t -> 'f
module Stream : sig
type 'a t
val create : int -> 'a t
val put : 'a t -> 'a -> unit
val get : 'a t -> 'a option
val close : 'a t -> unit
module Stream = Stream
module Headers : sig
type t = (string * string) list
@ -69,8 +62,8 @@ module Request : sig
val version : ('c, 'a) t -> int
val headers : ('c, 'a) t -> Headers.t
val to_string : ('c, 'a) t -> string
val to_stream : ('c, 'a) t -> string Stream.t
val to_json : (Content_type.json, 'a) t -> ('a, [> `Msg of string ]) result
val to_stream : ('c, 'a) t -> string
val of_json : (Content_type.json, 'a) t -> ('a, [ `Msg of string ]) result
module R : sig
@ -232,7 +225,12 @@ module Response : sig
type t
val with_stream :
S.t -> ?headers:Headers.t -> Status.t -> (string Stream.t -> unit) -> t
?compression:[ `DEFLATE ]
-> S.t
-> ?headers:Headers.t
-> Status.t
-> string
-> t
val with_string : S.t -> ?headers:Headers.t -> Status.t -> string -> t
@ -51,10 +51,10 @@ let to_string ~schedule ~close body =
Miou.Computation.await_exn c
let to_stream ~schedule ~close body =
let stream = Stream.create 0x7ff in
let rec on_eof () = close body; Stream.close stream
let stream = Stream.Bqueue.create 0x100 in
let rec on_eof () = close body; Stream.Bqueue.close stream
and on_read bstr ~off ~len =
Stream.put stream (Bigstringaf.substring bstr ~off ~len);
Stream.Bqueue.put stream (Bigstringaf.substring bstr ~off ~len);
schedule body ~on_eof ~on_read
schedule body ~on_eof ~on_read;
@ -74,34 +74,31 @@ let to_stream { body; _ } =
| `V1 body ->
to_stream ~schedule:H1.Body.Reader.schedule_read
~close:H1.Body.Reader.close body
|> Stream.Stream.of_bqueue
| `V2 body ->
to_stream ~schedule:H2.Body.Reader.schedule_read
~close:H2.Body.Reader.close body
|> Stream.Stream.of_bqueue
let destruct : type a. a Json_encoding.encoding -> Json.t -> a =
let error_msgf fmt = Format.kasprintf (fun msg -> Error (`Msg msg)) fmt
let to_json : type a.
let of_json : type a.
(Vif_content_type.json, a) t -> (a, [> `Msg of string ]) result = function
| { encoding= Any; _ } as req -> Ok (to_string req)
| { encoding= Json; _ } as req ->
Log.debug (fun m -> m "Parse the body as a JSON data");
let stream = to_stream req in
Log.debug (fun m -> m "Get the stream");
let input () =
let str = Stream.get stream in
Log.debug (fun m -> m "json(%a)" Fmt.(option (fmt "%S")) str);
Json.decode ~input Result.ok
| { encoding= Json_encoding encoding; _ } as req -> (
Stream.Stream.into Stream.Sink.json stream
| { encoding= Json_encoding encoding; _ } as req -> begin
let stream = to_stream req in
let input () = Stream.get stream in
match Json.decode ~input Result.ok with
match Stream.Stream.into Stream.Sink.json stream with
| Error (`Msg _) as err -> err
| Ok (json : Json.t) -> (
| Ok (json : Json.t) -> begin
try Ok (destruct encoding json)
with Json_encoding.Cannot_destruct (_, _) ->
error_msgf "Invalid JSON value"))
error_msgf "Invalid JSON value"
@ -40,35 +40,20 @@ let with_string server ?headers:(hdrs = []) status str =
H2.Reqd.respond_with_string reqd resp str;
let with_stream server ?headers:(hdrs = []) status fn =
match Vif_s.reqd server with
| `V1 reqd ->
let hdrs = Hdrs.add_unless_exists hdrs "transfer-encoding" "chunked" in
let hdrs = H1.Headers.of_list hdrs in
let status =
match status with
| #H1.Status.t as status -> status
| _ -> invalid_arg "Response.with_string: invalid status"
let with_stream ?compression server ?headers status stream =
let headers, stream =
match compression with
| None -> (headers, stream)
| Some `DEFLATE ->
let headers =
match headers with
| None -> Some [ ("content-encoding", "deflate") ]
| Some hdrs ->
Vif_headers.add_unless_exists hdrs "content-encoding" "deflate"
|> Option.some
let resp = H1.Response.create ~headers:hdrs status in
let stream = Stream.create 0x7ff in
let body = H1.Reqd.respond_with_streaming reqd resp in
let res0 = Miou.Ownership.create ~finally:H1.Body.Writer.close body in
let res1 = Miou.Ownership.create ~finally:Stream.close stream in
let rec send stream body res =
match Stream.get stream with
| Some str ->
H1.Body.Writer.write_string body str;
send stream body res
| None -> H1.Body.Writer.close body; Miou.Ownership.disown res
(headers, Stream.Stream.via (Stream.Flow.deflate ()) stream)
let prm0 = Miou.async ~give:[ res0 ] @@ fun () -> send stream body res0 in
let prm1 =
Miou.async ~give:[ res1 ] @@ fun () ->
let () = fn stream in
Stream.close stream; Miou.Ownership.disown res1
Miou.await_all [ prm0; prm1 ]
|> List.iter (function Ok () -> () | Error exn -> raise exn);
let sink = Stream.Sink.response ?headers status server in
Stream.Stream.into sink stream;
| `V2 _ -> assert false
