read the data in chunks and send them chunk-wise

This commit is contained in:
Hannes Mehnert 2022-09-27 10:46:14 +02:00
parent c81ba101f9
commit 938da1a211
2 changed files with 51 additions and 9 deletions

View file

@ -66,7 +66,7 @@ let mirror =
package ~min:"3.7.0" "git-paf" ; package ~min:"3.7.0" "git-paf" ;
package "opam-file-format" ; package "opam-file-format" ;
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ; package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw" "tar-mirage" ; package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ;
package ~pin:"git+https://github.com/reynir/mirage-block-partition.git" "mirage-block-partition" ; package ~pin:"git+https://github.com/reynir/mirage-block-partition.git" "mirage-block-partition" ;
package ~pin:"git+https://git.robur.io/reynir/oneffs.git" "oneffs" ; package ~pin:"git+https://git.robur.io/reynir/oneffs.git" "oneffs" ;
] ]

View file

@ -382,6 +382,32 @@ module Make
KV.pp_error e (hash_to_string h) v); KV.pp_error e (hash_to_string h) v);
Error `Not_found Error `Not_found
let read_chunked t h v f =
match find_key t h v with
| Error _ as e -> Lwt.return e
| Ok x ->
let key = Mirage_kv.Key.v x in
KV.size t.dev key >>= function
| Error e ->
Logs.err (fun m -> m "error %a while reading %s %s"
KV.pp_error e (hash_to_string h) v);
Lwt.return (Error `Not_found)
| Ok len ->
let chunk_size = 4096 in
let rec read_more offset =
if offset < len then
KV.get_partial t.dev key ~offset ~length:chunk_size >>= function
| Ok data -> f data ; read_more (offset + chunk_size)
| Error e ->
Logs.err (fun m -> m "error %a while reading %s %s"
KV.pp_error e (hash_to_string h) v);
Lwt.return_unit
else
Lwt.return_unit
in
read_more 0 >|= fun () ->
Ok ()
let last_modified t h v = let last_modified t h v =
match find_key t h v with match find_key t h v with
| Error _ as e -> Lwt.return e | Error _ as e -> Lwt.return e
@ -389,10 +415,21 @@ module Make
KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function
| Ok data -> Ok data | Ok data -> Ok data
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while reading %s %s" Logs.err (fun m -> m "error %a while last_modified %s %s"
KV.pp_error e (hash_to_string h) v); KV.pp_error e (hash_to_string h) v);
Error `Not_found Error `Not_found
end
let size t h v =
match find_key t h v with
| Error _ as e -> Lwt.return e
| Ok x ->
KV.size t.dev (Mirage_kv.Key.v x) >|= function
| Ok s -> Ok s
| Error e ->
Logs.err (fun m -> m "error %a while size %s %s"
KV.pp_error e (hash_to_string h) v);
Error `Not_found
end
module Tarball = struct module Tarball = struct
module Async = struct module Async = struct
@ -626,22 +663,27 @@ stamp: %S
respond_with_empty reqd resp; respond_with_empty reqd resp;
Lwt.return_unit Lwt.return_unit
else else
Disk.read store h hash >>= function Disk.size store h hash >>= function
| Error _ -> | Error _ ->
Logs.warn (fun m -> m "error retrieving size");
not_found reqd request.Httpaf.Request.target; not_found reqd request.Httpaf.Request.target;
Lwt.return_unit Lwt.return_unit
| Ok data -> | Ok size ->
let size = string_of_int size in
let mime_type = "application/octet-stream" in let mime_type = "application/octet-stream" in
let headers = [ let headers = [
"content-type", mime_type ; "content-type", mime_type ;
"etag", hash ; "etag", hash ;
"last-modified", last_modified ; "last-modified", last_modified ;
"content-length", string_of_int (String.length data) ; "content-length", size ;
] in ]
in
let headers = Httpaf.Headers.of_list headers in let headers = Httpaf.Headers.of_list headers in
let resp = Httpaf.Response.create ~headers `OK in let resp = Httpaf.Response.create ~headers `OK in
Httpaf.Reqd.respond_with_string reqd resp data ; let body = Httpaf.Reqd.respond_with_streaming reqd resp in
Lwt.return_unit) Disk.read_chunked store h hash (fun chunk ->
Httpaf.Body.write_string body chunk) >|= fun _ ->
Httpaf.Body.close_writer body)
end end
| _ -> | _ ->
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target); Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);