From 2720510f975fd6192243d7f5984d5b3cd931fd94 Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Fri, 14 Feb 2025 16:16:23 +0100 Subject: [PATCH] Fix vif --- examples/05-json/main.ml | 2 + examples/08-digest/main.ml | 25 +++++++++ lib/vif/stream.ml | 23 ++++++-- lib/vif/stream.mli | 5 +- lib/vif/vif.ml | 107 +++++++++++++++++++++++++++++++------ lib/vif/vif.mli | 3 +- lib/vif/vif_d.ml | 6 +-- lib/vif/vif_headers.ml | 26 +++++++-- lib/vif/vif_request.ml | 58 ++++---------------- lib/vif/vif_request0.ml | 28 +++++++++- lib/vif/vif_response.ml | 12 ++--- 11 files changed, 211 insertions(+), 84 deletions(-) create mode 100644 examples/08-digest/main.ml diff --git a/examples/05-json/main.ml b/examples/05-json/main.ml index 2a8c73f..f1a87d4 100644 --- a/examples/05-json/main.ml +++ b/examples/05-json/main.ml @@ -26,8 +26,10 @@ let foo = open Vif ;; let deserialize req _server () = + Logs.debug (fun m -> m "New request"); match Vif.Request.of_json req with | Ok (foo : foo) -> + Logs.debug (fun m -> m "JSON decoded"); let str = Fmt.str "username: %s, password: %s, age: %a, address: %a\n" foo.username foo.password diff --git a/examples/08-digest/main.ml b/examples/08-digest/main.ml new file mode 100644 index 0000000..0e2c63b --- /dev/null +++ b/examples/08-digest/main.ml @@ -0,0 +1,25 @@ +#require "vif" ;; +#require "digestif.c" ;; + +open Vif ;; + +let sha1 = + let open Stream in + let init () = Digestif.SHA1.empty in + let push ctx str = Digestif.SHA1.feed_string ctx str in + let full = Fun.const false in + let stop = Digestif.SHA1.get in + Sink { init; push; full; stop } +;; + +let default req target server () = + let stream = Request.stream req in + let hash = Stream.Stream.into sha1 stream in + let field = "content-type" in + let* () = Response.add ~field "text/plain; charset=utf-8" in + let* () = Response.with_string req (Digestif.SHA1.to_hex hash) in + Response.respond `OK +;; + +let () = Miou_unix.run @@ fun () -> + Vif.run ~default [] () ;; diff --git a/lib/vif/stream.ml b/lib/vif/stream.ml index 7cc9067..37e24dc 100644 --- a/lib/vif/stream.ml +++ b/lib/vif/stream.ml @@ -1,3 +1,9 @@ +let src = Logs.Src.create "vif.stream" + +module Log = (val Logs.src_log src : Logs.LOG) + +let ( % ) f g = fun x -> f (g x) + module Bqueue = struct type 'a t = { buf: 'a option array @@ -33,6 +39,8 @@ module Bqueue = struct let get t = Miou.Mutex.protect t.mutex @@ fun () -> while t.wr_pos = t.rd_pos && not t.closed do + Log.debug (fun m -> + m "bq: wr_pos:%d, rd_pos:%d, closed: %b" t.wr_pos t.rd_pos t.closed); Miou.Condition.wait t.non_empty_or_close t.mutex done; if t.wr_pos = t.rd_pos && t.closed then None @@ -163,7 +171,14 @@ module Sink = struct let errorf fmt = Fmt.kstr (fun msg -> `Error msg) fmt - let json = + let string = + let init () = Buffer.create 0x7ff in + let push buf str = Buffer.add_string buf str; buf in + let full = Fun.const false in + let stop = Buffer.contents in + Sink { init; push; full; stop } + + let json () = let decoder = Jsonm.decoder `Manual in let rec error (`Error err) = errorf "Invalid JSON input: %a" Jsonm.pp_error err @@ -355,6 +370,7 @@ module Stream = struct let acc = init () in try stop (fn acc) with exn -> + Log.err (fun m -> m "Stram.Sink.bracket: %s" (Printexc.to_string exn)); let _ = stop acc in reraise exn @@ -363,9 +379,10 @@ module Stream = struct let rec go r = if k.full r then r else - match Bqueue.get bq with None -> r | Some str -> go (k.push r str) + let value = Bqueue.get bq in + Option.fold ~none:r ~some:(go % k.push r) value in - let stop r = Bqueue.close bq; k.stop r in + let stop r = k.stop r in bracket go ~init:k.init ~stop in { stream } diff --git a/lib/vif/stream.mli b/lib/vif/stream.mli index 9fe3269..74d3207 100644 --- a/lib/vif/stream.mli +++ b/lib/vif/stream.mli @@ -30,7 +30,8 @@ type ('a, 'r) sink = -> ('a, 'r) sink module Sink : sig - val json : (string, (Json.t, [ `Msg of string ]) result) sink + val json : unit -> (string, (Json.t, [ `Msg of string ]) result) sink + val string : (string, string) sink end type ('a, 'b) flow = { flow: 'r. ('b, 'r) sink -> ('a, 'r) sink } [@@unboxed] @@ -61,6 +62,6 @@ module Stream : sig val into : ('a, 'b) sink -> 'a stream -> 'b val via : ('a, 'b) flow -> 'a stream -> 'b stream val from : 'a source -> 'a stream - val of_bqueue : 'a Bqueue.t -> 'a stream + val of_bqueue : string Bqueue.t -> string stream val singleton : 'a -> 'a stream end diff --git a/lib/vif/vif.ml b/lib/vif/vif.ml index da22742..8b5e684 100644 --- a/lib/vif/vif.ml +++ b/lib/vif/vif.ml @@ -27,7 +27,7 @@ module Ds = struct | [] : 'value t | ( :: ) : ('value, 'a) D.device * 'value t -> 'value t - let run : Vif_d.t -> 'value t -> 'value -> Vif_d.t = + let run : Vif_d.Hmap.t -> 'value t -> 'value -> Vif_d.Hmap.t = fun t lst user's_value -> let rec go t = function | [] -> t @@ -91,7 +91,7 @@ let is_application_json { Multipart_form.Content_type.ty; subty; _ } = let content_type req0 = let headers = Vif_request0.headers req0 in - let c = List.assoc_opt "content-type" headers in + let c = Vif_headers.get headers "content-type" in let c = Option.map (fun c -> c ^ "\r\n") c in let c = Option.to_result ~none:`Not_found c in Result.bind c Multipart_form.Content_type.of_string @@ -141,18 +141,77 @@ let recognize_request ~env req0 = in { Vif_r.extract } -let handler cfg ~default ~middlewares routes devices user's_value = +type 'value daemon = { + queue: 'value user's_function Queue.t + ; mutex: Miou.Mutex.t + ; orphans: unit Miou.orphans + ; condition: Miou.Condition.t + ; user's_value: 'value + ; server: Vif_s.t +} + +and 'value user's_function = + | User's_task : Vif_request0.t * 'value fn -> 'value user's_function + +and 'value fn = Vif_s.t -> 'value -> (e, s, unit) Vif_response.t + +let to_ctx daemon req0 = + { + Ms.server= daemon.server + ; Ms.request= req0 + ; Ms.target= Vif_request0.target req0 + ; Ms.user's_value= daemon.user's_value + } + +let rec clean_up orphans = + match Miou.care orphans with + | None -> () + | Some None -> () + | Some (Some prm) -> begin + match Miou.await prm with + | Ok () -> clean_up orphans + | Error exn -> + let bt = Printexc.get_raw_backtrace () in + Log.err (fun m -> m "User's exception: %s" (Printexc.to_string exn)); + Log.err (fun m -> m "%s" (Printexc.raw_backtrace_to_string bt)); + clean_up orphans + end + +let rec user's_functions daemon = + clean_up daemon.orphans; + let tasks = + Miou.Mutex.protect daemon.mutex @@ fun () -> + while Queue.is_empty daemon.queue do + Miou.Condition.wait daemon.condition daemon.mutex + done; + let lst = List.of_seq (Queue.to_seq daemon.queue) in + Queue.drop daemon.queue; lst + in + let fn (User's_task (req0, fn)) = + let _prm = + Miou.call ~orphans:daemon.orphans @@ fun () -> + let response = fn daemon.server daemon.user's_value in + match Vif_response.(run req0 empty) response with + | Vif_response.Sent, () -> () + in + () + in + List.iter fn tasks; user's_functions daemon + +let handler _cfg ~default ~middlewares routes daemon = (); fun socket reqd -> - let server = { Vif_s.devices; cookie_key= cfg.Vif_config.cookie_key } in let req0 = Vif_request0.of_reqd socket reqd in - let target = Vif_request0.target req0 in - let ctx = { Ms.server; request= req0; target; user's_value } in + let ctx = to_ctx daemon req0 in let env = Ms.run middlewares ctx Vif_m.Hmap.empty in let request = recognize_request ~env req0 in + let target = Vif_request0.target req0 in let fn = R.dispatch ~default routes ~request ~target in - match Vif_response.(run req0 empty) (fn server user's_value) with - | Response.Sent, () -> () + begin + Miou.Mutex.protect daemon.mutex @@ fun () -> + Queue.push (User's_task (req0, fn)) daemon.queue; + Miou.Condition.signal daemon.condition + end type config = Vif_config.config @@ -193,29 +252,43 @@ let run ?(cfg = Vif_options.config_from_globals ()) ?(devices = Ds.[]) match interactive with | true -> let stop = Httpcats.Server.stop () in - let fn _sigint = Httpcats.Server.switch stop in + let fn _sigint = + Log.debug (fun m -> m "Server shutdown request (SIGINT)"); + Httpcats.Server.switch stop + in let behavior = Sys.Signal_handle fn in ignore (Miou.sys_signal Sys.sigint behavior); Some stop | false -> None in Logs.debug (fun m -> m "Vif.run, interactive:%b" interactive); - let[@warning "-8"] (Vif_d.Devices devices) = - Ds.run Vif_d.empty devices user's_value - in + let devices = Ds.run Vif_d.Hmap.empty devices user's_value in Logs.debug (fun m -> m "devices launched"); - let fn0 = handler cfg ~default ~middlewares routes devices user's_value in - let prm = Miou.async @@ fun () -> handle stop cfg fn0 in + let server = { Vif_s.devices; cookie_key= cfg.Vif_config.cookie_key } in + let daemon = + { + queue= Queue.create () + ; mutex= Miou.Mutex.create () + ; orphans= Miou.orphans () + ; condition= Miou.Condition.create () + ; user's_value + ; server + } + in + let user's_tasks = Miou.call @@ fun () -> user's_functions daemon in + let fn0 = handler cfg ~default ~middlewares routes daemon in + let prm0 = Miou.async @@ fun () -> handle stop cfg fn0 in let tasks = - List.init domains (fun _ -> - handler cfg ~default ~middlewares routes devices user's_value) + let fn _ = handler cfg ~default ~middlewares routes daemon in + List.init domains fn in let tasks = if domains > 0 then Miou.parallel (handle stop cfg) tasks else [] in - Miou.await_exn prm; + Miou.await_exn prm0; List.iter (function Ok () -> () | Error exn -> raise exn) tasks; Ds.finally (Vif_d.Devices devices); + Miou.cancel user's_tasks; Log.debug (fun m -> m "Vif (and devices) terminated") let setup_config = Vif_options.setup_config diff --git a/lib/vif/vif.mli b/lib/vif/vif.mli index 0594749..c9b205e 100644 --- a/lib/vif/vif.mli +++ b/lib/vif/vif.mli @@ -67,9 +67,8 @@ module Request : sig val meth : ('c, 'a) t -> Method.t val version : ('c, 'a) t -> int val headers : ('c, 'a) t -> Headers.t - val to_string : ('c, 'a) t -> string - val to_stream : ('c, 'a) t -> string Stream.stream val of_json : (Content_type.json, 'a) t -> ('a, [ `Msg of string ]) result + val stream : ('c, 'a) t -> string Stream.stream val get : ('cfg, 'v) M.t -> ('c, 'a) t -> 'v option type request diff --git a/lib/vif/vif_d.ml b/lib/vif/vif_d.ml index 0dcec64..4cde1bd 100644 --- a/lib/vif/vif_d.ml +++ b/lib/vif/vif_d.ml @@ -67,13 +67,13 @@ let device : type v f r. let key : r Hmap.key = Hmap.Key.create { name; finally } in Device (args, fn, key) -let run : type v. t -> v -> (v, 'r) device -> t = +let run : type v. Hmap.t -> v -> (v, 'r) device -> Hmap.t = fun devices user's_value (Device (args, fn, key)) -> let v = ref None in let k fn devices = v := Some devices; fn user's_value in - let x = keval_args devices user's_value k args fn in + let x = keval_args (Devices devices) user's_value k args fn in let[@warning "-8"] (Devices t) = Option.get !v in - Devices (Hmap.add key x t) + Hmap.add key x t diff --git a/lib/vif/vif_headers.ml b/lib/vif/vif_headers.ml index 72ddd1d..f89b32a 100644 --- a/lib/vif/vif_headers.ml +++ b/lib/vif/vif_headers.ml @@ -1,6 +1,26 @@ type t = (string * string) list -let add_unless_exists hdrs k v = - if List.mem_assoc k hdrs then hdrs else (k, v) :: hdrs +let mem hdrs key = + let exception True in + let key = String.lowercase_ascii key in + let fn (key', _) = + if String.lowercase_ascii key' = key then raise_notrace True + in + try List.iter fn hdrs; false with True -> true -let get hdrs key = List.assoc_opt key hdrs +let add_unless_exists hdrs k v = if mem hdrs k then hdrs else (k, v) :: hdrs + +let get hdrs key = + let exception Found of string in + let key = String.lowercase_ascii key in + let fn (key', value) = + if String.lowercase_ascii key' = key then raise_notrace (Found value) + in + try List.iter fn hdrs; None with Found value -> Some value + +let rem hdrs key = + let key = String.lowercase_ascii key in + let fn acc (key', value) = + if String.lowercase_ascii key' = key then acc else (key', value) :: acc + in + List.fold_left fn [] hdrs |> List.rev diff --git a/lib/vif/vif_request.ml b/lib/vif/vif_request.ml index 7de53b4..dc912ff 100644 --- a/lib/vif/vif_request.ml +++ b/lib/vif/vif_request.ml @@ -23,49 +23,11 @@ let meth { request; _ } = Vif_request0.meth request let version { request; _ } = Vif_request0.version request let headers { request; _ } = Vif_request0.headers request let reqd { request; _ } = Vif_request0.reqd request +let stream { request; _ } = Vif_request0.stream request -let to_string ~schedule ~close body = - let buf = Buffer.create 0x7ff in - let c = Miou.Computation.create () in - let rec on_eof () = - close body; - assert (Miou.Computation.try_return c (Buffer.contents buf)) - and on_read bstr ~off ~len = - Buffer.add_string buf (Bigstringaf.substring bstr ~off ~len); - schedule body ~on_eof ~on_read - in - schedule body ~on_eof ~on_read; - Miou.Computation.await_exn c - -let to_stream ~schedule ~close body = - let stream = Stream.Bqueue.create 0x100 in - let rec on_eof () = close body; Stream.Bqueue.close stream - and on_read bstr ~off ~len = - Stream.Bqueue.put stream (Bigstringaf.substring bstr ~off ~len); - schedule body ~on_eof ~on_read - in - schedule body ~on_eof ~on_read; - stream - -let to_string { body; _ } = - match body with - | `V1 body -> - to_string ~schedule:H1.Body.Reader.schedule_read - ~close:H1.Body.Reader.close body - | `V2 body -> - to_string ~schedule:H2.Body.Reader.schedule_read - ~close:H2.Body.Reader.close body - -let to_stream { body; _ } = - match body with - | `V1 body -> - to_stream ~schedule:H1.Body.Reader.schedule_read - ~close:H1.Body.Reader.close body - |> Stream.Stream.of_bqueue - | `V2 body -> - to_stream ~schedule:H2.Body.Reader.schedule_read - ~close:H2.Body.Reader.close body - |> Stream.Stream.of_bqueue +let to_string { request; _ } = + let stream = Vif_request0.stream request in + Stream.Stream.into Stream.Sink.string stream let destruct : type a. a Json_encoding.encoding -> Json.t -> a = Json_encoding.destruct @@ -76,18 +38,20 @@ let of_json : type a. (Vif_content_type.json, a) t -> (a, [> `Msg of string ]) result = function | { encoding= Any; _ } as req -> Ok (to_string req) | { encoding= Json; _ } as req -> - Log.debug (fun m -> m "Parse the body as a JSON data"); - let stream = to_stream req in - Stream.Stream.into Stream.Sink.json stream + let stream = stream req in + Stream.Stream.into (Stream.Sink.json ()) stream | { encoding= Json_encoding encoding; _ } as req -> begin - let stream = to_stream req in - match Stream.Stream.into Stream.Sink.json stream with + let stream = stream req in + match Stream.Stream.into (Stream.Sink.json ()) stream with | Error (`Msg _) as err -> err | Ok (json : Json.t) -> begin try Ok (destruct encoding json) with Json_encoding.Cannot_destruct (_, _) -> error_msgf "Invalid JSON value" end + | exception exn -> + Log.err (fun m -> m "Invalid JSON: %s" (Printexc.to_string exn)); + error_msgf "Invalid JSON value" end let get : type v. ('cfg, v) Vif_m.t -> ('a, 'c) t -> v option = diff --git a/lib/vif/vif_request0.ml b/lib/vif/vif_request0.ml index d65f8dc..3181c9c 100644 --- a/lib/vif/vif_request0.ml +++ b/lib/vif/vif_request0.ml @@ -4,12 +4,36 @@ type t = { ; reqd: reqd ; socket: socket ; on_localhost: bool + ; stream: string Stream.stream } and reqd = Httpcats.Server.reqd and socket = [ `Tcp of Miou_unix.file_descr | `Tls of Tls_miou_unix.t ] and request = V1 of H1.Request.t | V2 of H2.Request.t +let to_bqueue ~schedule ~close body = + let stream = Stream.Bqueue.create 0x100 in + let rec on_eof () = close body; Stream.Bqueue.close stream + and on_read bstr ~off ~len = + let str = Bigstringaf.substring bstr ~off ~len in + Stream.Bqueue.put stream str; + schedule body ~on_eof ~on_read + in + schedule body ~on_eof ~on_read; + stream + +let to_stream = function + | `V1 reqd -> + let body = H1.Reqd.request_body reqd in + to_bqueue ~schedule:H1.Body.Reader.schedule_read + ~close:H1.Body.Reader.close body + |> Stream.Stream.of_bqueue + | `V2 reqd -> + let body = H2.Reqd.request_body reqd in + to_bqueue ~schedule:H2.Body.Reader.schedule_read + ~close:H2.Body.Reader.close body + |> Stream.Stream.of_bqueue + let of_reqd socket reqd = let request = match reqd with @@ -33,7 +57,8 @@ let of_reqd socket reqd = inet_addr = Unix.inet_addr_loopback || inet_addr = Unix.inet6_addr_loopback in - { request; tls; reqd; socket; on_localhost } + let stream = to_stream reqd in + { request; tls; reqd; socket; on_localhost; stream } let headers { request; _ } = match request with @@ -59,3 +84,4 @@ let version { request; _ } = match request with V1 _ -> 1 | V2 _ -> 2 let tls { tls; _ } = tls let on_localhost { on_localhost; _ } = on_localhost let reqd { reqd; _ } = reqd +let stream { stream; _ } = stream diff --git a/lib/vif/vif_response.ml b/lib/vif/vif_response.ml index 12efbe0..bb51c40 100644 --- a/lib/vif/vif_response.ml +++ b/lib/vif/vif_response.ml @@ -39,7 +39,7 @@ let strf fmt = Format.asprintf fmt module Hdrs = Vif_headers let compress_string ~headers str = - match List.assoc_opt "content-encoding" headers with + match Vif_headers.get headers "content-encoding" with | Some "gzip" -> assert false | _ -> str @@ -145,7 +145,7 @@ let run : type a p q. Vif_request0.t -> p state -> (p, q, a) t -> q state * a = go state (fn x) | state, Return x -> (state, x) | state, Add_unless_exists (k, v) -> begin - match List.assoc_opt k !headers with + match Vif_headers.get !headers k with | Some _ -> (state, false) | None -> headers := (k, v) :: !headers; @@ -155,21 +155,21 @@ let run : type a p q. Vif_request0.t -> p state -> (p, q, a) t -> q state * a = headers := (k, v) :: !headers; (state, ()) | state, Rem_header k -> - headers := List.remove_assoc k !headers; + headers := Vif_headers.rem !headers k; (state, ()) | state, Set_header (k, v) -> - headers := (k, v) :: List.remove_assoc k !headers; + headers := (k, v) :: Vif_headers.rem !headers k; (state, ()) | Empty, Stream stream -> (Filled stream, ()) | Empty, String str -> (Filled (Stream.Stream.singleton str), ()) | Filled stream, Respond status -> let headers = !headers in let headers, stream = - match List.assoc_opt "content-encoding" headers with + match Vif_headers.get headers "content-encoding" with | Some "deflate" -> let flow = Stream.Flow.deflate () in let stream = Stream.Stream.via flow stream in - let headers = List.remove_assoc "content-length" headers in + let headers = Vif_headers.rem headers "content-length" in let headers = Vif_headers.add_unless_exists headers "transfer-encoding" "chunked"