From 1b790ad9c0954b2801b71ac422492bc123ca758b Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 27 Oct 2022 13:21:24 +0200 Subject: [PATCH] stream the response body --- src/http_mirage_client.ml | 123 ++++++++++++++++++------------------- src/http_mirage_client.mli | 6 +- 2 files changed, 63 insertions(+), 66 deletions(-) diff --git a/src/http_mirage_client.ml b/src/http_mirage_client.ml index 1938e28..230cc54 100644 --- a/src/http_mirage_client.ml +++ b/src/http_mirage_client.ml @@ -153,7 +153,7 @@ let prepare_http_1_1_headers headers host user_pass body_length = | Some v -> add headers "content-length" (string_of_int v) in add_authentication ~add headers user_pass -let single_http_1_1_request ?config flow user_pass host meth path headers body = +let single_http_1_1_request ?config flow user_pass host meth path headers body f f_init = let body_length = Option.map String.length body in let headers = prepare_http_1_1_headers headers host user_pass body_length in let req = Httpaf.Request.create ~headers meth path in @@ -163,34 +163,28 @@ let single_http_1_1_request ?config flow user_pass host meth path headers body = fun v -> if not !w then Lwt.wakeup_later notify_finished v ; w := true in + let on_eof response data () = + let response = + { + version= response.Httpaf.Response.version + ; status= (response.Httpaf.Response.status :> H2.Status.t) + ; reason= response.Httpaf.Response.reason + ; headers= + H2.Headers.of_list + (Httpaf.Headers.to_list response.Httpaf.Response.headers) + } in + wakeup (Ok (response, data)) + in let response_handler response body = - let buf = Buffer.create 0x100 in - let rec on_eof () = - let response = - { - version= response.Httpaf.Response.version - ; status= (response.Httpaf.Response.status :> H2.Status.t) - ; reason= response.Httpaf.Response.reason - ; headers= - H2.Headers.of_list - (Httpaf.Headers.to_list response.Httpaf.Response.headers) - } in - wakeup (Ok (response, Some (Buffer.contents buf))) - and on_read ba ~off ~len = - Buffer.add_string buf (Bigstringaf.substring ~off ~len ba) - ; Httpaf.Body.schedule_read body ~on_read ~on_eof in - let on_eof () = - let response = - { - version= response.Httpaf.Response.version - ; status= (response.Httpaf.Response.status :> H2.Status.t) - ; reason= response.Httpaf.Response.reason - ; headers= - H2.Headers.of_list - (Httpaf.Headers.to_list response.Httpaf.Response.headers) - } in - wakeup (Ok (response, None)) in - Httpaf.Body.schedule_read body ~on_read ~on_eof in + let rec on_read on_eof data ba ~off ~len = + let data = + data >>= fun data -> + f data (Bigstringaf.substring ~off ~len ba) + in + Httpaf.Body.schedule_read body ~on_read:(on_read on_eof data) ~on_eof:(on_eof response data) + in + let f_init = Lwt.return f_init in + Httpaf.Body.schedule_read body ~on_read:(on_read on_eof f_init) ~on_eof:(on_eof response f_init) in let error_handler e = let err = match e with @@ -216,7 +210,7 @@ let prepare_h2_headers headers host user_pass body_length = (string_of_int (Option.value ~default:0 body_length)) in add_authentication ~add headers user_pass -let single_h2_request ?config ~scheme flow user_pass host meth path headers body +let single_h2_request ?config ~scheme flow user_pass host meth path headers body f f_init = let body_length = Option.map String.length body in let headers = prepare_h2_headers headers host user_pass body_length in @@ -227,30 +221,26 @@ let single_h2_request ?config ~scheme flow user_pass host meth path headers body fun v -> if not !w then Lwt.wakeup_later notify_finished v ; w := true in + let on_eof response data () = + let response = + { + version= {major= 2; minor= 0} + ; status= response.H2.Response.status + ; reason= "" + ; headers= response.H2.Response.headers + } in + wakeup (Ok (response, data)) + in let response_handler response response_body = - let buf = Buffer.create 0x100 in - let rec on_eof () = - let response = - { - version= {major= 2; minor= 0} - ; status= response.H2.Response.status - ; reason= "" - ; headers= response.H2.Response.headers - } in - wakeup (Ok (response, Some (Buffer.contents buf))) - and on_read ba ~off ~len = - Buffer.add_string buf (Bigstringaf.substring ~off ~len ba) - ; H2.Body.Reader.schedule_read response_body ~on_read ~on_eof in - let on_eof () = - let response = - { - version= {major= 2; minor= 0} - ; status= response.H2.Response.status - ; reason= "" - ; headers= response.H2.Response.headers - } in - wakeup (Ok (response, None)) in - H2.Body.Reader.schedule_read response_body ~on_read ~on_eof in + let rec on_read on_eof data ba ~off ~len = + let data = + data >>= fun data -> + f data (Bigstringaf.substring ~off ~len ba) + in + H2.Body.Reader.schedule_read response_body ~on_read:(on_read on_eof data) ~on_eof:(on_eof response data) + in + let f_init = Lwt.return f_init in + H2.Body.Reader.schedule_read response_body ~on_read:(on_read on_eof f_init) ~on_eof:(on_eof response f_init) in let error_handler e = let err = match e with @@ -319,7 +309,7 @@ let alpn_protocol_of_string = function | "h2" -> Some `H2 | _ -> None -let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri = +let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri f f_init = Lwt.return (decode_uri ~ctx uri) >>? fun (ctx, scheme, host, user_pass, path) -> let ctx = @@ -333,19 +323,22 @@ let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri = Mimic.resolve ctx >>? fun flow -> (match Option.bind (alpn_protocol flow) alpn_protocol_of_string, config with | (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) -> - single_http_1_1_request ~config flow user_pass host meth path headers body + single_http_1_1_request ~config flow user_pass host meth path headers body f f_init | (Some `HTTP_1_1 | None), None -> - single_http_1_1_request flow user_pass host meth path headers body + single_http_1_1_request flow user_pass host meth path headers body f f_init | (Some `H2 | None), Some (`H2 config) -> - single_h2_request ~config ~scheme flow user_pass host meth path headers body + single_h2_request ~config ~scheme flow user_pass host meth path headers body f f_init | Some `H2, None -> - single_h2_request ~scheme flow user_pass host meth path headers body + single_h2_request ~scheme flow user_pass host meth path headers body f f_init | Some `H2, Some (`HTTP_1_1 _) -> - single_h2_request ~scheme flow user_pass host meth path headers body + single_h2_request ~scheme flow user_pass host meth path headers body f f_init | Some `HTTP_1_1, Some (`H2 _) -> - single_http_1_1_request flow user_pass host meth path headers body) + single_http_1_1_request flow user_pass host meth path headers body f f_init) >>= fun r -> - Mimic.close flow >|= fun () -> r + Mimic.close flow >>= fun () -> + match r with + | Error _ as e -> Lwt.return e + | Ok (resp, body) -> Lwt.map (fun body -> Ok (resp, body)) body let tls_config ?tls_config ?config authenticator user's_authenticator = lazy @@ -378,7 +371,7 @@ let resolve_location ~uri ~location = | _ -> Error (`Msg ("expected an absolute uri, got: " ^ uri))) | _ -> Error (`Msg ("unknown location (relative path): " ^ location)) -let one_request +let request ?config ?tls_config:cfg {ctx; alpn_protocol; authenticator} @@ -388,18 +381,20 @@ let one_request ?body ?(max_redirect = 5) ?(follow_redirect = true) - uri = + uri + f + f_init = let tls_config = tls_config ?tls_config:cfg ?config authenticator user's_authenticator in if not follow_redirect then single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers ?body - uri + uri f f_init else let rec follow_redirect count uri = if count = 0 then Lwt.return_error (`Msg "Redirect limit exceeded") else single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers - ?body uri + ?body uri f f_init >>? fun (resp, body) -> if Status.is_redirection resp.status then match Headers.get resp.headers "location" with diff --git a/src/http_mirage_client.mli b/src/http_mirage_client.mli index 2baf154..e15da90 100644 --- a/src/http_mirage_client.mli +++ b/src/http_mirage_client.mli @@ -22,7 +22,7 @@ type response = { ; headers: Headers.t } -val one_request : +val request : ?config:[ `H2 of H2.Config.t | `HTTP_1_1 of Httpaf.Config.t ] -> ?tls_config:Tls.Config.client -> t @@ -33,4 +33,6 @@ val one_request : -> ?max_redirect:int -> ?follow_redirect:bool -> string - -> (response * string option, [> Mimic.error ]) result Lwt.t + -> ('a -> string -> 'a Lwt.t) + -> 'a + -> (response * 'a, [> Mimic.error ]) result Lwt.t