always use swap, remove the pending / to_delete stuff

This commit is contained in:
Hannes Mehnert 2024-11-04 16:13:36 +01:00
parent 456340562d
commit e59f02a16f
2 changed files with 23 additions and 182 deletions

View file

@ -52,7 +52,7 @@ let update_digests { md5; sha256; sha512 } data =
let init_write csums =
let hash, csum = HM.max_binding csums in
(hash, csum), Ok (empty_digests, `Init)
(hash, csum), empty_digests
let digests_to_hm digests =
HM.empty

View file

@ -198,10 +198,6 @@ module Make
dev_swap : Swap.t ;
}
let pending = Mirage_kv.Key.v "pending"
let to_delete = Mirage_kv.Key.v "to-delete"
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
let marshal_sm (sm : string SM.t) =
@ -267,118 +263,28 @@ module Make
in
read_more a Optint.Int63.zero
(*
module HM_running = struct
let empty h =
let module H = (val Mirage_crypto.Hash.module_of h) in
(* We need MD5, SHA256 and SHA512. [h] is likely one of the
aforementioned and in that case we don't compute the same hash twice
*)
HM.empty
|> HM.add `MD5 Mirage_crypto.Hash.MD5.empty
|> HM.add `SHA256 Mirage_crypto.Hash.SHA256.empty
|> HM.add `SHA512 Mirage_crypto.Hash.SHA512.empty
|> HM.add h H.empty
let feed t data =
HM.map (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.feed v data)
t
let get =
HM.map (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.get v)
end
*)
let content_length_of_string s =
match Int64.of_string s with
| len when len >= 0L -> `Fixed len
| _ | exception _ -> `Bad_response
let body_length headers =
match H2.Headers.get_multi headers "content-length" with
| [] -> `Unknown
| [ x ] -> content_length_of_string x
| hd :: tl ->
(* if there are multiple content-length headers we require them all to be
* exactly equal. *)
if List.for_all (String.equal hd) tl then
content_length_of_string hd
else
`Bad_response
let body_length (response : Http_mirage_client.response) =
if response.status <> `OK then
`Bad_response
else
body_length response.headers
let pending_key (hash, csum) =
match hash with
let key_of_hash_csum (hash, csum) = match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
Mirage_kv.Key.(v (hash_to_string hash) / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
| _ ->
Mirage_kv.Key.(pending / hash_to_string hash / Ohex.encode csum)
Mirage_kv.Key.(v (hash_to_string hash) / Ohex.encode csum)
let to_delete_key (hash, csum) =
let rand = "random" in (* FIXME: generate random string *)
let encoded_csum =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum
| _ ->
Ohex.encode csum
in
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
let init_write t csums =
let quux, csums = Archive_checksum.init_write csums in
let swap = Swap.empty t.dev_swap in
quux, Ok (csums, swap)
let write_partial t (hash, csum) url =
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
let key = pending_key (hash, csum) in
let ( >>>= ) = Lwt_result.bind in
fun response r data ->
Lwt.return r >>>= fun (digests, acc) ->
Lwt.return r >>>= fun (digests, swap) ->
let digests = Archive_checksum.update_digests digests data in
match acc with
| `Init ->
begin match body_length response with
| `Bad_response -> Lwt.return (Error `Bad_response)
| `Fixed size ->
KV.allocate t.dev key (Optint.Int63.of_int64 size)
|> Lwt_result.map_error (fun e -> `Write_error e)
>>>= fun () ->
KV.set_partial t.dev key ~offset:Optint.Int63.zero data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.of_int len in
active_length url len (Int64.to_string size ^ " bytes");
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown ->
active_add_bytes url (String.length data);
let h = Swap.empty t.dev_swap in
Swap.append h data >|= function
| Ok () -> Ok (digests, `Unknown h)
| Error swap_err -> Error (`Swap swap_err)
end
| `Fixed_body (size, offset) ->
KV.set_partial t.dev key ~offset data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.(add offset (of_int len)) in
active_add_bytes url len;
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown h ->
active_add_bytes url (String.length data);
Swap.append h data >|= function
| Ok () -> Ok (digests, `Unknown h)
| Error swap_err -> Error (`Swap swap_err)
active_add_bytes url (String.length data);
Swap.append swap data >|= function
| Ok () -> Ok (digests, swap)
| Error swap_err -> Error (`Swap swap_err)
let check_csums_digests csums digests =
let csums' = Archive_checksum.digests_to_hm digests in
@ -413,34 +319,16 @@ module Make
| Error e ->
Lwt.return (Error (`Write_error e))
let finalize_write t (hash, csum) ~url (body : [ `Unknown of Swap.handle | `Fixed_body of int64 * Optint.Int63.t | `Init ]) csums digests =
let sizes_match, body_size_in_header =
match body with
| `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true
| `Unknown _ -> true, false
| `Init -> assert false
in
let source = pending_key (hash, csum) in
if check_csums_digests csums digests && sizes_match then
let finalize_write t (hash, csum) ~url swap csums digests =
if check_csums_digests csums digests then
let sha256 = Ohex.encode Digestif.SHA256.(to_raw_string (get digests.sha256))
and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5))
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
let dest = Mirage_kv.Key.v sha256 in
begin match body with
| `Unknown h ->
Logs.info (fun m -> m "downloaded %s, now writing" url);
Lwt_result.bind
(Lwt.finalize (fun () -> set_from_handle t.dev source h)
(fun () -> Swap.free h))
(fun () ->
KV.rename t.dev ~source ~dest
|> Lwt_result.map_error (fun e -> `Write_error e))
| `Fixed_body (_reported_size, _actual_size) ->
Logs.info (fun m -> m "downloaded %s" url);
KV.rename t.dev ~source ~dest
|> Lwt_result.map_error (fun e -> `Write_error e)
| `Init -> assert false
end >|= function
Logs.info (fun m -> m "downloaded %s, now writing" url);
(Lwt.finalize (fun () -> set_from_handle t.dev dest swap)
(fun () -> Swap.free swap))
>|= function
| Ok () ->
remove_active url;
t.md5s <- SM.add md5 sha256 t.md5s;
@ -453,45 +341,7 @@ module Make
Logs.err (fun m -> m "Write failure for %s: %a" url pp_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Write failure for %s: %a" url pp_error e)
else begin
(if sizes_match then begin
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum));
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum))
end else match body with
| `Fixed_body (reported, actual) ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported);
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported)
| `Unknown _ -> assert false
| `Init -> assert false);
if body_size_in_header then
(* if the checksums mismatch we want to delete the file. We are only
able to do so if it was the latest created file, so we expect and
error. Ideally, we want to match for `Append_only or other errors *)
KV.remove t.dev source >>= function
| Ok () -> Lwt.return_unit
| Error e ->
(* we failed to delete the file so we mark it for deletion *)
let dest = to_delete_key (hash, csum) in
Logs.warn (fun m -> m "Failed to remove %a: %a. Moving it to %a"
Mirage_kv.Key.pp source KV.pp_write_error e Mirage_kv.Key.pp dest);
KV.rename t.dev ~source ~dest >|= function
| Ok () -> ()
| Error e ->
Logs.warn (fun m -> m "Error renaming file %a -> %a: %a"
Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e)
else
Lwt.return_unit
end
else Lwt.return_unit
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
@ -516,12 +366,6 @@ module Make
let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
let idx = ref 1 in
(* XXX: should we do something about pending downloads?? *)
let entries =
List.filter (fun (p, _) ->
not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete))
entries
in
Lwt_list.iter_s (fun (path, typ) ->
if !idx mod 10 = 0 then Gc.full_major () ;
match typ with
@ -961,7 +805,6 @@ stamp: %S
let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) ->
Lwt_pool.use pool @@ fun () ->
(* FIXME: check pending and to-delete *)
HM.fold (fun h v r ->
r >>= function
| true -> Disk.exists disk h (hex_to_key v)
@ -974,7 +817,7 @@ stamp: %S
incr idx;
if !idx mod 10 = 0 then Gc.full_major () ;
Logs.info (fun m -> m "downloading %s" url);
let quux, body_init = Archive_checksum.init_write csums in
let quux, body_init = Disk.init_write disk csums in
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
| Ok (resp, r) ->
@ -986,17 +829,15 @@ stamp: %S
(Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason);
Lwt.return_unit
| Error `Write_error e ->
Logs.err (fun m -> m "%s: write error %a %a"
Logs.err (fun m -> m "%s: write error %a"
url
Mirage_kv.Key.pp (Disk.pending_key quux)
KV.pp_write_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "write error: %a" KV.pp_write_error e);
Lwt.return_unit
| Error `Swap e ->
Logs.err (fun m -> m "%s: swap error %a %a"
Logs.err (fun m -> m "%s: swap error %a"
url
Mirage_kv.Key.pp (Disk.pending_key quux)
Swap.pp_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "swap error: %a" Swap.pp_error e);