This commit is contained in:
Calascibetta Romain 2025-02-14 16:16:23 +01:00
parent 57d94524f7
commit 2720510f97
11 changed files with 211 additions and 84 deletions

View file

@ -26,8 +26,10 @@ let foo =
open Vif ;;
let deserialize req _server () =
Logs.debug (fun m -> m "New request");
match Vif.Request.of_json req with
| Ok (foo : foo) ->
Logs.debug (fun m -> m "JSON decoded");
let str =
Fmt.str "username: %s, password: %s, age: %a, address: %a\n"
foo.username foo.password

View file

@ -0,0 +1,25 @@
#require "vif" ;;
#require "digestif.c" ;;
open Vif ;;
let sha1 =
let open Stream in
let init () = Digestif.SHA1.empty in
let push ctx str = Digestif.SHA1.feed_string ctx str in
let full = Fun.const false in
let stop = Digestif.SHA1.get in
Sink { init; push; full; stop }
;;
let default req target server () =
let stream = Request.stream req in
let hash = Stream.Stream.into sha1 stream in
let field = "content-type" in
let* () = Response.add ~field "text/plain; charset=utf-8" in
let* () = Response.with_string req (Digestif.SHA1.to_hex hash) in
Response.respond `OK
;;
let () = Miou_unix.run @@ fun () ->
Vif.run ~default [] () ;;

View file

@ -1,3 +1,9 @@
let src = Logs.Src.create "vif.stream"
module Log = (val Logs.src_log src : Logs.LOG)
let ( % ) f g = fun x -> f (g x)
module Bqueue = struct
type 'a t = {
buf: 'a option array
@ -33,6 +39,8 @@ module Bqueue = struct
let get t =
Miou.Mutex.protect t.mutex @@ fun () ->
while t.wr_pos = t.rd_pos && not t.closed do
Log.debug (fun m ->
m "bq: wr_pos:%d, rd_pos:%d, closed: %b" t.wr_pos t.rd_pos t.closed);
Miou.Condition.wait t.non_empty_or_close t.mutex
done;
if t.wr_pos = t.rd_pos && t.closed then None
@ -163,7 +171,14 @@ module Sink = struct
let errorf fmt = Fmt.kstr (fun msg -> `Error msg) fmt
let json =
let string =
let init () = Buffer.create 0x7ff in
let push buf str = Buffer.add_string buf str; buf in
let full = Fun.const false in
let stop = Buffer.contents in
Sink { init; push; full; stop }
let json () =
let decoder = Jsonm.decoder `Manual in
let rec error (`Error err) =
errorf "Invalid JSON input: %a" Jsonm.pp_error err
@ -355,6 +370,7 @@ module Stream = struct
let acc = init () in
try stop (fn acc)
with exn ->
Log.err (fun m -> m "Stram.Sink.bracket: %s" (Printexc.to_string exn));
let _ = stop acc in
reraise exn
@ -363,9 +379,10 @@ module Stream = struct
let rec go r =
if k.full r then r
else
match Bqueue.get bq with None -> r | Some str -> go (k.push r str)
let value = Bqueue.get bq in
Option.fold ~none:r ~some:(go % k.push r) value
in
let stop r = Bqueue.close bq; k.stop r in
let stop r = k.stop r in
bracket go ~init:k.init ~stop
in
{ stream }

View file

@ -30,7 +30,8 @@ type ('a, 'r) sink =
-> ('a, 'r) sink
module Sink : sig
val json : (string, (Json.t, [ `Msg of string ]) result) sink
val json : unit -> (string, (Json.t, [ `Msg of string ]) result) sink
val string : (string, string) sink
end
type ('a, 'b) flow = { flow: 'r. ('b, 'r) sink -> ('a, 'r) sink } [@@unboxed]
@ -61,6 +62,6 @@ module Stream : sig
val into : ('a, 'b) sink -> 'a stream -> 'b
val via : ('a, 'b) flow -> 'a stream -> 'b stream
val from : 'a source -> 'a stream
val of_bqueue : 'a Bqueue.t -> 'a stream
val of_bqueue : string Bqueue.t -> string stream
val singleton : 'a -> 'a stream
end

View file

@ -27,7 +27,7 @@ module Ds = struct
| [] : 'value t
| ( :: ) : ('value, 'a) D.device * 'value t -> 'value t
let run : Vif_d.t -> 'value t -> 'value -> Vif_d.t =
let run : Vif_d.Hmap.t -> 'value t -> 'value -> Vif_d.Hmap.t =
fun t lst user's_value ->
let rec go t = function
| [] -> t
@ -91,7 +91,7 @@ let is_application_json { Multipart_form.Content_type.ty; subty; _ } =
let content_type req0 =
let headers = Vif_request0.headers req0 in
let c = List.assoc_opt "content-type" headers in
let c = Vif_headers.get headers "content-type" in
let c = Option.map (fun c -> c ^ "\r\n") c in
let c = Option.to_result ~none:`Not_found c in
Result.bind c Multipart_form.Content_type.of_string
@ -141,18 +141,77 @@ let recognize_request ~env req0 =
in
{ Vif_r.extract }
let handler cfg ~default ~middlewares routes devices user's_value =
type 'value daemon = {
queue: 'value user's_function Queue.t
; mutex: Miou.Mutex.t
; orphans: unit Miou.orphans
; condition: Miou.Condition.t
; user's_value: 'value
; server: Vif_s.t
}
and 'value user's_function =
| User's_task : Vif_request0.t * 'value fn -> 'value user's_function
and 'value fn = Vif_s.t -> 'value -> (e, s, unit) Vif_response.t
let to_ctx daemon req0 =
{
Ms.server= daemon.server
; Ms.request= req0
; Ms.target= Vif_request0.target req0
; Ms.user's_value= daemon.user's_value
}
let rec clean_up orphans =
match Miou.care orphans with
| None -> ()
| Some None -> ()
| Some (Some prm) -> begin
match Miou.await prm with
| Ok () -> clean_up orphans
| Error exn ->
let bt = Printexc.get_raw_backtrace () in
Log.err (fun m -> m "User's exception: %s" (Printexc.to_string exn));
Log.err (fun m -> m "%s" (Printexc.raw_backtrace_to_string bt));
clean_up orphans
end
let rec user's_functions daemon =
clean_up daemon.orphans;
let tasks =
Miou.Mutex.protect daemon.mutex @@ fun () ->
while Queue.is_empty daemon.queue do
Miou.Condition.wait daemon.condition daemon.mutex
done;
let lst = List.of_seq (Queue.to_seq daemon.queue) in
Queue.drop daemon.queue; lst
in
let fn (User's_task (req0, fn)) =
let _prm =
Miou.call ~orphans:daemon.orphans @@ fun () ->
let response = fn daemon.server daemon.user's_value in
match Vif_response.(run req0 empty) response with
| Vif_response.Sent, () -> ()
in
()
in
List.iter fn tasks; user's_functions daemon
let handler _cfg ~default ~middlewares routes daemon =
();
fun socket reqd ->
let server = { Vif_s.devices; cookie_key= cfg.Vif_config.cookie_key } in
let req0 = Vif_request0.of_reqd socket reqd in
let target = Vif_request0.target req0 in
let ctx = { Ms.server; request= req0; target; user's_value } in
let ctx = to_ctx daemon req0 in
let env = Ms.run middlewares ctx Vif_m.Hmap.empty in
let request = recognize_request ~env req0 in
let target = Vif_request0.target req0 in
let fn = R.dispatch ~default routes ~request ~target in
match Vif_response.(run req0 empty) (fn server user's_value) with
| Response.Sent, () -> ()
begin
Miou.Mutex.protect daemon.mutex @@ fun () ->
Queue.push (User's_task (req0, fn)) daemon.queue;
Miou.Condition.signal daemon.condition
end
type config = Vif_config.config
@ -193,29 +252,43 @@ let run ?(cfg = Vif_options.config_from_globals ()) ?(devices = Ds.[])
match interactive with
| true ->
let stop = Httpcats.Server.stop () in
let fn _sigint = Httpcats.Server.switch stop in
let fn _sigint =
Log.debug (fun m -> m "Server shutdown request (SIGINT)");
Httpcats.Server.switch stop
in
let behavior = Sys.Signal_handle fn in
ignore (Miou.sys_signal Sys.sigint behavior);
Some stop
| false -> None
in
Logs.debug (fun m -> m "Vif.run, interactive:%b" interactive);
let[@warning "-8"] (Vif_d.Devices devices) =
Ds.run Vif_d.empty devices user's_value
in
let devices = Ds.run Vif_d.Hmap.empty devices user's_value in
Logs.debug (fun m -> m "devices launched");
let fn0 = handler cfg ~default ~middlewares routes devices user's_value in
let prm = Miou.async @@ fun () -> handle stop cfg fn0 in
let server = { Vif_s.devices; cookie_key= cfg.Vif_config.cookie_key } in
let daemon =
{
queue= Queue.create ()
; mutex= Miou.Mutex.create ()
; orphans= Miou.orphans ()
; condition= Miou.Condition.create ()
; user's_value
; server
}
in
let user's_tasks = Miou.call @@ fun () -> user's_functions daemon in
let fn0 = handler cfg ~default ~middlewares routes daemon in
let prm0 = Miou.async @@ fun () -> handle stop cfg fn0 in
let tasks =
List.init domains (fun _ ->
handler cfg ~default ~middlewares routes devices user's_value)
let fn _ = handler cfg ~default ~middlewares routes daemon in
List.init domains fn
in
let tasks =
if domains > 0 then Miou.parallel (handle stop cfg) tasks else []
in
Miou.await_exn prm;
Miou.await_exn prm0;
List.iter (function Ok () -> () | Error exn -> raise exn) tasks;
Ds.finally (Vif_d.Devices devices);
Miou.cancel user's_tasks;
Log.debug (fun m -> m "Vif (and devices) terminated")
let setup_config = Vif_options.setup_config

View file

@ -67,9 +67,8 @@ module Request : sig
val meth : ('c, 'a) t -> Method.t
val version : ('c, 'a) t -> int
val headers : ('c, 'a) t -> Headers.t
val to_string : ('c, 'a) t -> string
val to_stream : ('c, 'a) t -> string Stream.stream
val of_json : (Content_type.json, 'a) t -> ('a, [ `Msg of string ]) result
val stream : ('c, 'a) t -> string Stream.stream
val get : ('cfg, 'v) M.t -> ('c, 'a) t -> 'v option
type request

View file

@ -67,13 +67,13 @@ let device : type v f r.
let key : r Hmap.key = Hmap.Key.create { name; finally } in
Device (args, fn, key)
let run : type v. t -> v -> (v, 'r) device -> t =
let run : type v. Hmap.t -> v -> (v, 'r) device -> Hmap.t =
fun devices user's_value (Device (args, fn, key)) ->
let v = ref None in
let k fn devices =
v := Some devices;
fn user's_value
in
let x = keval_args devices user's_value k args fn in
let x = keval_args (Devices devices) user's_value k args fn in
let[@warning "-8"] (Devices t) = Option.get !v in
Devices (Hmap.add key x t)
Hmap.add key x t

View file

@ -1,6 +1,26 @@
type t = (string * string) list
let add_unless_exists hdrs k v =
if List.mem_assoc k hdrs then hdrs else (k, v) :: hdrs
let mem hdrs key =
let exception True in
let key = String.lowercase_ascii key in
let fn (key', _) =
if String.lowercase_ascii key' = key then raise_notrace True
in
try List.iter fn hdrs; false with True -> true
let get hdrs key = List.assoc_opt key hdrs
let add_unless_exists hdrs k v = if mem hdrs k then hdrs else (k, v) :: hdrs
let get hdrs key =
let exception Found of string in
let key = String.lowercase_ascii key in
let fn (key', value) =
if String.lowercase_ascii key' = key then raise_notrace (Found value)
in
try List.iter fn hdrs; None with Found value -> Some value
let rem hdrs key =
let key = String.lowercase_ascii key in
let fn acc (key', value) =
if String.lowercase_ascii key' = key then acc else (key', value) :: acc
in
List.fold_left fn [] hdrs |> List.rev

View file

@ -23,49 +23,11 @@ let meth { request; _ } = Vif_request0.meth request
let version { request; _ } = Vif_request0.version request
let headers { request; _ } = Vif_request0.headers request
let reqd { request; _ } = Vif_request0.reqd request
let stream { request; _ } = Vif_request0.stream request
let to_string ~schedule ~close body =
let buf = Buffer.create 0x7ff in
let c = Miou.Computation.create () in
let rec on_eof () =
close body;
assert (Miou.Computation.try_return c (Buffer.contents buf))
and on_read bstr ~off ~len =
Buffer.add_string buf (Bigstringaf.substring bstr ~off ~len);
schedule body ~on_eof ~on_read
in
schedule body ~on_eof ~on_read;
Miou.Computation.await_exn c
let to_stream ~schedule ~close body =
let stream = Stream.Bqueue.create 0x100 in
let rec on_eof () = close body; Stream.Bqueue.close stream
and on_read bstr ~off ~len =
Stream.Bqueue.put stream (Bigstringaf.substring bstr ~off ~len);
schedule body ~on_eof ~on_read
in
schedule body ~on_eof ~on_read;
stream
let to_string { body; _ } =
match body with
| `V1 body ->
to_string ~schedule:H1.Body.Reader.schedule_read
~close:H1.Body.Reader.close body
| `V2 body ->
to_string ~schedule:H2.Body.Reader.schedule_read
~close:H2.Body.Reader.close body
let to_stream { body; _ } =
match body with
| `V1 body ->
to_stream ~schedule:H1.Body.Reader.schedule_read
~close:H1.Body.Reader.close body
|> Stream.Stream.of_bqueue
| `V2 body ->
to_stream ~schedule:H2.Body.Reader.schedule_read
~close:H2.Body.Reader.close body
|> Stream.Stream.of_bqueue
let to_string { request; _ } =
let stream = Vif_request0.stream request in
Stream.Stream.into Stream.Sink.string stream
let destruct : type a. a Json_encoding.encoding -> Json.t -> a =
Json_encoding.destruct
@ -76,18 +38,20 @@ let of_json : type a.
(Vif_content_type.json, a) t -> (a, [> `Msg of string ]) result = function
| { encoding= Any; _ } as req -> Ok (to_string req)
| { encoding= Json; _ } as req ->
Log.debug (fun m -> m "Parse the body as a JSON data");
let stream = to_stream req in
Stream.Stream.into Stream.Sink.json stream
let stream = stream req in
Stream.Stream.into (Stream.Sink.json ()) stream
| { encoding= Json_encoding encoding; _ } as req -> begin
let stream = to_stream req in
match Stream.Stream.into Stream.Sink.json stream with
let stream = stream req in
match Stream.Stream.into (Stream.Sink.json ()) stream with
| Error (`Msg _) as err -> err
| Ok (json : Json.t) -> begin
try Ok (destruct encoding json)
with Json_encoding.Cannot_destruct (_, _) ->
error_msgf "Invalid JSON value"
end
| exception exn ->
Log.err (fun m -> m "Invalid JSON: %s" (Printexc.to_string exn));
error_msgf "Invalid JSON value"
end
let get : type v. ('cfg, v) Vif_m.t -> ('a, 'c) t -> v option =

View file

@ -4,12 +4,36 @@ type t = {
; reqd: reqd
; socket: socket
; on_localhost: bool
; stream: string Stream.stream
}
and reqd = Httpcats.Server.reqd
and socket = [ `Tcp of Miou_unix.file_descr | `Tls of Tls_miou_unix.t ]
and request = V1 of H1.Request.t | V2 of H2.Request.t
let to_bqueue ~schedule ~close body =
let stream = Stream.Bqueue.create 0x100 in
let rec on_eof () = close body; Stream.Bqueue.close stream
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
Stream.Bqueue.put stream str;
schedule body ~on_eof ~on_read
in
schedule body ~on_eof ~on_read;
stream
let to_stream = function
| `V1 reqd ->
let body = H1.Reqd.request_body reqd in
to_bqueue ~schedule:H1.Body.Reader.schedule_read
~close:H1.Body.Reader.close body
|> Stream.Stream.of_bqueue
| `V2 reqd ->
let body = H2.Reqd.request_body reqd in
to_bqueue ~schedule:H2.Body.Reader.schedule_read
~close:H2.Body.Reader.close body
|> Stream.Stream.of_bqueue
let of_reqd socket reqd =
let request =
match reqd with
@ -33,7 +57,8 @@ let of_reqd socket reqd =
inet_addr = Unix.inet_addr_loopback
|| inet_addr = Unix.inet6_addr_loopback
in
{ request; tls; reqd; socket; on_localhost }
let stream = to_stream reqd in
{ request; tls; reqd; socket; on_localhost; stream }
let headers { request; _ } =
match request with
@ -59,3 +84,4 @@ let version { request; _ } = match request with V1 _ -> 1 | V2 _ -> 2
let tls { tls; _ } = tls
let on_localhost { on_localhost; _ } = on_localhost
let reqd { reqd; _ } = reqd
let stream { stream; _ } = stream

View file

@ -39,7 +39,7 @@ let strf fmt = Format.asprintf fmt
module Hdrs = Vif_headers
let compress_string ~headers str =
match List.assoc_opt "content-encoding" headers with
match Vif_headers.get headers "content-encoding" with
| Some "gzip" -> assert false
| _ -> str
@ -145,7 +145,7 @@ let run : type a p q. Vif_request0.t -> p state -> (p, q, a) t -> q state * a =
go state (fn x)
| state, Return x -> (state, x)
| state, Add_unless_exists (k, v) -> begin
match List.assoc_opt k !headers with
match Vif_headers.get !headers k with
| Some _ -> (state, false)
| None ->
headers := (k, v) :: !headers;
@ -155,21 +155,21 @@ let run : type a p q. Vif_request0.t -> p state -> (p, q, a) t -> q state * a =
headers := (k, v) :: !headers;
(state, ())
| state, Rem_header k ->
headers := List.remove_assoc k !headers;
headers := Vif_headers.rem !headers k;
(state, ())
| state, Set_header (k, v) ->
headers := (k, v) :: List.remove_assoc k !headers;
headers := (k, v) :: Vif_headers.rem !headers k;
(state, ())
| Empty, Stream stream -> (Filled stream, ())
| Empty, String str -> (Filled (Stream.Stream.singleton str), ())
| Filled stream, Respond status ->
let headers = !headers in
let headers, stream =
match List.assoc_opt "content-encoding" headers with
match Vif_headers.get headers "content-encoding" with
| Some "deflate" ->
let flow = Stream.Flow.deflate () in
let stream = Stream.Stream.via flow stream in
let headers = List.remove_assoc "content-length" headers in
let headers = Vif_headers.rem headers "content-length" in
let headers =
Vif_headers.add_unless_exists headers "transfer-encoding"
"chunked"