stream the response body

This commit is contained in:
Hannes Mehnert 2022-10-27 13:21:24 +02:00
parent 5fe7693c52
commit 1b790ad9c0
2 changed files with 63 additions and 66 deletions

View file

@ -153,7 +153,7 @@ let prepare_http_1_1_headers headers host user_pass body_length =
| Some v -> add headers "content-length" (string_of_int v) in | Some v -> add headers "content-length" (string_of_int v) in
add_authentication ~add headers user_pass add_authentication ~add headers user_pass
let single_http_1_1_request ?config flow user_pass host meth path headers body = let single_http_1_1_request ?config flow user_pass host meth path headers body f f_init =
let body_length = Option.map String.length body in let body_length = Option.map String.length body in
let headers = prepare_http_1_1_headers headers host user_pass body_length in let headers = prepare_http_1_1_headers headers host user_pass body_length in
let req = Httpaf.Request.create ~headers meth path in let req = Httpaf.Request.create ~headers meth path in
@ -163,34 +163,28 @@ let single_http_1_1_request ?config flow user_pass host meth path headers body =
fun v -> fun v ->
if not !w then Lwt.wakeup_later notify_finished v if not !w then Lwt.wakeup_later notify_finished v
; w := true in ; w := true in
let on_eof response data () =
let response =
{
version= response.Httpaf.Response.version
; status= (response.Httpaf.Response.status :> H2.Status.t)
; reason= response.Httpaf.Response.reason
; headers=
H2.Headers.of_list
(Httpaf.Headers.to_list response.Httpaf.Response.headers)
} in
wakeup (Ok (response, data))
in
let response_handler response body = let response_handler response body =
let buf = Buffer.create 0x100 in let rec on_read on_eof data ba ~off ~len =
let rec on_eof () = let data =
let response = data >>= fun data ->
{ f data (Bigstringaf.substring ~off ~len ba)
version= response.Httpaf.Response.version in
; status= (response.Httpaf.Response.status :> H2.Status.t) Httpaf.Body.schedule_read body ~on_read:(on_read on_eof data) ~on_eof:(on_eof response data)
; reason= response.Httpaf.Response.reason in
; headers= let f_init = Lwt.return f_init in
H2.Headers.of_list Httpaf.Body.schedule_read body ~on_read:(on_read on_eof f_init) ~on_eof:(on_eof response f_init) in
(Httpaf.Headers.to_list response.Httpaf.Response.headers)
} in
wakeup (Ok (response, Some (Buffer.contents buf)))
and on_read ba ~off ~len =
Buffer.add_string buf (Bigstringaf.substring ~off ~len ba)
; Httpaf.Body.schedule_read body ~on_read ~on_eof in
let on_eof () =
let response =
{
version= response.Httpaf.Response.version
; status= (response.Httpaf.Response.status :> H2.Status.t)
; reason= response.Httpaf.Response.reason
; headers=
H2.Headers.of_list
(Httpaf.Headers.to_list response.Httpaf.Response.headers)
} in
wakeup (Ok (response, None)) in
Httpaf.Body.schedule_read body ~on_read ~on_eof in
let error_handler e = let error_handler e =
let err = let err =
match e with match e with
@ -216,7 +210,7 @@ let prepare_h2_headers headers host user_pass body_length =
(string_of_int (Option.value ~default:0 body_length)) in (string_of_int (Option.value ~default:0 body_length)) in
add_authentication ~add headers user_pass add_authentication ~add headers user_pass
let single_h2_request ?config ~scheme flow user_pass host meth path headers body let single_h2_request ?config ~scheme flow user_pass host meth path headers body f f_init
= =
let body_length = Option.map String.length body in let body_length = Option.map String.length body in
let headers = prepare_h2_headers headers host user_pass body_length in let headers = prepare_h2_headers headers host user_pass body_length in
@ -227,30 +221,26 @@ let single_h2_request ?config ~scheme flow user_pass host meth path headers body
fun v -> fun v ->
if not !w then Lwt.wakeup_later notify_finished v if not !w then Lwt.wakeup_later notify_finished v
; w := true in ; w := true in
let on_eof response data () =
let response =
{
version= {major= 2; minor= 0}
; status= response.H2.Response.status
; reason= ""
; headers= response.H2.Response.headers
} in
wakeup (Ok (response, data))
in
let response_handler response response_body = let response_handler response response_body =
let buf = Buffer.create 0x100 in let rec on_read on_eof data ba ~off ~len =
let rec on_eof () = let data =
let response = data >>= fun data ->
{ f data (Bigstringaf.substring ~off ~len ba)
version= {major= 2; minor= 0} in
; status= response.H2.Response.status H2.Body.Reader.schedule_read response_body ~on_read:(on_read on_eof data) ~on_eof:(on_eof response data)
; reason= "" in
; headers= response.H2.Response.headers let f_init = Lwt.return f_init in
} in H2.Body.Reader.schedule_read response_body ~on_read:(on_read on_eof f_init) ~on_eof:(on_eof response f_init) in
wakeup (Ok (response, Some (Buffer.contents buf)))
and on_read ba ~off ~len =
Buffer.add_string buf (Bigstringaf.substring ~off ~len ba)
; H2.Body.Reader.schedule_read response_body ~on_read ~on_eof in
let on_eof () =
let response =
{
version= {major= 2; minor= 0}
; status= response.H2.Response.status
; reason= ""
; headers= response.H2.Response.headers
} in
wakeup (Ok (response, None)) in
H2.Body.Reader.schedule_read response_body ~on_read ~on_eof in
let error_handler e = let error_handler e =
let err = let err =
match e with match e with
@ -319,7 +309,7 @@ let alpn_protocol_of_string = function
| "h2" -> Some `H2 | "h2" -> Some `H2
| _ -> None | _ -> None
let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri = let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri f f_init =
Lwt.return (decode_uri ~ctx uri) Lwt.return (decode_uri ~ctx uri)
>>? fun (ctx, scheme, host, user_pass, path) -> >>? fun (ctx, scheme, host, user_pass, path) ->
let ctx = let ctx =
@ -333,19 +323,22 @@ let single_request ~ctx ~alpn_protocol ?config cfg ~meth ~headers ?body uri =
Mimic.resolve ctx >>? fun flow -> Mimic.resolve ctx >>? fun flow ->
(match Option.bind (alpn_protocol flow) alpn_protocol_of_string, config with (match Option.bind (alpn_protocol flow) alpn_protocol_of_string, config with
| (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) -> | (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) ->
single_http_1_1_request ~config flow user_pass host meth path headers body single_http_1_1_request ~config flow user_pass host meth path headers body f f_init
| (Some `HTTP_1_1 | None), None -> | (Some `HTTP_1_1 | None), None ->
single_http_1_1_request flow user_pass host meth path headers body single_http_1_1_request flow user_pass host meth path headers body f f_init
| (Some `H2 | None), Some (`H2 config) -> | (Some `H2 | None), Some (`H2 config) ->
single_h2_request ~config ~scheme flow user_pass host meth path headers body single_h2_request ~config ~scheme flow user_pass host meth path headers body f f_init
| Some `H2, None -> | Some `H2, None ->
single_h2_request ~scheme flow user_pass host meth path headers body single_h2_request ~scheme flow user_pass host meth path headers body f f_init
| Some `H2, Some (`HTTP_1_1 _) -> | Some `H2, Some (`HTTP_1_1 _) ->
single_h2_request ~scheme flow user_pass host meth path headers body single_h2_request ~scheme flow user_pass host meth path headers body f f_init
| Some `HTTP_1_1, Some (`H2 _) -> | Some `HTTP_1_1, Some (`H2 _) ->
single_http_1_1_request flow user_pass host meth path headers body) single_http_1_1_request flow user_pass host meth path headers body f f_init)
>>= fun r -> >>= fun r ->
Mimic.close flow >|= fun () -> r Mimic.close flow >>= fun () ->
match r with
| Error _ as e -> Lwt.return e
| Ok (resp, body) -> Lwt.map (fun body -> Ok (resp, body)) body
let tls_config ?tls_config ?config authenticator user's_authenticator = let tls_config ?tls_config ?config authenticator user's_authenticator =
lazy lazy
@ -378,7 +371,7 @@ let resolve_location ~uri ~location =
| _ -> Error (`Msg ("expected an absolute uri, got: " ^ uri))) | _ -> Error (`Msg ("expected an absolute uri, got: " ^ uri)))
| _ -> Error (`Msg ("unknown location (relative path): " ^ location)) | _ -> Error (`Msg ("unknown location (relative path): " ^ location))
let one_request let request
?config ?config
?tls_config:cfg ?tls_config:cfg
{ctx; alpn_protocol; authenticator} {ctx; alpn_protocol; authenticator}
@ -388,18 +381,20 @@ let one_request
?body ?body
?(max_redirect = 5) ?(max_redirect = 5)
?(follow_redirect = true) ?(follow_redirect = true)
uri = uri
f
f_init =
let tls_config = let tls_config =
tls_config ?tls_config:cfg ?config authenticator user's_authenticator in tls_config ?tls_config:cfg ?config authenticator user's_authenticator in
if not follow_redirect then if not follow_redirect then
single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers ?body single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers ?body
uri uri f f_init
else else
let rec follow_redirect count uri = let rec follow_redirect count uri =
if count = 0 then Lwt.return_error (`Msg "Redirect limit exceeded") if count = 0 then Lwt.return_error (`Msg "Redirect limit exceeded")
else else
single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers single_request ~ctx ~alpn_protocol ?config tls_config ~meth ~headers
?body uri ?body uri f f_init
>>? fun (resp, body) -> >>? fun (resp, body) ->
if Status.is_redirection resp.status then if Status.is_redirection resp.status then
match Headers.get resp.headers "location" with match Headers.get resp.headers "location" with

View file

@ -22,7 +22,7 @@ type response = {
; headers: Headers.t ; headers: Headers.t
} }
val one_request : val request :
?config:[ `H2 of H2.Config.t | `HTTP_1_1 of Httpaf.Config.t ] ?config:[ `H2 of H2.Config.t | `HTTP_1_1 of Httpaf.Config.t ]
-> ?tls_config:Tls.Config.client -> ?tls_config:Tls.Config.client
-> t -> t
@ -33,4 +33,6 @@ val one_request :
-> ?max_redirect:int -> ?max_redirect:int
-> ?follow_redirect:bool -> ?follow_redirect:bool
-> string -> string
-> (response * string option, [> Mimic.error ]) result Lwt.t -> ('a -> string -> 'a Lwt.t)
-> 'a
-> (response * 'a, [> Mimic.error ]) result Lwt.t