diff --git a/README.md b/README.md index a447342..f52a206 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,8 @@ open Lwt.Infix module Make (HTTP_client : Http_mirage_client.S) = struct let start http_client = - Http_mirage_client.one_request http_client "https://mirage.io/" + let body_f _response acc data = Lwt.return (acc ^ data) in + Http_mirage_client.one_request http_client "https://mirage.io/" body_f "" >>= function | Ok (resp, body) -> ... | Error _ -> ... diff --git a/src/http_mirage_client.ml b/src/http_mirage_client.ml index 1938e28..97c6669 100644 --- a/src/http_mirage_client.ml +++ b/src/http_mirage_client.ml @@ -153,7 +153,8 @@ let prepare_http_1_1_headers headers host user_pass body_length = | Some v -> add headers "content-length" (string_of_int v) in add_authentication ~add headers user_pass -let single_http_1_1_request ?config flow user_pass host meth path headers body = +let single_http_1_1_request + ?config flow user_pass host meth path headers body f f_init = let body_length = Option.map String.length body in let headers = prepare_http_1_1_headers headers host user_pass body_length in let req = Httpaf.Request.create ~headers meth path in @@ -163,34 +164,26 @@ let single_http_1_1_request ?config flow user_pass host meth path headers body = fun v -> if not !w then Lwt.wakeup_later notify_finished v ; w := true in + let on_eof response data () = wakeup (Ok (response, data)) in let response_handler response body = - let buf = Buffer.create 0x100 in - let rec on_eof () = - let response = - { - version= response.Httpaf.Response.version - ; status= (response.Httpaf.Response.status :> H2.Status.t) - ; reason= response.Httpaf.Response.reason - ; headers= - H2.Headers.of_list - (Httpaf.Headers.to_list response.Httpaf.Response.headers) - } in - wakeup (Ok (response, Some (Buffer.contents buf))) - and on_read ba ~off ~len = - Buffer.add_string buf (Bigstringaf.substring ~off ~len ba) - ; Httpaf.Body.schedule_read body ~on_read ~on_eof in - let on_eof () = - let response = - { - version= response.Httpaf.Response.version - ; status= (response.Httpaf.Response.status :> H2.Status.t) - ; reason= response.Httpaf.Response.reason - ; headers= - H2.Headers.of_list - (Httpaf.Headers.to_list response.Httpaf.Response.headers) - } in - wakeup (Ok (response, None)) in - Httpaf.Body.schedule_read body ~on_read ~on_eof in + let response = + { + version= response.Httpaf.Response.version + ; status= (response.Httpaf.Response.status :> H2.Status.t) + ; reason= response.Httpaf.Response.reason + ; headers= + H2.Headers.of_list + (Httpaf.Headers.to_list response.Httpaf.Response.headers) + } in + let rec on_read on_eof acc ba ~off ~len = + let acc = + acc >>= fun acc -> f response acc (Bigstringaf.substring ~off ~len ba) + in + Httpaf.Body.schedule_read body ~on_read:(on_read on_eof acc) + ~on_eof:(on_eof response acc) in + let f_init = Lwt.return f_init in + Httpaf.Body.schedule_read body ~on_read:(on_read on_eof f_init) + ~on_eof:(on_eof response f_init) in let error_handler e = let err = match e with @@ -216,8 +209,8 @@ let prepare_h2_headers headers host user_pass body_length = (string_of_int (Option.value ~default:0 body_length)) in add_authentication ~add headers user_pass -let single_h2_request ?config ~scheme flow user_pass host meth path headers body - = +let single_h2_request + ?config ~scheme flow user_pass host meth path headers body f f_init = let body_length = Option.map String.length body in let headers = prepare_h2_headers headers host user_pass body_length in let req = H2.Request.create ~scheme ~headers meth path in @@ -227,30 +220,24 @@ let single_h2_request ?config ~scheme flow user_pass host meth path headers body fun v -> if not !w then Lwt.wakeup_later notify_finished v ; w := true in + let on_eof response data () = wakeup (Ok (response, data)) in let response_handler response response_body = - let buf = Buffer.create 0x100 in - let rec on_eof () = - let response = - { - version= {major= 2; minor= 0} - ; status= response.H2.Response.status - ; reason= "" - ; headers= response.H2.Response.headers - } in - wakeup (Ok (response, Some (Buffer.contents buf))) - and on_read ba ~off ~len = - Buffer.add_string buf (Bigstringaf.substring ~off ~len ba) - ; H2.Body.Reader.schedule_read response_body ~on_read ~on_eof in - let on_eof () = - let response = - { - version= {major= 2; minor= 0} - ; status= response.H2.Response.status - ; reason= "" - ; headers= response.H2.Response.headers - } in - wakeup (Ok (response, None)) in - H2.Body.Reader.schedule_read response_body ~on_read ~on_eof in + let response = + { + version= {major= 2; minor= 0} + ; status= response.H2.Response.status + ; reason= "" + ; headers= response.H2.Response.headers + } in + let rec on_read on_eof acc ba ~off ~len = + let acc = + acc >>= fun acc -> f response acc (Bigstringaf.substring ~off ~len ba) + in + H2.Body.Reader.schedule_read response_body ~on_read:(on_read on_eof acc) + ~on_eof:(on_eof response acc) in + let f_init = Lwt.return f_init in + H2.Body.Reader.schedule_read response_body ~on_read:(on_read on_eof f_init) + ~on_eof:(on_eof response f_init) in let error_handler e = let err = match e with @@ -319,7 +306,8 @@ let alpn_protocol_of_string = function | "h2" -> Some `H2 | _ -> None -let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri = +let single_request + ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri f f_init = Lwt.return (decode_uri ~ctx uri) >>? fun (ctx, scheme, host, user_pass, path) -> let ctx = @@ -333,19 +321,26 @@ let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri = Mimic.resolve ctx >>? fun flow -> (match Option.bind (alpn_protocol flow) alpn_protocol_of_string, config with | (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) -> - single_http_1_1_request ~config flow user_pass host meth path headers body + single_http_1_1_request ~config flow user_pass host meth path headers body f + f_init | (Some `HTTP_1_1 | None), None -> - single_http_1_1_request flow user_pass host meth path headers body + single_http_1_1_request flow user_pass host meth path headers body f f_init | (Some `H2 | None), Some (`H2 config) -> single_h2_request ~config ~scheme flow user_pass host meth path headers body + f f_init | Some `H2, None -> - single_h2_request ~scheme flow user_pass host meth path headers body + single_h2_request ~scheme flow user_pass host meth path headers body f + f_init | Some `H2, Some (`HTTP_1_1 _) -> - single_h2_request ~scheme flow user_pass host meth path headers body + single_h2_request ~scheme flow user_pass host meth path headers body f + f_init | Some `HTTP_1_1, Some (`H2 _) -> - single_http_1_1_request flow user_pass host meth path headers body) + single_http_1_1_request flow user_pass host meth path headers body f f_init) >>= fun r -> - Mimic.close flow >|= fun () -> r + Mimic.close flow >>= fun () -> + match r with + | Error _ as e -> Lwt.return e + | Ok (resp, body) -> Lwt.map (fun body -> Ok (resp, body)) body let tls_config ?tls_config ?config authenticator user's_authenticator = lazy @@ -378,7 +373,7 @@ let resolve_location ~uri ~location = | _ -> Error (`Msg ("expected an absolute uri, got: " ^ uri))) | _ -> Error (`Msg ("unknown location (relative path): " ^ location)) -let one_request +let request ?config ?tls_config:cfg {ctx; alpn_protocol; authenticator} @@ -388,18 +383,20 @@ let one_request ?body ?(max_redirect = 5) ?(follow_redirect = true) - uri = + uri + f + f_init = let tls_config = tls_config ?tls_config:cfg ?config authenticator user's_authenticator in if not follow_redirect then single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers ?body - uri + uri f f_init else let rec follow_redirect count uri = if count = 0 then Lwt.return_error (`Msg "Redirect limit exceeded") else single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers - ?body uri + ?body uri f f_init >>? fun (resp, body) -> if Status.is_redirection resp.status then match Headers.get resp.headers "location" with diff --git a/src/http_mirage_client.mli b/src/http_mirage_client.mli index 2baf154..f0ef229 100644 --- a/src/http_mirage_client.mli +++ b/src/http_mirage_client.mli @@ -22,7 +22,7 @@ type response = { ; headers: Headers.t } -val one_request : +val request : ?config:[ `H2 of H2.Config.t | `HTTP_1_1 of Httpaf.Config.t ] -> ?tls_config:Tls.Config.client -> t @@ -33,4 +33,11 @@ val one_request : -> ?max_redirect:int -> ?follow_redirect:bool -> string - -> (response * string option, [> Mimic.error ]) result Lwt.t + -> (response -> 'a -> string -> 'a Lwt.t) + -> 'a + -> (response * 'a, [> Mimic.error ]) result Lwt.t +(** [request ~config ~tls_config t ~authenticator ~meth ~headers ~body + ~max_redirect ~follow_redirect url body_f body_init] does a HTTP request + to [url] using [meth] and the HTTP protocol in [config]. The response is + the value of this function. The body is provided in chunks (see [body_f]). + Reasonably defaults are used if not provided. *) diff --git a/test/dune b/test/dune new file mode 100644 index 0000000..09e6f9e --- /dev/null +++ b/test/dune @@ -0,0 +1,10 @@ +(executable + (name test) + (libraries http-mirage-client tcpip.stack-socket paf.mirage + mirage-clock-unix mirage-random-stdlib happy-eyeballs-lwt mirage-time-unix + mimic-happy-eyeballs alcotest-lwt)) + +(rule + (alias runtest) + (action + (run ./test.exe --color=always))) diff --git a/test/test.ml b/test/test.ml new file mode 100644 index 0000000..508ecd0 --- /dev/null +++ b/test/test.ml @@ -0,0 +1,167 @@ +(* Functoria *) + +module DNS_client = Dns_client_mirage.Make (Mirage_random_stdlib) (Time) (Mclock) (Pclock) (Tcpip_stack_socket.V4V6) +module Happy_eyeballs = Happy_eyeballs_mirage.Make (Time) (Mclock) (Tcpip_stack_socket.V4V6) (DNS_client) +module Mimic_happy_eyeballs = Mimic_happy_eyeballs.Make (Tcpip_stack_socket.V4V6) (DNS_client) (Happy_eyeballs) +module HTTP_server = Paf_mirage.Make (Tcpip_stack_socket.V4V6.TCP) +module HTTP_client = Http_mirage_client.Make (Pclock) (Tcpip_stack_socket.V4V6.TCP) (Mimic_happy_eyeballs) + +let http_1_1_error_handler ?notify (ipaddr, port) ?request:_ error respond = + let contents = match error with + | `Bad_gateway -> Fmt.str "Bad gateway (%a:%d)" Ipaddr.pp ipaddr port + | `Bad_request -> Fmt.str "Bad request (%a:%d)" Ipaddr.pp ipaddr port + | `Exn exn -> Fmt.str "Exception %S (%a:%d)" (Printexc.to_string exn) Ipaddr.pp ipaddr port + | `Internal_server_error -> Fmt.str "Internal server error (%a:%d)" Ipaddr.pp ipaddr port in + let open Httpaf in + Option.iter (fun push -> push (Some ((ipaddr, port), error))) notify ; + let headers = Headers.of_list + [ "content-type", "text/plain" + ; "content-length", string_of_int (String.length contents) + ; "connection", "close" ] in + let body = respond headers in + Body.write_string body contents ; + Body.close_writer body + +let alpn_error_handler + : type reqd headers request response ro wo. + ?notify:(((Ipaddr.t * int) * Alpn.server_error) option -> unit) -> + (Ipaddr.t * int) -> (reqd, headers, request, response, ro, wo) Alpn.protocol -> + ?request:request -> Alpn.server_error -> (headers -> wo) -> unit + = fun ?notify (ipaddr, port) protocol ?request:_ error respond -> + let contents = match error with + | `Bad_gateway -> Fmt.str "Bad gateway (%a:%d)" Ipaddr.pp ipaddr port + | `Bad_request -> Fmt.str "Bad request (%a:%d)" Ipaddr.pp ipaddr port + | `Exn exn -> Fmt.str "Exception %S (%a:%d)" (Printexc.to_string exn) Ipaddr.pp ipaddr port + | `Internal_server_error -> Fmt.str "Internal server error (%a:%d)" Ipaddr.pp ipaddr port in + Option.iter (fun push -> push (Some ((ipaddr, port), error))) notify ; + let headers = + [ "content-type", "text/plain" + ; "content-length", string_of_int (String.length contents) ] in + match protocol with + | Alpn.HTTP_1_1 _ -> + let open Httpaf in + let headers = Headers.of_list (("connection", "close") :: headers) in + let body = respond headers in + Body.write_string body contents ; + Body.close_writer body + | Alpn.H2 _ -> + let open H2 in + let headers = Headers.of_list headers in + let body = respond headers in + H2.Body.Writer.write_string body contents ; + H2.Body.Writer.close body + +type alpn_handler = + { handler : 'reqd 'headers 'request 'response 'ro 'wo. + 'reqd -> ('reqd, 'headers, 'request, 'response, 'ro, 'wo) Alpn.protocol -> unit } +[@@unboxed] + +let server ?error ?stop stack = function + | `HTTP_1_1 (port, handler) -> + let open Lwt.Syntax in + let+ http_server = HTTP_server.init ~port stack in + let http_service = HTTP_server.http_service ~error_handler:(http_1_1_error_handler ?notify:error) + (fun _flow (_ipaddr, _port) -> handler) in + HTTP_server.serve ?stop http_service http_server + | `ALPN (tls, port, handler) -> + let open Lwt.Syntax in + let alpn_handler = + { Alpn.error= (fun edn protocol ?request v respond -> alpn_error_handler ?notify:error edn protocol ?request v respond) + ; Alpn.request= (fun _flow (_ipaddr, _port) reqd protocol -> handler.handler reqd protocol) } in + let+ http_server = HTTP_server.init ~port stack in + let alpn_service = HTTP_server.alpn_service ~tls alpn_handler in + HTTP_server.serve ?stop alpn_service http_server + +let stack ipaddr = + let open Lwt.Syntax in + let* tcpv4v6 = Tcpip_stack_socket.V4V6.TCP.connect ~ipv4_only:false ~ipv6_only:false + ipaddr None in + let* udpv4v6 = Tcpip_stack_socket.V4V6.UDP.connect ~ipv4_only:false ~ipv6_only:false + ipaddr None in + Tcpip_stack_socket.V4V6.connect udpv4v6 tcpv4v6 + +let test01 = + Alcotest_lwt.test_case "Simple Hello World! (GET)" `Quick @@ fun _sw () -> + let open Lwt.Syntax in + let stop = Lwt_switch.create () in + let handler reqd = + let open Httpaf in + let contents = "Hello World!" in + let headers = Headers.of_list + [ "content-type", "text/plain" + ; "content-length", string_of_int (String.length contents) + ; "connection", "close" ] in + let response = Response.create ~headers `OK in + Reqd.respond_with_string reqd response contents in + let* stack = stack Ipaddr.V4.Prefix.loopback in + let happy_eyeballs = Happy_eyeballs.create stack in + let* ctx = Mimic_happy_eyeballs.connect happy_eyeballs in + let* t = HTTP_client.connect ctx in + let* `Initialized _thread = server ~stop (Tcpip_stack_socket.V4V6.tcp stack) + (`HTTP_1_1 (8080, handler)) in + let* result = Http_mirage_client.request t "http://localhost:8080/" + (fun _response buf str -> Buffer.add_string buf str ; Lwt.return buf) + (Buffer.create 0x100) in + match result with + | Error err -> + let* () = Lwt_switch.turn_off stop in + let* () = Tcpip_stack_socket.V4V6.disconnect stack in + Alcotest.failf "Client error: %a" Mimic.pp_error err + | Ok (_response, buf) -> + let* () = Lwt_switch.turn_off stop in + let* () = Tcpip_stack_socket.V4V6.disconnect stack in + let body = Buffer.contents buf in + Alcotest.(check string) "body" "Hello World!" body ; + Lwt.return_unit + +let random_string ~len = + let res = Bytes.create len in + for i = 0 to len - 1 do + Bytes.set res i (Char.chr (Random.bits () land 0xff)) + done ; Bytes.unsafe_to_string res + +let test02 = + Alcotest_lwt.test_case "Repeat (POST)" `Quick @@ fun _sw () -> + let open Lwt.Syntax in + let stop = Lwt_switch.create () in + let handler reqd = + let open Httpaf in + let { Request.meth; _ } = Reqd.request reqd in + if meth <> `POST then invalid_arg "Invalid HTTP method" ; + let headers = Headers.of_list + [ "content-type", "text/plain" ] in + let response = Response.create ~headers `OK in + let src = Reqd.request_body reqd in + let dst = Reqd.respond_with_streaming reqd response in + let rec on_eof () = + Body.close_reader src ; + Body.close_writer dst + and on_read buf ~off ~len = + Body.write_bigstring dst ~off ~len buf ; + Body.schedule_read src ~on_eof ~on_read in + Body.schedule_read src ~on_eof ~on_read in + let* stack = stack Ipaddr.V4.Prefix.loopback in + let happy_eyeballs = Happy_eyeballs.create stack in + let* ctx = Mimic_happy_eyeballs.connect happy_eyeballs in + let* t = HTTP_client.connect ctx in + let* `Initialized _thread = server ~stop (Tcpip_stack_socket.V4V6.tcp stack) + (`HTTP_1_1 (8080, handler)) in + let str = random_string ~len:0x1000 in + let* result = Http_mirage_client.request ~meth:`POST ~body:str t "http://localhost:8080/" + (fun _response buf str -> Buffer.add_string buf str ; Lwt.return buf) + (Buffer.create 0x1000) in + match result with + | Error err -> + let* () = Lwt_switch.turn_off stop in + let* () = Tcpip_stack_socket.V4V6.disconnect stack in + Alcotest.failf "Client error: %a" Mimic.pp_error err + | Ok (_response, buf) -> + let* () = Lwt_switch.turn_off stop in + let* () = Tcpip_stack_socket.V4V6.disconnect stack in + let body = Buffer.contents buf in + Alcotest.(check string) "body" str body ; + Lwt.return_unit + + let () = Alcotest_lwt.run "http-mirage-client" + [ "http/1.1", [ test01; test02 ] ] + |> Lwt_main.run