Merge pull request 'read the data in chunks and send them chunk-wise' (#16) from partial-read into main
Reviewed-on: https://git.robur.io/robur/opam-mirror/pulls/16
This commit is contained in:
commit
83d494c433
2 changed files with 143 additions and 76 deletions
|
@ -66,7 +66,7 @@ let mirror =
|
||||||
package ~min:"3.7.0" "git-paf" ;
|
package ~min:"3.7.0" "git-paf" ;
|
||||||
package "opam-file-format" ;
|
package "opam-file-format" ;
|
||||||
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
||||||
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw" "tar-mirage" ;
|
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ;
|
||||||
package ~pin:"git+https://github.com/reynir/mirage-block-partition.git" "mirage-block-partition" ;
|
package ~pin:"git+https://github.com/reynir/mirage-block-partition.git" "mirage-block-partition" ;
|
||||||
package ~pin:"git+https://git.robur.io/reynir/oneffs.git" "oneffs" ;
|
package ~pin:"git+https://git.robur.io/reynir/oneffs.git" "oneffs" ;
|
||||||
]
|
]
|
||||||
|
|
|
@ -235,6 +235,45 @@ module Make
|
||||||
Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e);
|
Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e);
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
|
let find_key t h key =
|
||||||
|
match
|
||||||
|
match h with
|
||||||
|
| `MD5 -> SM.find_opt key t.md5s
|
||||||
|
| `SHA512 -> SM.find_opt key t.sha512s
|
||||||
|
| `SHA256 -> Some key
|
||||||
|
| _ -> None
|
||||||
|
with
|
||||||
|
| None -> Error `Not_found
|
||||||
|
| Some x -> Ok x
|
||||||
|
|
||||||
|
let read_chunked t h v f a =
|
||||||
|
match find_key t h v with
|
||||||
|
| Error `Not_found ->
|
||||||
|
Lwt.return (Error (`Not_found (Mirage_kv.Key.v v)))
|
||||||
|
| Ok x ->
|
||||||
|
let key = Mirage_kv.Key.v x in
|
||||||
|
KV.size t.dev key >>= function
|
||||||
|
| Error e ->
|
||||||
|
Logs.err (fun m -> m "error %a while reading %s %s"
|
||||||
|
KV.pp_error e (hash_to_string h) v);
|
||||||
|
Lwt.return (Error (`Not_found key))
|
||||||
|
| Ok len ->
|
||||||
|
let chunk_size = 4096 in
|
||||||
|
let rec read_more a offset =
|
||||||
|
if offset < len then
|
||||||
|
KV.get_partial t.dev key ~offset ~length:chunk_size >>= function
|
||||||
|
| Ok data ->
|
||||||
|
f a data >>= fun a ->
|
||||||
|
read_more a (offset + chunk_size)
|
||||||
|
| Error e ->
|
||||||
|
Logs.err (fun m -> m "error %a while reading %s %s"
|
||||||
|
KV.pp_error e (hash_to_string h) v);
|
||||||
|
Lwt.return (Error e)
|
||||||
|
else
|
||||||
|
Lwt.return (Ok a)
|
||||||
|
in
|
||||||
|
read_more a 0
|
||||||
|
|
||||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||||
let init ~verify dev dev_md5s dev_sha512s =
|
let init ~verify dev dev_md5s dev_sha512s =
|
||||||
Logs.info (fun m -> m "init with verify %B" verify);
|
Logs.info (fun m -> m "init with verify %B" verify);
|
||||||
|
@ -262,51 +301,82 @@ module Make
|
||||||
Logs.warn (fun m -> m "unexpected dictionary at %s" name);
|
Logs.warn (fun m -> m "unexpected dictionary at %s" name);
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| `Value ->
|
| `Value ->
|
||||||
let ( >|?= ) x f = Lwt_result.iter (fun v -> Lwt.return (f v)) x in
|
let open Mirage_crypto.Hash in
|
||||||
let _data = ref None in
|
let sha256_final =
|
||||||
let read_data () =
|
if verify then
|
||||||
match !_data with
|
let f s =
|
||||||
| Some cs -> Lwt.return (Ok cs)
|
let digest = SHA256.get s in
|
||||||
| None ->
|
|
||||||
incr idx;
|
|
||||||
KV.get dev (Mirage_kv.Key.v name) >|= function
|
|
||||||
| Error e ->
|
|
||||||
Logs.err (fun m -> m "error %a reading %s"
|
|
||||||
KV.pp_error e name);
|
|
||||||
Error ()
|
|
||||||
| Ok data ->
|
|
||||||
let cs = Cstruct.of_string data in
|
|
||||||
_data := Some cs;
|
|
||||||
Ok cs
|
|
||||||
in
|
|
||||||
begin
|
|
||||||
if verify then begin
|
|
||||||
read_data () >|?= fun cs ->
|
|
||||||
let digest = Mirage_crypto.Hash.digest `SHA256 cs in
|
|
||||||
if not (String.equal name (to_hex digest)) then
|
if not (String.equal name (to_hex digest)) then
|
||||||
Logs.err (fun m -> m "corrupt data, expected %s, read %s (should remove)"
|
Logs.err (fun m -> m "corrupt SHA256 data for %s, \
|
||||||
name (hex_to_string (Cstruct.to_string digest)));
|
computed %s (should remove)"
|
||||||
end else
|
name (to_hex digest))
|
||||||
Lwt.return_unit
|
in
|
||||||
end >>= fun () ->
|
Some f
|
||||||
begin
|
else
|
||||||
if not (SSet.mem name md5s) then begin
|
None
|
||||||
read_data () >|?= fun cs ->
|
and md5_final =
|
||||||
let md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex in
|
if not (SSet.mem name md5s) then
|
||||||
let md5s = SM.add md5 name t.md5s in
|
let f s =
|
||||||
t.md5s <- md5s
|
let digest = MD5.get s in
|
||||||
end else
|
t.md5s <- SM.add (to_hex digest) name t.md5s
|
||||||
Lwt.return_unit
|
in
|
||||||
end >>= fun () ->
|
Some f
|
||||||
begin
|
else if verify then
|
||||||
if not (SSet.mem name sha512s) then begin
|
let f s =
|
||||||
read_data () >|?= fun cs ->
|
let digest = MD5.get s |> to_hex in
|
||||||
let sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex in
|
match SM.find_opt digest t.md5s with
|
||||||
let sha512s = SM.add sha512 name t.sha512s in
|
| Some x when String.equal name x -> ()
|
||||||
t.sha512s <- sha512s
|
| y ->
|
||||||
end else
|
Logs.err (fun m -> m "corrupt MD5 data for %s, \
|
||||||
Lwt.return_unit
|
expected %a, computed %s"
|
||||||
end >|= fun () ->
|
name Fmt.(option ~none:(any "NONE") string) y
|
||||||
|
digest)
|
||||||
|
in
|
||||||
|
Some f
|
||||||
|
else
|
||||||
|
None
|
||||||
|
and sha512_final =
|
||||||
|
if not (SSet.mem name sha512s) then
|
||||||
|
let f s =
|
||||||
|
let digest = SHA512.get s in
|
||||||
|
t.sha512s <- SM.add (to_hex digest) name t.sha512s
|
||||||
|
in
|
||||||
|
Some f
|
||||||
|
else if verify then
|
||||||
|
let f s =
|
||||||
|
let digest = SHA512.get s |> to_hex in
|
||||||
|
match SM.find_opt digest t.sha512s with
|
||||||
|
| Some x when String.equal name x -> ()
|
||||||
|
| y ->
|
||||||
|
Logs.err (fun m -> m "corrupt SHA512 data for %s, \
|
||||||
|
expected %a, computed %s"
|
||||||
|
name Fmt.(option ~none:(any "NONE") string) y
|
||||||
|
digest)
|
||||||
|
in
|
||||||
|
Some f
|
||||||
|
else
|
||||||
|
None
|
||||||
|
in
|
||||||
|
match sha256_final, md5_final, sha512_final with
|
||||||
|
| None, None, None -> Lwt.return_unit
|
||||||
|
| _ ->
|
||||||
|
read_chunked t `SHA256 name
|
||||||
|
(fun (sha256, md5, sha512) data ->
|
||||||
|
let cs = Cstruct.of_string data in
|
||||||
|
Lwt.return
|
||||||
|
(Option.map (fun t -> SHA256.feed t cs) sha256,
|
||||||
|
Option.map (fun t -> MD5.feed t cs) md5,
|
||||||
|
Option.map (fun t -> SHA512.feed t cs) sha512))
|
||||||
|
(Option.map (fun _ -> SHA256.empty) sha256_final,
|
||||||
|
Option.map (fun _ -> MD5.empty) md5_final,
|
||||||
|
Option.map (fun _ -> SHA512.empty) sha512_final) >|= function
|
||||||
|
| Error e ->
|
||||||
|
Logs.err (fun m -> m "error %a of %s while computing digests"
|
||||||
|
KV.pp_error e name)
|
||||||
|
| Ok (sha256, md5, sha512) ->
|
||||||
|
Option.iter (fun f -> f (Option.get sha256)) sha256_final;
|
||||||
|
Option.iter (fun f -> f (Option.get md5)) md5_final;
|
||||||
|
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|
||||||
Logs.info (fun m -> m "added %s" name))
|
Logs.info (fun m -> m "added %s" name))
|
||||||
entries >>= fun () ->
|
entries >>= fun () ->
|
||||||
update_caches t >|= fun () ->
|
update_caches t >|= fun () ->
|
||||||
|
@ -344,17 +414,6 @@ module Make
|
||||||
end else
|
end else
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
let find_key t h key =
|
|
||||||
match
|
|
||||||
match h with
|
|
||||||
| `MD5 -> SM.find_opt key t.md5s
|
|
||||||
| `SHA512 -> SM.find_opt key t.sha512s
|
|
||||||
| `SHA256 -> Some key
|
|
||||||
| _ -> None
|
|
||||||
with
|
|
||||||
| None -> Error `Not_found
|
|
||||||
| Some x -> Ok x
|
|
||||||
|
|
||||||
let exists t h v =
|
let exists t h v =
|
||||||
match find_key t h v with
|
match find_key t h v with
|
||||||
| Error _ -> Lwt.return false
|
| Error _ -> Lwt.return false
|
||||||
|
@ -371,17 +430,6 @@ module Make
|
||||||
(hash_to_string h) v KV.pp_error e);
|
(hash_to_string h) v KV.pp_error e);
|
||||||
false
|
false
|
||||||
|
|
||||||
let read t h v =
|
|
||||||
match find_key t h v with
|
|
||||||
| Error _ as e -> Lwt.return e
|
|
||||||
| Ok x ->
|
|
||||||
KV.get t.dev (Mirage_kv.Key.v x) >|= function
|
|
||||||
| Ok data -> Ok data
|
|
||||||
| Error e ->
|
|
||||||
Logs.err (fun m -> m "error %a while reading %s %s"
|
|
||||||
KV.pp_error e (hash_to_string h) v);
|
|
||||||
Error `Not_found
|
|
||||||
|
|
||||||
let last_modified t h v =
|
let last_modified t h v =
|
||||||
match find_key t h v with
|
match find_key t h v with
|
||||||
| Error _ as e -> Lwt.return e
|
| Error _ as e -> Lwt.return e
|
||||||
|
@ -389,7 +437,18 @@ module Make
|
||||||
KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function
|
KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function
|
||||||
| Ok data -> Ok data
|
| Ok data -> Ok data
|
||||||
| Error e ->
|
| Error e ->
|
||||||
Logs.err (fun m -> m "error %a while reading %s %s"
|
Logs.err (fun m -> m "error %a while last_modified %s %s"
|
||||||
|
KV.pp_error e (hash_to_string h) v);
|
||||||
|
Error `Not_found
|
||||||
|
|
||||||
|
let size t h v =
|
||||||
|
match find_key t h v with
|
||||||
|
| Error _ as e -> Lwt.return e
|
||||||
|
| Ok x ->
|
||||||
|
KV.size t.dev (Mirage_kv.Key.v x) >|= function
|
||||||
|
| Ok s -> Ok s
|
||||||
|
| Error e ->
|
||||||
|
Logs.err (fun m -> m "error %a while size %s %s"
|
||||||
KV.pp_error e (hash_to_string h) v);
|
KV.pp_error e (hash_to_string h) v);
|
||||||
Error `Not_found
|
Error `Not_found
|
||||||
end
|
end
|
||||||
|
@ -626,22 +685,30 @@ stamp: %S
|
||||||
respond_with_empty reqd resp;
|
respond_with_empty reqd resp;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
else
|
else
|
||||||
Disk.read store h hash >>= function
|
Disk.size store h hash >>= function
|
||||||
| Error _ ->
|
| Error _ ->
|
||||||
|
Logs.warn (fun m -> m "error retrieving size");
|
||||||
not_found reqd request.Httpaf.Request.target;
|
not_found reqd request.Httpaf.Request.target;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok data ->
|
| Ok size ->
|
||||||
|
let size = string_of_int size in
|
||||||
let mime_type = "application/octet-stream" in
|
let mime_type = "application/octet-stream" in
|
||||||
let headers = [
|
let headers = [
|
||||||
"content-type", mime_type ;
|
"content-type", mime_type ;
|
||||||
"etag", hash ;
|
"etag", hash ;
|
||||||
"last-modified", last_modified ;
|
"last-modified", last_modified ;
|
||||||
"content-length", string_of_int (String.length data) ;
|
"content-length", size ;
|
||||||
] in
|
]
|
||||||
|
in
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
let headers = Httpaf.Headers.of_list headers in
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
let resp = Httpaf.Response.create ~headers `OK in
|
||||||
Httpaf.Reqd.respond_with_string reqd resp data ;
|
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
||||||
Lwt.return_unit)
|
Disk.read_chunked store h hash (fun () chunk ->
|
||||||
|
let wait, wakeup = Lwt.task () in
|
||||||
|
Httpaf.Body.write_string body chunk;
|
||||||
|
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
||||||
|
wait) () >|= fun _ ->
|
||||||
|
Httpaf.Body.close_writer body)
|
||||||
end
|
end
|
||||||
| _ ->
|
| _ ->
|
||||||
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
||||||
|
|
Loading…
Reference in a new issue