WIP partial writes

If possible, downloads are streamed to disk in the /pending/ directory
in the tar filesystem. If the download is successful and the checksums
match the file is renamed to its sha256 hash. Otherwise, it is moved
under /to-delete/ so it can be deleted by an operator.

Before downloading we check if it has been downloaded before, but we
need to check as well if it is being downloaded (in /pending/) or if we
unsuccessfully downloaded it before (e.g. failed checksum, stored in
/to-delete/).

It is not very elegant code, and it could do with a thorough review or
rewrite.
This commit is contained in:
Reynir Björnsson 2023-01-25 11:34:31 +01:00
parent b76f2997f5
commit 8e326ecbc5

View file

@ -20,7 +20,24 @@ module Make
module HM = Map.Make(struct module HM = Map.Make(struct
type t = Mirage_crypto.Hash.hash type t = Mirage_crypto.Hash.hash
let compare = compare (* TODO remove polymorphic compare *) let compare h h' =
match h, h' with
| `SHA512, `SHA512 -> 0
| `SHA512, _ -> 1
| _, `SHA512 -> -1
| `SHA384, `SHA384 -> 0
| `SHA384, _ -> 1
| _, `SHA384 -> -1
| `SHA256, `SHA256 -> 0
| `SHA256, _ -> 1
| _, `SHA256 -> -1
| `SHA224, `SHA224 -> 0
| `SHA224, _ -> 1
| _, `SHA224 -> -1
| `SHA1, `SHA1 -> 0
| `SHA1, `MD5 -> 1
| `MD5, `MD5 -> 0
| `MD5, _ -> -1
end) end)
let hash_to_string = function let hash_to_string = function
@ -209,6 +226,10 @@ module Make
dev_sha512s : Cache.t ; dev_sha512s : Cache.t ;
} }
let pending = Mirage_kv.Key.v "pending"
let to_delete = Mirage_kv.Key.v "to-delete"
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s } let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
let to_hex d = let to_hex d =
@ -238,6 +259,7 @@ module Make
Lwt.return_unit Lwt.return_unit
let find_key t h key = let find_key t h key =
assert (List.length (Mirage_kv.Key.segments key) = 1);
match match
match h with match h with
| `MD5 -> | `MD5 ->
@ -277,6 +299,182 @@ module Make
in in
read_more a Optint.Int63.zero read_more a Optint.Int63.zero
module Running_hash = struct
type _ t =
| MD5 : Mirage_crypto.Hash.MD5.t -> [> `MD5 ] t
| SHA1 : Mirage_crypto.Hash.SHA1.t -> [> `SHA1 ] t
| SHA224 : Mirage_crypto.Hash.SHA224.t -> [> `SHA224 ] t
| SHA256 : Mirage_crypto.Hash.SHA256.t -> [> `SHA256 ] t
| SHA384 : Mirage_crypto.Hash.SHA384.t -> [> `SHA384 ] t
| SHA512 : Mirage_crypto.Hash.SHA512.t -> [> `SHA512 ] t
let empty : _ -> _ t = function
| `MD5 -> MD5 Mirage_crypto.Hash.MD5.empty
| `SHA1 -> SHA1 Mirage_crypto.Hash.SHA1.empty
| `SHA224 -> SHA224 Mirage_crypto.Hash.SHA224.empty
| `SHA256 -> SHA256 Mirage_crypto.Hash.SHA256.empty
| `SHA384 -> SHA384 Mirage_crypto.Hash.SHA384.empty
| `SHA512 -> SHA512 Mirage_crypto.Hash.SHA512.empty
let feed t data =
let open Mirage_crypto.Hash in
match t with
| MD5 t -> MD5 (MD5.feed t data)
| SHA1 t -> SHA1 (SHA1.feed t data)
| SHA224 t -> SHA224 (SHA224.feed t data)
| SHA256 t -> SHA256 (SHA256.feed t data)
| SHA384 t -> SHA384 (SHA384.feed t data)
| SHA512 t -> SHA512 (SHA512.feed t data)
let get t =
let open Mirage_crypto.Hash in
match t with
| MD5 t -> MD5.get t
| SHA1 t -> SHA1.get t
| SHA224 t -> SHA224.get t
| SHA256 t -> SHA256.get t
| SHA384 t -> SHA384.get t
| SHA512 t -> SHA512.get t
end
type 'a digests = {
md5 : Mirage_crypto.Hash.MD5.t;
sha256 : Mirage_crypto.Hash.SHA256.t;
sha512 : Mirage_crypto.Hash.SHA512.t;
csum : 'a Running_hash.t;
}
let empty_digests h =
let open Mirage_crypto.Hash in
{
md5 = MD5.empty;
sha256 = SHA256.empty;
sha512 = SHA512.empty;
csum = Running_hash.empty h;
}
let update_digests { md5; sha256; sha512; csum } data =
let open Mirage_crypto.Hash in
let data = Cstruct.of_string data in
{
md5 = MD5.feed md5 data;
sha256 = SHA256.feed sha256 data;
sha512 = SHA512.feed sha512 data;
csum = Running_hash.feed csum data;
}
let init_write csums =
let hash, csum = HM.max_binding csums in
(hash, csum), Ok (empty_digests hash, Optint.Int63.zero, "")
let content_length_of_string s =
match Int64.of_string s with
| len when len >= 0L -> `Fixed len
| _ | exception _ -> `Bad_response
let body_length headers =
match H2.Headers.get_multi headers "content-length" with
| [] -> `Unknown
| [ x ] -> content_length_of_string x
| hd :: tl ->
(* if there are multiple content-length headers we require them all to be
* exactly equal. *)
if List.for_all (String.equal hd) tl then
content_length_of_string hd
else
`Bad_response
let body_length (response : Http_mirage_client.response) =
if response.status <> `OK then
`Bad_response
else
body_length response.headers
let pending_key (hash, csum) =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
| _ ->
Mirage_kv.Key.(pending / hash_to_string hash / hex_to_string csum)
let to_delete_key (hash, csum) =
let rand = "random" in (* FIXME: generate random string *)
let encoded_csum =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum
| _ ->
hex_to_string csum
in
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
let write_partial t (hash, csum) : _ -> (_ * Optint.Int63.t * string, _) result -> string -> _ result Lwt.t =
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
let key = pending_key (hash, csum) in
let ( >>>= ) = Lwt_result.bind in
fun response r data ->
Lwt.return r >>>= fun (digests, offset, body) ->
let len = String.length data in
match body_length response with
| `Bad_response -> Lwt.return (Error `Bad_response)
| `Fixed size ->
begin if Optint.Int63.equal offset Optint.Int63.zero then
KV.allocate t.dev key (Optint.Int63.of_int64 size)
|> Lwt_result.map_error (fun e -> `Write_error e)
else
Lwt.return (Ok ())
end >>>= fun () ->
KV.set_partial t.dev key ~offset data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let digests = update_digests digests data in
Lwt.return_ok (digests, Optint.Int63.(add offset (of_int len)), body)
| `Unknown ->
let digests = update_digests digests data in
Lwt.return_ok (digests, Optint.Int63.(add offset (of_int len)), body ^ data)
let digests_to_hm digests =
HM.empty
|> HM.add `MD5
(Cstruct.to_string (Mirage_crypto.Hash.MD5.get digests.md5))
|> HM.add `SHA256
(Cstruct.to_string (Mirage_crypto.Hash.SHA256.get digests.sha256))
|> HM.add `SHA512
(Cstruct.to_string (Mirage_crypto.Hash.SHA512.get digests.sha512))
let check_csums_digests csums digests =
let csums' = digests_to_hm digests in
let common_bindings = List.filter (fun (h, _) -> HM.mem h csums) (HM.bindings csums') in
List.length common_bindings > 0 &&
List.for_all
(fun (h, csum) -> String.equal csum (HM.find h csums))
common_bindings
let finalize_write t (hash, csum) csums digests =
let source = pending_key (hash, csum) in
if check_csums_digests csums digests then
let sha256 = to_hex (Mirage_crypto.Hash.SHA256.get digests.sha256)
and md5 = to_hex (Mirage_crypto.Hash.MD5.get digests.md5)
and sha512 = to_hex (Mirage_crypto.Hash.SHA512.get digests.sha512) in
let dest = Mirage_kv.Key.v sha256 in
KV.rename t.dev ~source ~dest >|= function
| Ok () ->
t.md5s <- SM.add md5 sha256 t.md5s;
t.sha512s <- SM.add sha512 sha256 t.sha512s;
Ok ()
| Error e -> Error (`Write_error e)
else
let dest = to_delete_key (hash, csum) in
(* if the checksums mismatch we need to mark the file for deletion *)
KV.rename t.dev ~source ~dest >|= function
| Ok () -> Ok ()
| Error e ->
Logs.warn (fun m -> m "Error renaming file %a -> %a: %a"
Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e);
Error (`Bad_checksum (hash, csum))
(* on disk, we use a flat file system where the filename is the sha256 of the data *) (* on disk, we use a flat file system where the filename is the sha256 of the data *)
let init ~verify_sha256 dev dev_md5s dev_sha512s = let init ~verify_sha256 dev dev_md5s dev_sha512s =
KV.list dev Mirage_kv.Key.empty >>= function KV.list dev Mirage_kv.Key.empty >>= function
@ -300,6 +498,12 @@ module Make
let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s)) let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
let idx = ref 1 in let idx = ref 1 in
(* XXX: should we do something about pending downloads?? *)
let entries =
List.filter (fun (p, _) ->
not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete))
entries
in
Lwt_list.iter_s (fun (path, typ) -> Lwt_list.iter_s (fun (path, typ) ->
if !idx mod 10 = 0 then Gc.full_major () ; if !idx mod 10 = 0 then Gc.full_major () ;
match typ with match typ with
@ -364,25 +568,13 @@ module Make
update_caches t >|= fun () -> update_caches t >|= fun () ->
t t
let write t ~url data hm = let write t ~url data hm digests =
let cs = Cstruct.of_string data in let cs = Cstruct.of_string data in
let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex
and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex
in in
if if check_csums_digests hm digests
HM.for_all (fun h v ->
let v' =
match h with `MD5 -> md5 | `SHA256 -> sha256 | `SHA512 -> sha512 | _ -> assert false
in
let v = hex_to_string v in
if String.equal v v' then
true
else begin
Logs.err (fun m -> m "%s hash mismatch %s: expected %s, got %s" url
(hash_to_string h) v v');
false
end) hm
then begin then begin
KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function
| Ok () -> | Ok () ->
@ -393,8 +585,10 @@ module Make
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while writing %s (key %s)" Logs.err (fun m -> m "error %a while writing %s (key %s)"
KV.pp_write_error e url sha256) KV.pp_write_error e url sha256)
end else end else begin
Logs.err (fun m -> m "Bad checksum for %s" url);
Lwt.return_unit Lwt.return_unit
end
let exists t h v = let exists t h v =
match find_key t h v with match find_key t h v with
@ -706,12 +900,14 @@ stamp: %S
let bad_archives = SSet.of_list Bad.archives let bad_archives = SSet.of_list Bad.archives
let download_archives disk http_client store = let download_archives disk http_client store =
(* FIXME: handle resuming partial downloads *)
Git.find_urls store >>= fun urls -> Git.find_urls store >>= fun urls ->
let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in
let pool = Lwt_pool.create (Key_gen.parallel_downloads ()) (Fun.const Lwt.return_unit) in let pool = Lwt_pool.create (Key_gen.parallel_downloads ()) (Fun.const Lwt.return_unit) in
let idx = ref 0 in let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) -> Lwt_list.iter_p (fun (url, csums) ->
Lwt_pool.use pool @@ fun () -> Lwt_pool.use pool @@ fun () ->
(* FIXME: check pending and to-delete *)
HM.fold (fun h v r -> HM.fold (fun h v r ->
r >>= function r >>= function
| true -> Disk.exists disk h (hex_to_key v) | true -> Disk.exists disk h (hex_to_key v)
@ -724,16 +920,29 @@ stamp: %S
incr idx; incr idx;
if !idx mod 10 = 0 then Gc.full_major () ; if !idx mod 10 = 0 then Gc.full_major () ;
Logs.info (fun m -> m "downloading %s" url); Logs.info (fun m -> m "downloading %s" url);
let body _response acc data = Lwt.return (acc ^ data) in let quux, body_init = Disk.init_write csums in
Http_mirage_client.request http_client url body "" >>= function Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
| Ok (resp, body) -> | Ok (resp, r) ->
if resp.status = `OK then begin begin match Disk.body_length resp, r with
Logs.info (fun m -> m "downloaded %s" url); | `Bad_response, _ | _, Error `Bad_response ->
Disk.write disk ~url body csums
end else begin
Logs.warn (fun m -> m "%s: %a (reason %s)" Logs.warn (fun m -> m "%s: %a (reason %s)"
url H2.Status.pp_hum resp.status resp.reason); url H2.Status.pp_hum resp.status resp.reason);
Lwt.return_unit Lwt.return_unit
| _, Error `Write_error e ->
Logs.warn (fun m -> m "%s: write error %a"
url KV.pp_write_error e);
Lwt.return_unit
| `Unknown, Ok (digests, _, body) ->
Logs.info (fun m -> m "downloaded %s, now writing..." url);
Disk.write disk ~url body csums digests
| `Fixed _size, Ok (digests, _, _) ->
Logs.info (fun m -> m "downloaded %s" url);
Disk.finalize_write disk quux csums digests >|= function
| Ok () -> ()
| Error (`Write_error e) ->
Logs.warn (fun m -> m "Error writing %s: %a" url KV.pp_write_error e)
| Error `Bad_checksum (hash, csum) ->
Logs.err (fun m -> m "%s hash mismatch, expected %s:%s" url (hash_to_string hash) (hex_to_string csum))
end end
| _ -> Lwt.return_unit) | _ -> Lwt.return_unit)
(SM.bindings urls) >>= fun () -> (SM.bindings urls) >>= fun () ->