From 8e326ecbc573b517a1dfcad3180f4085cfe23ad2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Reynir=20Bj=C3=B6rnsson?=
Date: Wed, 25 Jan 2023 11:34:31 +0100
Subject: [PATCH] WIP partial writes

If possible, downloads are streamed to disk in the /pending/ directory
in the tar filesystem. If the download is successful and the checksums
match the file is renamed to its sha256 hash. Otherwise, it is moved
under /to-delete/ so it can be deleted by an operator.

Before downloading we check if it has been downloaded before, but we
need to check as well if it is being downloaded (in /pending/) or if we
unsuccessfully downloaded it before (e.g. failed checksum, stored in
/to-delete/).

It is not very elegant code, and it could do with a thorough review or
rewrite.
---
 mirage/unikernel.ml | 295 +++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 269 insertions(+), 26 deletions(-)

diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml
index f503b62..9d8daf5 100644
--- a/mirage/unikernel.ml
+++ b/mirage/unikernel.ml
@@ -20,7 +20,24 @@ module Make
 
   module HM = Map.Make(struct
       type t = Mirage_crypto.Hash.hash
-      let compare = compare (* TODO remove polymorphic compare *)
+      let compare h h' =
+        match h, h' with
+        | `SHA512, `SHA512 -> 0
+        | `SHA512, _ -> 1
+        | _, `SHA512 -> -1
+        | `SHA384, `SHA384 -> 0
+        | `SHA384, _ -> 1
+        | _, `SHA384 -> -1
+        | `SHA256, `SHA256 -> 0
+        | `SHA256, _ -> 1
+        | _, `SHA256 -> -1
+        | `SHA224, `SHA224 -> 0
+        | `SHA224, _ -> 1
+        | _, `SHA224 -> -1
+        | `SHA1, `SHA1 -> 0
+        | `SHA1, `MD5 -> 1
+        | `MD5, `MD5 -> 0
+        | `MD5, _ -> -1
     end)
 
   let hash_to_string = function
@@ -209,6 +226,10 @@ module Make
       dev_sha512s : Cache.t ;
     }
 
+    let pending = Mirage_kv.Key.v "pending"
+
+    let to_delete = Mirage_kv.Key.v "to-delete"
+
     let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
 
     let to_hex d =
@@ -238,6 +259,7 @@ module Make
       Lwt.return_unit
 
     let find_key t h key =
+      assert (List.length (Mirage_kv.Key.segments key) = 1);
       match
         match h with
         | `MD5 ->
@@ -277,6 +299,216 @@ module Make
       in
       read_more a Optint.Int63.zero
 
+    (* An incremental hash state tagged with its algorithm, so that a digest
+       can be fed chunk by chunk and extracted with [get] at the end. *)
+    module Running_hash = struct
+      type _ t =
+        | MD5 : Mirage_crypto.Hash.MD5.t -> [> `MD5 ] t
+        | SHA1 : Mirage_crypto.Hash.SHA1.t -> [> `SHA1 ] t
+        | SHA224 : Mirage_crypto.Hash.SHA224.t -> [> `SHA224 ] t
+        | SHA256 : Mirage_crypto.Hash.SHA256.t -> [> `SHA256 ] t
+        | SHA384 : Mirage_crypto.Hash.SHA384.t -> [> `SHA384 ] t
+        | SHA512 : Mirage_crypto.Hash.SHA512.t -> [> `SHA512 ] t
+
+      let empty : _ -> _ t = function
+        | `MD5 -> MD5 Mirage_crypto.Hash.MD5.empty
+        | `SHA1 -> SHA1 Mirage_crypto.Hash.SHA1.empty
+        | `SHA224 -> SHA224 Mirage_crypto.Hash.SHA224.empty
+        | `SHA256 -> SHA256 Mirage_crypto.Hash.SHA256.empty
+        | `SHA384 -> SHA384 Mirage_crypto.Hash.SHA384.empty
+        | `SHA512 -> SHA512 Mirage_crypto.Hash.SHA512.empty
+
+      let feed t data =
+        let open Mirage_crypto.Hash in
+        match t with
+        | MD5 t -> MD5 (MD5.feed t data)
+        | SHA1 t -> SHA1 (SHA1.feed t data)
+        | SHA224 t -> SHA224 (SHA224.feed t data)
+        | SHA256 t -> SHA256 (SHA256.feed t data)
+        | SHA384 t -> SHA384 (SHA384.feed t data)
+        | SHA512 t -> SHA512 (SHA512.feed t data)
+
+      let get t =
+        let open Mirage_crypto.Hash in
+        match t with
+        | MD5 t -> MD5.get t
+        | SHA1 t -> SHA1.get t
+        | SHA224 t -> SHA224.get t
+        | SHA256 t -> SHA256.get t
+        | SHA384 t -> SHA384.get t
+        | SHA512 t -> SHA512.get t
+    end
+
+    (* Running digests of a download.  MD5, SHA256 and SHA512 are always
+       computed; [csum] additionally tracks the algorithm given to
+       [empty_digests]. *)
+    type 'a digests = {
+      md5 : Mirage_crypto.Hash.MD5.t;
+      sha256 : Mirage_crypto.Hash.SHA256.t;
+      sha512 : Mirage_crypto.Hash.SHA512.t;
+      csum : 'a Running_hash.t;
+    }
+
+    let empty_digests h =
+      let open Mirage_crypto.Hash in
+      {
+        md5 = MD5.empty;
+        sha256 = SHA256.empty;
+        sha512 = SHA512.empty;
+        csum = Running_hash.empty h;
+      }
+
+    let update_digests { md5; sha256; sha512; csum } data =
+      let open Mirage_crypto.Hash in
+      let data = Cstruct.of_string data in
+      {
+        md5 = MD5.feed md5 data;
+        sha256 = SHA256.feed sha256 data;
+        sha512 = SHA512.feed sha512 data;
+        csum = Running_hash.feed csum data;
+      }
+
+    (* Choose the checksum that names the pending file: the greatest binding
+       of [csums], where [HM] orders `MD5 lowest and `SHA512 highest.  Also
+       return the initial accumulator for [write_partial]: fresh digests,
+       offset zero and an empty in-memory body. *)
+    let init_write csums =
+      let hash, csum = HM.max_binding csums in
+      (hash, csum), Ok (empty_digests hash, Optint.Int63.zero, "")
+
+    (* Parse a Content-Length value; anything that is not a non-negative
+       integer is reported as [`Bad_response]. *)
+    let content_length_of_string s =
+      match Int64.of_string s with
+      | len when len >= 0L -> `Fixed len
+      | _ | exception _ -> `Bad_response
+
+    (* Body length announced by [headers]: [`Unknown] when there is no
+       content-length header. *)
+    let body_length headers =
+      match H2.Headers.get_multi headers "content-length" with
+      | [] -> `Unknown
+      | [ x ] -> content_length_of_string x
+      | hd :: tl ->
+        (* if there are multiple content-length headers we require them all to be
+         * exactly equal. *)
+        if List.for_all (String.equal hd) tl then
+          content_length_of_string hd
+        else
+          `Bad_response
+
+    (* Same, for a whole response: any status other than [`OK] is a
+       [`Bad_response].  Shadows the header-level [body_length]. *)
+    let body_length (response : Http_mirage_client.response) =
+      if response.status <> `OK then
+        `Bad_response
+      else
+        body_length response.headers
+
+    (* Key under /pending/ that the download identified by [(hash, csum)] is
+       streamed to. *)
+    let pending_key (hash, csum) =
+      match hash with
+      | `SHA512 ->
+        (* We can't use hex because the filename would become too long for tar *)
+        Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
+      | _ ->
+        Mirage_kv.Key.(pending / hash_to_string hash / hex_to_string csum)
+
+    (* Key under /to-delete/ for a download that failed verification.  The
+       suffix is a constant for now (see FIXME), so repeated failures for the
+       same checksum map to the same key. *)
+    let to_delete_key (hash, csum) =
+      let rand = "random" in (* FIXME: generate random string *)
+      let encoded_csum =
+        match hash with
+        | `SHA512 ->
+          (* We can't use hex because the filename would become too long for tar *)
+          Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum
+        | _ ->
+          hex_to_string csum
+      in
+      Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
+
+    (* Body callback for [Http_mirage_client.request].  With a known content
+       length the file is allocated while [offset] is still zero and every
+       chunk is written at [offset]; with an unknown length the chunks are
+       accumulated in memory instead.  Digests and offset are updated in both
+       cases, and an earlier error is passed through unchanged. *)
+    let write_partial t (hash, csum) : _ -> (_ * Optint.Int63.t * string, _) result -> string -> _ result Lwt.t =
+      (* XXX: we may be in trouble if different hash functions are used for the same archive *)
+      let key = pending_key (hash, csum) in
+      let ( >>>= ) = Lwt_result.bind in
+      fun response r data ->
+        Lwt.return r >>>= fun (digests, offset, body) ->
+        let len = String.length data in
+        match body_length response with
+        | `Bad_response -> Lwt.return (Error `Bad_response)
+        | `Fixed size ->
+          begin if Optint.Int63.equal offset Optint.Int63.zero then
+              KV.allocate t.dev key (Optint.Int63.of_int64 size)
+              |> Lwt_result.map_error (fun e -> `Write_error e)
+            else
+              Lwt.return (Ok ())
+          end >>>= fun () ->
+          KV.set_partial t.dev key ~offset data
+          |> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
+          let digests = update_digests digests data in
+          Lwt.return_ok (digests, Optint.Int63.(add offset (of_int len)), body)
+        | `Unknown ->
+          let digests = update_digests digests data in
+          Lwt.return_ok (digests, Optint.Int63.(add offset (of_int len)), body ^ data)
+
+    (* The MD5, SHA256 and SHA512 digests as raw strings, keyed by algorithm.
+       NOTE: [digests.csum] is not part of the result. *)
+    let digests_to_hm digests =
+      HM.empty
+      |> HM.add `MD5
+        (Cstruct.to_string (Mirage_crypto.Hash.MD5.get digests.md5))
+      |> HM.add `SHA256
+        (Cstruct.to_string (Mirage_crypto.Hash.SHA256.get digests.sha256))
+      |> HM.add `SHA512
+        (Cstruct.to_string (Mirage_crypto.Hash.SHA512.get digests.sha512))
+
+    (* True iff [csums] shares at least one algorithm with
+       [digests_to_hm digests] and every shared checksum is equal. *)
+    let check_csums_digests csums digests =
+      let csums' = digests_to_hm digests in
+      let common_bindings = List.filter (fun (h, _) -> HM.mem h csums) (HM.bindings csums') in
+      List.length common_bindings > 0 &&
+      List.for_all
+        (fun (h, csum) -> String.equal csum (HM.find h csums))
+        common_bindings
+
+    (* Move a finished download out of /pending/.  When the digests match
+       [csums] the file is renamed to its SHA256 (via [to_hex]) and the md5
+       and sha512 indexes are updated.  Otherwise it is moved under
+       /to-delete/ and [`Bad_checksum] is returned whether or not that rename
+       succeeded: a mismatching download must never be reported as [Ok]. *)
+    let finalize_write t (hash, csum) csums digests =
+      let source = pending_key (hash, csum) in
+      if check_csums_digests csums digests then
+        let sha256 = to_hex (Mirage_crypto.Hash.SHA256.get digests.sha256)
+        and md5 = to_hex (Mirage_crypto.Hash.MD5.get digests.md5)
+        and sha512 = to_hex (Mirage_crypto.Hash.SHA512.get digests.sha512) in
+        let dest = Mirage_kv.Key.v sha256 in
+        KV.rename t.dev ~source ~dest >|= function
+        | Ok () ->
+          t.md5s <- SM.add md5 sha256 t.md5s;
+          t.sha512s <- SM.add sha512 sha256 t.sha512s;
+          Ok ()
+        | Error e -> Error (`Write_error e)
+      else
+        let dest = to_delete_key (hash, csum) in
+        (* if the checksums mismatch we need to mark the file for deletion *)
+        KV.rename t.dev ~source ~dest >|= function
+        | Ok () -> Error (`Bad_checksum (hash, csum))
+        | Error e ->
+          Logs.warn (fun m -> m "Error renaming file %a -> %a: %a"
+                        Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e);
+          Error (`Bad_checksum (hash, csum))
+
+
     (* on disk, we use a flat file system where the filename is the sha256 of the data *)
     let init ~verify_sha256 dev dev_md5s dev_sha512s =
       KV.list dev Mirage_kv.Key.empty >>= function
@@ -300,6 +532,12 @@ module Make
         let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
         and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
         let idx = ref 1 in
+        (* XXX: should we do something about pending downloads?? *)
+        let entries =
+          List.filter (fun (p, _) ->
+              not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete))
+            entries
+        in
         Lwt_list.iter_s (fun (path, typ) ->
             if !idx mod 10 = 0 then Gc.full_major () ;
             match typ with
@@ -364,25 +602,13 @@ module Make
       update_caches t >|= fun () ->
       t
 
-    let write t ~url data hm =
+    let write t ~url data hm digests =
       let cs = Cstruct.of_string data in
       let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex
       and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex
       and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex
       in
-      if
-        HM.for_all (fun h v ->
-            let v' =
-              match h with `MD5 -> md5 | `SHA256 -> sha256 | `SHA512 -> sha512 | _ -> assert false
-            in
-            let v = hex_to_string v in
-            if String.equal v v' then
-              true
-            else begin
-              Logs.err (fun m -> m "%s hash mismatch %s: expected %s, got %s" url
-                           (hash_to_string h) v v');
-              false
-            end) hm
+      if check_csums_digests hm digests
       then begin
         KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function
         | Ok () ->
@@ -393,8 +619,10 @@ module Make
         | Error e ->
           Logs.err (fun m -> m "error %a while writing %s (key %s)"
                        KV.pp_write_error e url sha256)
-      end else
+      end else begin
+        Logs.err (fun m -> m "Bad checksum for %s" url);
         Lwt.return_unit
+      end
 
     let exists t h v =
       match find_key t h v with
@@ -706,12 +934,14 @@ stamp: %S
   let bad_archives = SSet.of_list Bad.archives
 
   let download_archives disk http_client store =
+    (* FIXME: handle resuming partial downloads *)
     Git.find_urls store >>= fun urls ->
     let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in
     let pool = Lwt_pool.create (Key_gen.parallel_downloads ()) (Fun.const Lwt.return_unit) in
     let idx = ref 0 in
     Lwt_list.iter_p (fun (url, csums) ->
         Lwt_pool.use pool @@ fun () ->
+        (* FIXME: check pending and to-delete *)
         HM.fold (fun h v r ->
             r >>= function
             | true -> Disk.exists disk h (hex_to_key v)
@@ -724,16 +954,29 @@ stamp: %S
           incr idx;
           if !idx mod 10 = 0 then Gc.full_major () ;
           Logs.info (fun m -> m "downloading %s" url);
-          let body _response acc data = Lwt.return (acc ^ data) in
-          Http_mirage_client.request http_client url body "" >>= function
-          | Ok (resp, body) ->
-            if resp.status = `OK then begin
-              Logs.info (fun m -> m "downloaded %s" url);
-              Disk.write disk ~url body csums
-            end else begin
-              Logs.warn (fun m -> m "%s: %a (reason %s)"
-                            url H2.Status.pp_hum resp.status resp.reason);
-              Lwt.return_unit
+          let hash_csum, body_init = Disk.init_write csums in
+          Http_mirage_client.request http_client url (Disk.write_partial disk hash_csum) body_init >>= function
+          | Ok (resp, r) ->
+            begin match Disk.body_length resp, r with
+              | `Bad_response, _ | _, Error `Bad_response ->
+                Logs.warn (fun m -> m "%s: %a (reason %s)"
+                              url H2.Status.pp_hum resp.status resp.reason);
+                Lwt.return_unit
+              | _, Error `Write_error e ->
+                Logs.warn (fun m -> m "%s: write error %a"
+                              url KV.pp_write_error e);
+                Lwt.return_unit
+              | `Unknown, Ok (digests, _, body) ->
+                Logs.info (fun m -> m "downloaded %s, now writing..." url);
+                Disk.write disk ~url body csums digests
+              | `Fixed _size, Ok (digests, _, _) ->
+                Logs.info (fun m -> m "downloaded %s" url);
+                Disk.finalize_write disk hash_csum csums digests >|= function
+                | Ok () -> ()
+                | Error (`Write_error e) ->
+                  Logs.warn (fun m -> m "Error writing %s: %a" url KV.pp_write_error e)
+                | Error `Bad_checksum (hash, csum) ->
+                  Logs.err (fun m -> m "%s hash mismatch, expected %s:%s" url (hash_to_string hash) (hex_to_string csum))
             end
           | _ -> Lwt.return_unit)
       (SM.bindings urls) >>= fun () ->