diff --git a/examples/01-hello/main.ml b/examples/01-hello/main.ml index b77aa65..c62c275 100644 --- a/examples/01-hello/main.ml +++ b/examples/01-hello/main.ml @@ -4,4 +4,5 @@ let default req target server () = Vif.Response.with_string server `OK "Hello World!\n" ;; -let () = Miou_unix.run @@ fun () -> Vif.run ~default [] () ;; +let () = Miou_unix.run @@ fun () -> + Vif.run ~default [] () ;; diff --git a/examples/05-json/main.ml b/examples/05-json/main.ml index 2bb0125..a294292 100644 --- a/examples/05-json/main.ml +++ b/examples/05-json/main.ml @@ -24,7 +24,7 @@ let foo = ;; let deserialize req server () = - match Vif.Request.to_json req with + match Vif.Request.of_json req with | Ok (foo : foo) -> let str = Fmt.str "username: %s, password: %s, age: %a, address: %a\n" diff --git a/examples/07-deflate/main.ml b/examples/07-deflate/main.ml new file mode 100644 index 0000000..a15f346 --- /dev/null +++ b/examples/07-deflate/main.ml @@ -0,0 +1,9 @@ +#require "vif" ;; + +let default req target server () = + let stream = Vif.Stream.Stream.singleton "Hello World!\n" in + Vif.Response.with_stream ~compression:`DEFLATE server `OK stream +;; + +let () = Miou_unix.run @@ fun () -> + Vif.run ~default [] () ;; diff --git a/lib/vif/dune b/lib/vif/dune index 6afe609..20bc23c 100644 --- a/lib/vif/dune +++ b/lib/vif/dune @@ -9,6 +9,8 @@ jsonm cmdliner hmap + decompress.zl + decompress.gz mirage-crypto-rng-miou-unix httpcats tyre)) diff --git a/lib/vif/json.ml b/lib/vif/json.ml index 4844712..3512ba6 100644 --- a/lib/vif/json.ml +++ b/lib/vif/json.ml @@ -6,7 +6,7 @@ type error = [ `Error of Jsonm.error ] type eoi = [ `End ] let error_msgf fmt = Fmt.kstr (fun msg -> Error (`Msg msg)) fmt -let _max_young_size = Sys.word_size / 8 * 256 +let _max_young_size = 0x7ff let rec pp ppf = function | `Null -> Fmt.const Fmt.string "()" ppf () diff --git a/lib/vif/stream.ml b/lib/vif/stream.ml index 9f1100f..606364e 100644 --- a/lib/vif/stream.ml +++ b/lib/vif/stream.ml @@ -1,49 +1,412 @@ -type 'a t = { - buf: 'a option array - ; mutable rd_pos: int - ; mutable wr_pos: int - ; mutable closed: bool - ; mutex: Miou.Mutex.t - ; non_empty_or_close: Miou.Condition.t - ; non_full: Miou.Condition.t -} - -let create len = - { - buf= Array.make len None - ; rd_pos= 0 - ; wr_pos= 0 - ; closed= false - ; mutex= Miou.Mutex.create () - ; non_empty_or_close= Miou.Condition.create () - ; non_full= Miou.Condition.create () +module Bqueue = struct + type 'a t = { + buf: 'a option array + ; mutable rd_pos: int + ; mutable wr_pos: int + ; mutable closed: bool + ; mutex: Miou.Mutex.t + ; non_empty_or_close: Miou.Condition.t + ; non_full: Miou.Condition.t } -let put t value = - Miou.Mutex.protect t.mutex @@ fun () -> - if t.closed then invalid_arg "Stream.put: closed stream"; - while (t.wr_pos + 1) mod Array.length t.buf = t.rd_pos do - Miou.Condition.wait t.non_full t.mutex - done; - t.buf.(t.wr_pos) <- Some value; - t.wr_pos <- (t.wr_pos + 1) mod Array.length t.buf; - Miou.Condition.signal t.non_empty_or_close + let create len = + { + buf= Array.make len None + ; rd_pos= 0 + ; wr_pos= 0 + ; closed= false + ; mutex= Miou.Mutex.create () + ; non_empty_or_close= Miou.Condition.create () + ; non_full= Miou.Condition.create () + } -let get t = - Miou.Mutex.protect t.mutex @@ fun () -> - while t.wr_pos = t.rd_pos && not t.closed do - Miou.Condition.wait t.non_empty_or_close t.mutex - done; - if t.wr_pos = t.rd_pos && t.closed then None - else begin - let value = t.buf.(t.rd_pos) in - t.buf.(t.rd_pos) <- None; - t.rd_pos <- (t.rd_pos + 1) mod Array.length t.buf; - Miou.Condition.signal t.non_full; - value - end + let put t value = + Miou.Mutex.protect t.mutex @@ fun () -> + if t.closed then invalid_arg "Stream.put: closed stream"; + while (t.wr_pos + 1) mod Array.length t.buf = t.rd_pos do + Miou.Condition.wait t.non_full t.mutex + done; + t.buf.(t.wr_pos) <- Some value; + t.wr_pos <- (t.wr_pos + 1) mod Array.length t.buf; + Miou.Condition.signal t.non_empty_or_close -let close t = - Miou.Mutex.protect t.mutex @@ fun () -> - t.closed <- true; - Miou.Condition.signal t.non_empty_or_close + let get t = + Miou.Mutex.protect t.mutex @@ fun () -> + while t.wr_pos = t.rd_pos && not t.closed do + Miou.Condition.wait t.non_empty_or_close t.mutex + done; + if t.wr_pos = t.rd_pos && t.closed then None + else begin + let value = t.buf.(t.rd_pos) in + t.buf.(t.rd_pos) <- None; + t.rd_pos <- (t.rd_pos + 1) mod Array.length t.buf; + Miou.Condition.signal t.non_full; + value + end + + let close t = + Miou.Mutex.protect t.mutex @@ fun () -> + t.closed <- true; + Miou.Condition.signal t.non_empty_or_close +end + +type 'a source = + | Source : { + init: unit -> 's + ; pull: 's -> ('a * 's) option + ; stop: 's -> unit + } + -> 'a source + +module Source = struct + let file ?offset path = + let buf = Bytes.create 0x7ff in + let init () = + let fd = Unix.openfile path Unix.[ O_RDONLY ] 0o644 in + match offset with + | Some offset -> + let _ = Unix.lseek fd offset Unix.SEEK_SET in + fd + | None -> fd + in + let stop fd = Unix.close fd in + let pull fd = + let len = Unix.read fd buf 0 (Bytes.length buf) in + if len == 0 then None else Some (Bytes.sub_string buf 0 len, fd) + in + (Source { init; stop; pull } : string source) + + let dispose (Source src) = src.stop (src.init ()) +end + +type ('a, 'r) sink = + | Sink : { + init: unit -> 's + ; push: 's -> 'a -> 's + ; full: 's -> bool + ; stop: 's -> 'r + } + -> ('a, 'r) sink + +module Bstr = struct + type bigstring = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + + external get_uint8 : bigstring -> int -> int = "%caml_ba_ref_1" + external get_int32_ne : bigstring -> int -> int32 = "%caml_bigstring_get32" + + external unsafe_set_int32_ne : bigstring -> int -> int32 -> unit + = "%caml_bigstring_set32u" + + external unsafe_set_uint8 : bigstring -> int -> int -> unit + = "%caml_ba_unsafe_set_1" + + let blit_to_bytes bstr ~src_off dst ~dst_off ~len = + if + len < 0 + || src_off < 0 + || src_off > Bigarray.Array1.dim bstr - len + || dst_off < 0 + || dst_off > Bytes.length dst - len + then invalid_arg "Bstr.blit_to_bytes"; + let len0 = len land 3 in + let len1 = len lsr 2 in + for i = 0 to len1 - 1 do + let i = i * 4 in + let v = get_int32_ne bstr (src_off + i) in + Bytes.set_int32_ne dst (dst_off + i) v + done; + for i = 0 to len0 - 1 do + let i = (len1 * 4) + i in + let v = get_uint8 bstr (src_off + i) in + Bytes.set_uint8 dst (dst_off + i) v + done + + let blit_from_bytes src ~src_off bstr ~dst_off ~len = + if + len < 0 + || src_off < 0 + || src_off > Bytes.length src - len + || dst_off < 0 + || dst_off > Bigarray.Array1.dim bstr - len + then invalid_arg "Bstr.blit_from_bytes"; + let len0 = len land 3 in + let len1 = len lsr 2 in + for i = 0 to len1 - 1 do + let i = i * 4 in + let v = Bytes.get_int32_ne src (src_off + i) in + unsafe_set_int32_ne bstr (dst_off + i) v + done; + for i = 0 to len0 - 1 do + let i = (len1 * 4) + i in + let v = Bytes.get_uint8 src (src_off + i) in + unsafe_set_uint8 bstr (dst_off + i) v + done + + let sub_string bstr ~off ~len = + let buf = Bytes.create len in + blit_to_bytes bstr ~src_off:off buf ~dst_off:0 ~len; + Bytes.unsafe_to_string buf + + let of_string str = + let len = String.length str in + let bstr = Bigarray.(Array1.create char c_layout len) in + blit_from_bytes (Bytes.unsafe_of_string str) ~src_off:0 bstr ~dst_off:0 ~len; + bstr +end + +module Sink = struct + module Hdrs = Vif_headers + + let response ?headers:(hdrs = []) status server = + match Vif_s.reqd server with + | `V1 reqd -> + let hdrs = Hdrs.add_unless_exists hdrs "transfer-encoding" "chunked" in + let hdrs = H1.Headers.of_list hdrs in + let status = + match status with + | #H1.Status.t as status -> status + | _ -> invalid_arg "Sink.response: invalid status" + in + let resp = H1.Response.create ~headers:hdrs status in + let init () = H1.Reqd.respond_with_streaming reqd resp in + let push body str = + H1.Body.Writer.write_string body str; + body + in + let full _ = false in + (* TODO(dinosaure): content-length? *) + let stop = H1.Body.Writer.close in + (Sink { init; push; full; stop } : (string, unit) sink) + | `V2 reqd -> + let hdrs = Hdrs.add_unless_exists hdrs "transfer-encoding" "chunked" in + let hdrs = H2.Headers.of_list hdrs in + let resp = H2.Response.create ~headers:hdrs status in + let init () = H2.Reqd.respond_with_streaming reqd resp in + let push body str = + H2.Body.Writer.write_string body str; + body + in + let full _ = false in + (* TODO(dinosaure): content-length? *) + let stop = H2.Body.Writer.close in + (Sink { init; push; full; stop } : (string, unit) sink) + + type value = [ `Null | `Bool of bool | `String of string | `Float of float ] + type await = [ `Await ] + type error = [ `Error of Jsonm.error ] + type eoi = [ `End ] + + let errorf fmt = Fmt.kstr (fun msg -> `Error msg) fmt + + let json = + let decoder = Jsonm.decoder `Manual in + let rec error (`Error err) = + errorf "Invalid JSON input: %a" Jsonm.pp_error err + and end_of_input `End = errorf "Unexpected end of input" + and arr acc k = + match Jsonm.decode decoder with + | #await -> `Await (fun () -> arr acc k) + | #error as v -> error v + | #eoi as v -> end_of_input v + | `Lexeme `Ae -> k (`A (List.rev acc)) + | `Lexeme v -> core (fun v -> arr (v :: acc) k) v + and name n k = + match Jsonm.decode decoder with + | #await -> `Await (fun () -> name n k) + | #error as v -> error v + | #eoi as v -> end_of_input v + | `Lexeme v -> core (fun v -> k (n, v)) v + and obj acc k = + match Jsonm.decode decoder with + | #await -> `Await (fun () -> obj acc k) + | #error as v -> error v + | #eoi as v -> end_of_input v + | `Lexeme `Oe -> k (`O (List.rev acc)) + | `Lexeme (`Name n) -> name n (fun v -> obj (v :: acc) k) + | `Lexeme v -> + errorf "Unexpected lexeme: %a (expected key)" Jsonm.pp_lexeme v + and core k = function + | #value as v -> k v + | `Os -> obj [] k + | `As -> arr [] k + | `Ae | `Oe -> errorf "Retrieve invalid end of JSON array/object" + | `Name _ -> errorf "Retrieve invalid JSON key value" + and init () = + match Jsonm.decode decoder with + | #await -> `Await init + | #error as v -> error v + | #eoi -> `Json `Null + | `Lexeme (#Jsonm.lexeme as lexeme) -> core (fun v -> `Json v) lexeme + in + let push v str = + match v with + | `Await k -> + Jsonm.Manual.src decoder + (Bytes.unsafe_of_string str) + 0 (String.length str); + k () + | `Error _ as err -> err + | `Json _ as value -> value + in + let full = function `Error _ | `Json _ -> true | _ -> false in + let rec stop = function + | `Await k -> + Jsonm.Manual.src decoder Bytes.empty 0 0; + stop (k ()) + | `Error msg -> Error (`Msg msg) + | `Json value -> Ok value + in + Sink { init; push; full; stop } +end + +type ('a, 'b) flow = { flow: 'r. ('b, 'r) sink -> ('a, 'r) sink } [@@unboxed] + +module Flow = struct + let identity = { flow= Fun.id } + let compose { flow= f } { flow= g } = { flow= (fun sink -> f (g sink)) } + let ( << ) a b = compose a b + let ( >> ) b a = compose a b + + let rec deflate_until_end ~push ~acc encoder o = + match Zl.Def.encode encoder with + | `Await _ -> assert false + | `Flush encoder -> + let len = Bigarray.Array1.dim o - Zl.Def.dst_rem encoder in + let encoder = Zl.Def.dst encoder o 0 (Bigarray.Array1.dim o) in + let acc = push acc (Bstr.sub_string o ~off:0 ~len) in + deflate_until_end ~push ~acc encoder o + | `End encoder -> + let len = Bigarray.Array1.dim o - Zl.Def.dst_rem encoder in + push acc (Bstr.sub_string o ~off:0 ~len) + + let rec deflate_until_await ~push ~acc encoder o = + match Zl.Def.encode encoder with + | `Await encoder -> (encoder, o, acc) + | `Flush encoder -> + let len = Bigarray.Array1.dim o - Zl.Def.dst_rem encoder in + let encoder = Zl.Def.dst encoder o 0 (Bigarray.Array1.dim o) in + let acc = push acc (Bstr.sub_string o ~off:0 ~len) in + deflate_until_await ~push ~acc encoder o + | `End _ -> assert false + + let deflate ?(q = De.Queue.create 0x100) ?(w = De.Lz77.make_window ~bits:15) + ?(level = 4) () = + let flow (Sink k) = + let init () = + let encoder = Zl.Def.encoder ~q ~w ~level `Manual `Manual in + let o = De.bigstring_create 0x7ff in + let encoder = Zl.Def.dst encoder o 0 0x7ff in + let acc = k.init () in + (encoder, o, acc) + in + let push (encoder, o, acc) = function + | "" -> (encoder, o, acc) + | str -> + let bstr = Bstr.of_string str in + let encoder = Zl.Def.src encoder bstr 0 (String.length str) in + deflate_until_await ~push:k.push ~acc encoder o + in + let full (_, _, acc) = k.full acc in + let stop (encoder, o, acc) = + let encoder = Zl.Def.src encoder De.bigstring_empty 0 0 in + let acc = deflate_until_end ~push:k.push ~acc encoder o in + k.stop acc + in + Sink { init; stop; full; push } + in + { flow } +end + +external reraise : exn -> 'a = "%reraise" + +type 'a stream = { stream: 'r. ('a, 'r) sink -> 'r } [@@unboxed] + +module Stream = struct + let run ~from:(Source src) ~via:{ flow } ~into:snk = + let (Sink snk) = flow snk in + let rec loop r s = + match snk.full r with + | true -> + let r' = snk.stop r in + let leftover = Source { src with init= Fun.const s } in + (r', Some leftover) + | false -> begin + match src.pull s with + | Some (x, s') -> loop (snk.push r x) s' + | None -> + src.stop s; + let r' = snk.stop r in + (r', None) + end + in + let r0 = snk.init () in + match snk.full r0 with + | true -> + let r' = snk.stop r0 in + (r', Some (Source src)) + | false -> ( + let s0' = ref None in + try + let s0 = src.init () in + s0' := Some s0; + loop r0 s0 + with exn -> + Option.iter src.stop !s0'; + let _ = snk.stop r0 in + reraise exn) + + let into sink t = t.stream sink + + let via { flow } t = + let stream sink = into (flow sink) t in + { stream } + + let from (Source src) = + let stream (Sink k) = + let rec go r s = + if k.full r then k.stop r + else + match src.pull s with + | None -> src.stop s; k.stop r + | Some (x, s') -> go (k.push r x) s' + in + let r0 = k.init () in + if k.full r0 then k.stop r0 + else + let s0' = ref None in + try + let s0 = src.init () in + s0' := Some s0; + go r0 s0 + with exn -> + Option.iter src.stop !s0'; + let _ = k.stop r0 in + reraise exn + in + { stream } + + let bracket : init:(unit -> 's) -> stop:('s -> 'r) -> ('s -> 's) -> 'r = + fun ~init ~stop fn -> + let acc = init () in + try stop (fn acc) + with exn -> + let _ = stop acc in + reraise exn + + let of_bqueue bq = + let stream (Sink k) = + let rec go r = + if k.full r then r + else + match Bqueue.get bq with None -> r | Some str -> go (k.push r str) + in + let stop r = Bqueue.close bq; k.stop r in + bracket go ~init:k.init ~stop + in + { stream } + + let singleton v = + let stream (Sink k) = k.stop (k.push (k.init ()) v) in + { stream } +end diff --git a/lib/vif/stream.mli b/lib/vif/stream.mli index da91ca3..88dd446 100644 --- a/lib/vif/stream.mli +++ b/lib/vif/stream.mli @@ -1,6 +1,69 @@ -type 'a t +module Bqueue : sig + type 'a t -val create : int -> 'a t -val put : 'a t -> 'a -> unit -val get : 'a t -> 'a option -val close : 'a t -> unit + val create : int -> 'a t + val put : 'a t -> 'a -> unit + val get : 'a t -> 'a option + val close : 'a t -> unit +end + +type 'a source = + | Source : { + init: unit -> 's + ; pull: 's -> ('a * 's) option + ; stop: 's -> unit + } + -> 'a source + +module Source : sig + val file : ?offset:int -> string -> string source + val dispose : 'a source -> unit +end + +type ('a, 'r) sink = + | Sink : { + init: unit -> 's + ; push: 's -> 'a -> 's + ; full: 's -> bool + ; stop: 's -> 'r + } + -> ('a, 'r) sink + +module Sink : sig + val response : + ?headers:Vif_headers.t -> Vif_status.t -> Vif_s.t -> (string, unit) sink + + val json : (string, (Json.t, [ `Msg of string ]) result) sink +end + +type ('a, 'b) flow = { flow: 'r. ('b, 'r) sink -> ('a, 'r) sink } [@@unboxed] + +module Flow : sig + val identity : ('a, 'a) flow + val compose : ('a, 'b) flow -> ('b, 'c) flow -> ('a, 'c) flow + val ( >> ) : ('a, 'b) flow -> ('c, 'a) flow -> ('c, 'b) flow + val ( << ) : ('a, 'b) flow -> ('b, 'c) flow -> ('a, 'c) flow + + val deflate : + ?q:De.Queue.t + -> ?w:De.Lz77.window + -> ?level:int + -> unit + -> (string, string) flow +end + +type 'a stream = { stream: 'r. ('a, 'r) sink -> 'r } [@@unboxed] + +module Stream : sig + val run : + from:'a source + -> via:('a, 'b) flow + -> into:('b, 'c) sink + -> 'c * 'a source option + + val into : ('a, 'b) sink -> 'a stream -> 'b + val via : ('a, 'b) flow -> 'a stream -> 'b stream + val from : 'a source -> 'a stream + val of_bqueue : 'a Bqueue.t -> 'a stream + val singleton : 'a -> 'a stream +end diff --git a/lib/vif/vif.mli b/lib/vif/vif.mli index b07ad71..de6471b 100644 --- a/lib/vif/vif.mli +++ b/lib/vif/vif.mli @@ -22,14 +22,7 @@ module U : sig val eval : ('f, string) t -> 'f end -module Stream : sig - type 'a t - - val create : int -> 'a t - val put : 'a t -> 'a -> unit - val get : 'a t -> 'a option - val close : 'a t -> unit -end +module Stream = Stream module Headers : sig type t = (string * string) list @@ -69,8 +62,8 @@ module Request : sig val version : ('c, 'a) t -> int val headers : ('c, 'a) t -> Headers.t val to_string : ('c, 'a) t -> string - val to_stream : ('c, 'a) t -> string Stream.t - val to_json : (Content_type.json, 'a) t -> ('a, [> `Msg of string ]) result + val to_stream : ('c, 'a) t -> string Stream.stream + val of_json : (Content_type.json, 'a) t -> ('a, [ `Msg of string ]) result end module R : sig @@ -232,7 +225,12 @@ module Response : sig type t val with_stream : - S.t -> ?headers:Headers.t -> Status.t -> (string Stream.t -> unit) -> t + ?compression:[ `DEFLATE ] + -> S.t + -> ?headers:Headers.t + -> Status.t + -> string Stream.stream + -> t val with_string : S.t -> ?headers:Headers.t -> Status.t -> string -> t end diff --git a/lib/vif/vif_request.ml b/lib/vif/vif_request.ml index 3de4466..3cbec65 100644 --- a/lib/vif/vif_request.ml +++ b/lib/vif/vif_request.ml @@ -51,10 +51,10 @@ let to_string ~schedule ~close body = Miou.Computation.await_exn c let to_stream ~schedule ~close body = - let stream = Stream.create 0x7ff in - let rec on_eof () = close body; Stream.close stream + let stream = Stream.Bqueue.create 0x100 in + let rec on_eof () = close body; Stream.Bqueue.close stream and on_read bstr ~off ~len = - Stream.put stream (Bigstringaf.substring bstr ~off ~len); + Stream.Bqueue.put stream (Bigstringaf.substring bstr ~off ~len); schedule body ~on_eof ~on_read in schedule body ~on_eof ~on_read; @@ -74,34 +74,31 @@ let to_stream { body; _ } = | `V1 body -> to_stream ~schedule:H1.Body.Reader.schedule_read ~close:H1.Body.Reader.close body + |> Stream.Stream.of_bqueue | `V2 body -> to_stream ~schedule:H2.Body.Reader.schedule_read ~close:H2.Body.Reader.close body + |> Stream.Stream.of_bqueue let destruct : type a. a Json_encoding.encoding -> Json.t -> a = Json_encoding.destruct let error_msgf fmt = Format.kasprintf (fun msg -> Error (`Msg msg)) fmt -let to_json : type a. +let of_json : type a. (Vif_content_type.json, a) t -> (a, [> `Msg of string ]) result = function | { encoding= Any; _ } as req -> Ok (to_string req) | { encoding= Json; _ } as req -> Log.debug (fun m -> m "Parse the body as a JSON data"); let stream = to_stream req in - Log.debug (fun m -> m "Get the stream"); - let input () = - let str = Stream.get stream in - Log.debug (fun m -> m "json(%a)" Fmt.(option (fmt "%S")) str); - str - in - Json.decode ~input Result.ok - | { encoding= Json_encoding encoding; _ } as req -> ( + Stream.Stream.into Stream.Sink.json stream + | { encoding= Json_encoding encoding; _ } as req -> begin let stream = to_stream req in - let input () = Stream.get stream in - match Json.decode ~input Result.ok with + match Stream.Stream.into Stream.Sink.json stream with | Error (`Msg _) as err -> err - | Ok (json : Json.t) -> ( + | Ok (json : Json.t) -> begin try Ok (destruct encoding json) with Json_encoding.Cannot_destruct (_, _) -> - error_msgf "Invalid JSON value")) + error_msgf "Invalid JSON value" + end + end diff --git a/lib/vif/vif_response.ml b/lib/vif/vif_response.ml index 33a4c50..0cffd30 100644 --- a/lib/vif/vif_response.ml +++ b/lib/vif/vif_response.ml @@ -40,35 +40,20 @@ let with_string server ?headers:(hdrs = []) status str = H2.Reqd.respond_with_string reqd resp str; Response -let with_stream server ?headers:(hdrs = []) status fn = - match Vif_s.reqd server with - | `V1 reqd -> - let hdrs = Hdrs.add_unless_exists hdrs "transfer-encoding" "chunked" in - let hdrs = H1.Headers.of_list hdrs in - let status = - match status with - | #H1.Status.t as status -> status - | _ -> invalid_arg "Response.with_string: invalid status" - in - let resp = H1.Response.create ~headers:hdrs status in - let stream = Stream.create 0x7ff in - let body = H1.Reqd.respond_with_streaming reqd resp in - let res0 = Miou.Ownership.create ~finally:H1.Body.Writer.close body in - let res1 = Miou.Ownership.create ~finally:Stream.close stream in - let rec send stream body res = - match Stream.get stream with - | Some str -> - H1.Body.Writer.write_string body str; - send stream body res - | None -> H1.Body.Writer.close body; Miou.Ownership.disown res - in - let prm0 = Miou.async ~give:[ res0 ] @@ fun () -> send stream body res0 in - let prm1 = - Miou.async ~give:[ res1 ] @@ fun () -> - let () = fn stream in - Stream.close stream; Miou.Ownership.disown res1 - in - Miou.await_all [ prm0; prm1 ] - |> List.iter (function Ok () -> () | Error exn -> raise exn); - Response - | `V2 _ -> assert false +let with_stream ?compression server ?headers status stream = + let headers, stream = + match compression with + | None -> (headers, stream) + | Some `DEFLATE -> + let headers = + match headers with + | None -> Some [ ("content-encoding", "deflate") ] + | Some hdrs -> + Vif_headers.add_unless_exists hdrs "content-encoding" "deflate" + |> Option.some + in + (headers, Stream.Stream.via (Stream.Flow.deflate ()) stream) + in + let sink = Stream.Sink.response ?headers status server in + Stream.Stream.into sink stream; + Response