From e59f02a16f41ec7f58c8dad1bfef61e8bcc66bf7 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Mon, 4 Nov 2024 16:13:36 +0100 Subject: [PATCH] always use swap, remove the pending / to_delete stuff --- mirage/archive_checksum.ml | 2 +- mirage/unikernel.ml | 203 ++++--------------------------------- 2 files changed, 23 insertions(+), 182 deletions(-) diff --git a/mirage/archive_checksum.ml b/mirage/archive_checksum.ml index fbff9a8..9e308b3 100644 --- a/mirage/archive_checksum.ml +++ b/mirage/archive_checksum.ml @@ -52,7 +52,7 @@ let update_digests { md5; sha256; sha512 } data = let init_write csums = let hash, csum = HM.max_binding csums in - (hash, csum), Ok (empty_digests, `Init) + (hash, csum), empty_digests let digests_to_hm digests = HM.empty diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index e95436e..6c203fc 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -198,10 +198,6 @@ module Make dev_swap : Swap.t ; } - let pending = Mirage_kv.Key.v "pending" - - let to_delete = Mirage_kv.Key.v "to-delete" - let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap } let marshal_sm (sm : string SM.t) = @@ -267,118 +263,28 @@ module Make in read_more a Optint.Int63.zero - (* - module HM_running = struct - - let empty h = - let module H = (val Mirage_crypto.Hash.module_of h) in - (* We need MD5, SHA256 and SHA512. [h] is likely one of the - aforementioned and in that case we don't compute the same hash twice - *) - HM.empty - |> HM.add `MD5 Mirage_crypto.Hash.MD5.empty - |> HM.add `SHA256 Mirage_crypto.Hash.SHA256.empty - |> HM.add `SHA512 Mirage_crypto.Hash.SHA512.empty - |> HM.add h H.empty - - let feed t data = - HM.map (fun h v -> - let module H = (val Mirage_crypto.Hash.module_of h) in - H.feed v data) - t - - let get = - HM.map (fun h v -> - let module H = (val Mirage_crypto.Hash.module_of h) in - H.get v) - - - end - *) - - let content_length_of_string s = - match Int64.of_string s with - | len when len >= 0L -> `Fixed len - | _ | exception _ -> `Bad_response - - let body_length headers = - match H2.Headers.get_multi headers "content-length" with - | [] -> `Unknown - | [ x ] -> content_length_of_string x - | hd :: tl -> - (* if there are multiple content-length headers we require them all to be - * exactly equal. *) - if List.for_all (String.equal hd) tl then - content_length_of_string hd - else - `Bad_response - - let body_length (response : Http_mirage_client.response) = - if response.status <> `OK then - `Bad_response - else - body_length response.headers - - let pending_key (hash, csum) = - match hash with + let key_of_hash_csum (hash, csum) = match hash with | `SHA512 -> (* We can't use hex because the filename would become too long for tar *) - Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum) + Mirage_kv.Key.(v (hash_to_string hash) / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum) | _ -> - Mirage_kv.Key.(pending / hash_to_string hash / Ohex.encode csum) + Mirage_kv.Key.(v (hash_to_string hash) / Ohex.encode csum) - let to_delete_key (hash, csum) = - let rand = "random" in (* FIXME: generate random string *) - let encoded_csum = - match hash with - | `SHA512 -> - (* We can't use hex because the filename would become too long for tar *) - Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum - | _ -> - Ohex.encode csum - in - Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand)) + let init_write t csums = + let quux, csums = Archive_checksum.init_write csums in + let swap = Swap.empty t.dev_swap in + quux, Ok (csums, swap) let write_partial t (hash, csum) url = (* XXX: we may be in trouble if different hash functions are used for the same archive *) - let key = pending_key (hash, csum) in let ( >>>= ) = Lwt_result.bind in fun response r data -> - Lwt.return r >>>= fun (digests, acc) -> + Lwt.return r >>>= fun (digests, swap) -> let digests = Archive_checksum.update_digests digests data in - match acc with - | `Init -> - begin match body_length response with - | `Bad_response -> Lwt.return (Error `Bad_response) - | `Fixed size -> - KV.allocate t.dev key (Optint.Int63.of_int64 size) - |> Lwt_result.map_error (fun e -> `Write_error e) - >>>= fun () -> - KV.set_partial t.dev key ~offset:Optint.Int63.zero data - |> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () -> - let len = String.length data in - let offset = Optint.Int63.of_int len in - active_length url len (Int64.to_string size ^ " bytes"); - Lwt.return_ok (digests, `Fixed_body (size, offset)) - | `Unknown -> - active_add_bytes url (String.length data); - let h = Swap.empty t.dev_swap in - Swap.append h data >|= function - | Ok () -> Ok (digests, `Unknown h) - | Error swap_err -> Error (`Swap swap_err) - end - | `Fixed_body (size, offset) -> - KV.set_partial t.dev key ~offset data - |> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () -> - let len = String.length data in - let offset = Optint.Int63.(add offset (of_int len)) in - active_add_bytes url len; - Lwt.return_ok (digests, `Fixed_body (size, offset)) - | `Unknown h -> - active_add_bytes url (String.length data); - Swap.append h data >|= function - | Ok () -> Ok (digests, `Unknown h) - | Error swap_err -> Error (`Swap swap_err) + active_add_bytes url (String.length data); + Swap.append swap data >|= function + | Ok () -> Ok (digests, swap) + | Error swap_err -> Error (`Swap swap_err) let check_csums_digests csums digests = let csums' = Archive_checksum.digests_to_hm digests in @@ -413,34 +319,16 @@ module Make | Error e -> Lwt.return (Error (`Write_error e)) - let finalize_write t (hash, csum) ~url (body : [ `Unknown of Swap.handle | `Fixed_body of int64 * Optint.Int63.t | `Init ]) csums digests = - let sizes_match, body_size_in_header = - match body with - | `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true - | `Unknown _ -> true, false - | `Init -> assert false - in - let source = pending_key (hash, csum) in - if check_csums_digests csums digests && sizes_match then + let finalize_write t (hash, csum) ~url swap csums digests = + if check_csums_digests csums digests then let sha256 = Ohex.encode Digestif.SHA256.(to_raw_string (get digests.sha256)) and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5)) and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in let dest = Mirage_kv.Key.v sha256 in - begin match body with - | `Unknown h -> - Logs.info (fun m -> m "downloaded %s, now writing" url); - Lwt_result.bind - (Lwt.finalize (fun () -> set_from_handle t.dev source h) - (fun () -> Swap.free h)) - (fun () -> - KV.rename t.dev ~source ~dest - |> Lwt_result.map_error (fun e -> `Write_error e)) - | `Fixed_body (_reported_size, _actual_size) -> - Logs.info (fun m -> m "downloaded %s" url); - KV.rename t.dev ~source ~dest - |> Lwt_result.map_error (fun e -> `Write_error e) - | `Init -> assert false - end >|= function + Logs.info (fun m -> m "downloaded %s, now writing" url); + (Lwt.finalize (fun () -> set_from_handle t.dev dest swap) + (fun () -> Swap.free swap)) + >|= function | Ok () -> remove_active url; t.md5s <- SM.add md5 sha256 t.md5s; @@ -453,45 +341,7 @@ module Make Logs.err (fun m -> m "Write failure for %s: %a" url pp_error e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (Fmt.str "Write failure for %s: %a" url pp_error e) - else begin - (if sizes_match then begin - add_failed url (Ptime.v (Pclock.now_d_ps ())) - (Fmt.str "Bad checksum %s:%s: computed %s expected %s" url - (hash_to_string hash) - (Ohex.encode (Archive_checksum.get digests hash)) - (Ohex.encode csum)); - Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url - (hash_to_string hash) - (Ohex.encode (Archive_checksum.get digests hash)) - (Ohex.encode csum)) - end else match body with - | `Fixed_body (reported, actual) -> - add_failed url (Ptime.v (Pclock.now_d_ps ())) - (Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes" - url Optint.Int63.pp actual reported); - Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes" - url Optint.Int63.pp actual reported) - | `Unknown _ -> assert false - | `Init -> assert false); - if body_size_in_header then - (* if the checksums mismatch we want to delete the file. We are only - able to do so if it was the latest created file, so we expect and - error. Ideally, we want to match for `Append_only or other errors *) - KV.remove t.dev source >>= function - | Ok () -> Lwt.return_unit - | Error e -> - (* we failed to delete the file so we mark it for deletion *) - let dest = to_delete_key (hash, csum) in - Logs.warn (fun m -> m "Failed to remove %a: %a. Moving it to %a" - Mirage_kv.Key.pp source KV.pp_write_error e Mirage_kv.Key.pp dest); - KV.rename t.dev ~source ~dest >|= function - | Ok () -> () - | Error e -> - Logs.warn (fun m -> m "Error renaming file %a -> %a: %a" - Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e) - else - Lwt.return_unit - end + else Lwt.return_unit (* on disk, we use a flat file system where the filename is the sha256 of the data *) let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap = @@ -516,12 +366,6 @@ module Make let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s)) and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in let idx = ref 1 in - (* XXX: should we do something about pending downloads?? *) - let entries = - List.filter (fun (p, _) -> - not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete)) - entries - in Lwt_list.iter_s (fun (path, typ) -> if !idx mod 10 = 0 then Gc.full_major () ; match typ with @@ -961,7 +805,6 @@ stamp: %S let idx = ref 0 in Lwt_list.iter_p (fun (url, csums) -> Lwt_pool.use pool @@ fun () -> - (* FIXME: check pending and to-delete *) HM.fold (fun h v r -> r >>= function | true -> Disk.exists disk h (hex_to_key v) @@ -974,7 +817,7 @@ stamp: %S incr idx; if !idx mod 10 = 0 then Gc.full_major () ; Logs.info (fun m -> m "downloading %s" url); - let quux, body_init = Archive_checksum.init_write csums in + let quux, body_init = Disk.init_write disk csums in add_to_active url (Ptime.v (Pclock.now_d_ps ())); Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function | Ok (resp, r) -> @@ -986,17 +829,15 @@ stamp: %S (Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason); Lwt.return_unit | Error `Write_error e -> - Logs.err (fun m -> m "%s: write error %a %a" + Logs.err (fun m -> m "%s: write error %a" url - Mirage_kv.Key.pp (Disk.pending_key quux) KV.pp_write_error e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (Fmt.str "write error: %a" KV.pp_write_error e); Lwt.return_unit | Error `Swap e -> - Logs.err (fun m -> m "%s: swap error %a %a" + Logs.err (fun m -> m "%s: swap error %a" url - Mirage_kv.Key.pp (Disk.pending_key quux) Swap.pp_error e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (Fmt.str "swap error: %a" Swap.pp_error e);