8ba4cfae00 | |||
9c50538877 | |||
a47193f147 | |||
1e35bfefbd | |||
a9b8f18192 | |||
2312092e42 | |||
4bec3bfbd8 | |||
f48cc19fc4 | |||
7689397ac3 | |||
e59f02a16f |
4 changed files with 270 additions and 540 deletions
@ -52,7 +52,7 @@ let update_digests { md5; sha256; sha512 } data =
let init_write csums =
let hash, csum = HM.max_binding csums in
(hash, csum), Ok (empty_digests, `Init)
(hash, csum), empty_digests
let digests_to_hm digests =
@ -13,17 +13,22 @@ let hex_of_string s =
let decode_digest filename str =
let hex h s =
match hex_of_string s with
| Ok d -> Some (h, d)
| Error `Msg msg ->
Log.warn (fun m -> m "%s invalid hex (%s) %s" filename msg s); None
| Ok d -> Ok (h, d)
| Error `Msg msg as e ->
Log.warn (fun m -> m "%s invalid hex (%s) %s" filename msg s);
match String.split_on_char '=' str with
| [ data ] -> hex `MD5 data
| [ "md5" ; data ] -> hex `MD5 data
| [ "sha256" ; data ] -> hex `SHA256 data
| [ "sha512" ; data ] -> hex `SHA512 data
| [ hash ; _ ] -> Log.warn (fun m -> m "%s unknown hash %s" filename hash); None
| _ -> Log.warn (fun m -> m "%s unexpected hash format %S" filename str); None
| [ hash ; _ ] ->
Log.warn (fun m -> m "%s unknown hash %s" filename hash);
Error (`Msg ("unknown hash " ^ hash))
| _ ->
Log.warn (fun m -> m "%s unexpected hash format %S" filename str);
Error (`Msg ("unexpected hash format " ^ str))
let extract_url_checksum filename items =
let open OpamParserTypes.FullPos in
@ -42,64 +47,73 @@ let extract_url_checksum filename items =
let url =
match url, archive with
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Some url
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Some url
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
| _ ->
Log.warn (fun m -> m "%s neither src nor archive present" filename); None
Log.warn (fun m -> m "%s neither src nor archive present" filename);
Error (`Msg "neither 'src' nor 'archive' present")
let csum =
let csum, csum_errs =
match checksum with
| Some { pelem = Variable (_, { pelem = List { pelem = csums ; _ } ; _ }); _ } ->
let csums =
List.fold_left (fun acc ->
let csums, errs =
List.fold_left (fun (csums, errs) ->
| { pelem = String csum ; _ } ->
begin match decode_digest filename csum with
| None -> acc
| Some (h, v) ->
| Error e -> csums, e :: errs
| Ok (h, v) ->
HM.update h (function
| None -> Some v
| Some v' when String.equal v v' -> None
| Some v' ->
Log.warn (fun m -> m "for %s, hash %s, multiple keys are present: %s %s"
(Option.value ~default:"NONE" url) (hash_to_string h) (Ohex.encode v) (Ohex.encode v'));
(Result.value ~default:"NONE" url) (hash_to_string h) (Ohex.encode v) (Ohex.encode v'));
csums, errs
| _ -> acc) HM.empty csums
| v ->
csums, `Msg (Fmt.str "bad checksum data: %s" (OpamPrinter.FullPos.value v)) :: errs)
(HM.empty, []) csums
Some csums
if HM.is_empty csums then
match errs with
| hd :: tl -> Error hd, tl
| [] -> Error (`Msg "empty checksums"), []
Ok csums, errs
| Some { pelem = Variable (_, { pelem = String csum ; _ }) ; _ } ->
begin match decode_digest filename csum with
| None -> None
| Some (h, v) -> Some (HM.singleton h v)
| Error _ as e -> e, []
| Ok (h, v) -> Ok (HM.singleton h v), []
| _ ->
Log.warn (fun m -> m "couldn't decode checksum in %s" filename);
Error (`Msg "couldn't find or decode 'checksum'"), []
match url, csum with
| Some url, Some cs -> Some (url, cs)
| _ -> None
(match url, csum with
| Ok url, Ok csum -> Ok (url, csum)
| Error _ as e, _
| _, (Error _ as e) -> e), csum_errs
let extract_checksums_and_urls filename opam =
let open OpamParserTypes.FullPos in
List.fold_left (fun acc ->
List.fold_left (fun (csum_urls, errs) ->
| { pelem = Section ({ section_kind = { pelem = "url" ; _ } ; section_items = { pelem = items ; _ } ; _ }) ; _} ->
begin match extract_url_checksum filename items with
| None -> acc
| Some url -> url :: acc
| Error `Msg msg, errs' -> csum_urls, `Msg ("url: " ^ msg) :: errs' @ errs
| Ok url, errs' -> url :: csum_urls, errs' @ errs
| { pelem = Section ({ section_kind = { pelem = "extra-source" ; _ } ; section_name = Some { pelem ; _ } ; section_items = { pelem = items ; _ }; _ }) ; _} ->
Log.debug (fun m -> m "extracting for extra-source %s in %s" filename pelem);
match extract_url_checksum filename items with
| None -> acc
| Some url -> url :: acc
| Error `Msg msg, errs' -> csum_urls, `Msg ("extra-source " ^ pelem ^ " " ^ msg) :: errs' @ errs
| Ok url, errs' -> url :: csum_urls, errs' @ errs
| _ -> acc)
[] opam.file_contents
| _ -> csum_urls, errs)
([], []) opam.file_contents
let extract_urls filename str =
(* in an opam file, there may be:
@ -121,6 +135,6 @@ let extract_urls filename str =
if unavailable then
(Log.debug (fun m -> m "%s is marked unavailable, skipping" filename);
[], [])
extract_checksums_and_urls filename opamfile
@ -101,85 +101,89 @@ module Make
hash_to_string h ^ "=" ^ Ohex.encode v ^ "\n" ^ acc)
hm ""
module Git = struct
let find_contents store =
let rec go store path acc =
Store.list store path >>= function
| Error e ->
Logs.err (fun m -> m "error %a while listing %a"
Store.pp_error e Mirage_kv.Key.pp path);
Lwt.return acc
| Ok steps ->
Lwt_list.fold_left_s (fun acc (step, _) ->
Store.exists store step >>= function
| Error e ->
Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e
Mirage_kv.Key.pp step);
Lwt.return acc
| Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step);
Lwt.return acc
| Ok Some `Value -> Lwt.return (step :: acc)
| Ok Some `Dictionary -> go store step acc) acc steps
go store Mirage_kv.Key.empty []
let parse_errors = ref SM.empty
let find_urls store =
find_contents store >>= fun paths ->
let opam_paths =
List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths
let add_parse_error filename error =
parse_errors := SM.add filename error !parse_errors
module Git = struct
let contents store =
let explore = ref [ Mirage_kv.Key.empty ] in
let more () =
let rec go () =
match !explore with
| [] -> Lwt.return None
| step :: tl ->
explore := tl;
Store.exists store step >>= function
| Error e ->
Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e
Mirage_kv.Key.pp step);
go ()
| Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step);
go ()
| Ok Some `Value -> Lwt.return (Some step)
| Ok Some `Dictionary ->
Store.list store step >>= function
| Error e ->
Logs.err (fun m -> m "error %a while listing %a"
Store.pp_error e Mirage_kv.Key.pp step);
go ()
| Ok steps ->
explore := fst steps @ !explore;
go ()
go ()
Lwt_list.fold_left_s (fun acc path ->
Store.get store path >|= function
| Ok data ->
(* TODO report parser errors *)
let url_csums = Opam_file.extract_urls (Mirage_kv.Key.to_string path) data in
List.fold_left (fun acc (url, csums) ->
if HM.cardinal csums = 0 then
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
end) acc) acc url_csums
with _ ->
Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path);
Lwt_stream.from more
let find_urls acc path data =
if Mirage_kv.Key.basename path = "opam" then
let path = Mirage_kv.Key.to_string path in
let url_csums, errs = Opam_file.extract_urls path data in
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
List.fold_left (fun acc (url, csums) ->
if HM.cardinal csums = 0 then
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url);
add_parse_error path ("no checksums for " ^ url);
| Error e -> Logs.warn (fun m -> m "Store.get: %a" Store.pp_error e); acc)
SM.empty opam_paths
SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
add_parse_error path (Fmt.str
"mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
end) acc) acc url_csums
let active_downloads = ref SM.empty
let add_to_active url ts =
active_downloads := SM.add url (ts, 0, "unknown size") !active_downloads
active_downloads := SM.add url (ts, 0) !active_downloads
let remove_active url =
active_downloads := SM.remove url !active_downloads
let active_length url written length =
match SM.find_opt url !active_downloads with
| None -> ()
| Some (ts, written', _) ->
active_downloads := SM.add url (ts, written + written', length)
let active_add_bytes url written =
match SM.find_opt url !active_downloads with
| None -> ()
| Some (ts, written', l) ->
active_downloads := SM.add url (ts, written + written', l)
| Some (ts, written') ->
active_downloads := SM.add url (ts, written + written')
let failed_downloads = ref SM.empty
@ -188,6 +192,36 @@ module Make
remove_active url;
failed_downloads := SM.add url (ts, reason) !failed_downloads
let pp_failed ppf = function
| `Write_error e ->
|||| ppf "write error: %a" KV.pp_write_error e
| `Swap e ->
|||| ppf "swap error: %a" Swap.pp_error e
| `Bad_checksum (hash, computed, expected) ->
|||| ppf "%s checksum: computed %s expected %s"
(hash_to_string hash)
(Ohex.encode computed)
(Ohex.encode expected)
| `Bad_response (status, reason) ->
|||| ppf "bad response: %a %s" H2.Status.pp_hum status reason
| `Mimic me ->
|||| ppf "mimic: %a" Mimic.pp_error me
let compare_failed a b = match a, b with
| `Write_error _, `Write_error _ -> 0
| `Write_error _, _ -> 1
| _, `Write_error _ -> -1
| `Swap _, `Swap _ -> 0
| `Swap _, _ -> 1
| _, `Swap _ -> -1
| `Bad_checksum _, `Bad_checksum _ -> 0
| `Bad_checksum _, _ -> 1
| _, `Bad_checksum _ -> -1
| `Bad_response _, `Bad_response _ -> 0
| `Bad_response _, _ -> 1
| _, `Bad_response _ -> -1
| `Mimic _, `Mimic _ -> 0
module Disk = struct
type t = {
mutable md5s : string SM.t ;
@ -198,10 +232,6 @@ module Make
dev_swap : Swap.t ;
let pending = Mirage_kv.Key.v "pending"
let to_delete = Mirage_kv.Key.v "to-delete"
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
let marshal_sm (sm : string SM.t) =
@ -267,118 +297,21 @@ module Make
read_more a
module HM_running = struct
let empty h =
let module H = (val Mirage_crypto.Hash.module_of h) in
(* We need MD5, SHA256 and SHA512. [h] is likely one of the
aforementioned and in that case we don't compute the same hash twice
|> HM.add `MD5 Mirage_crypto.Hash.MD5.empty
|> HM.add `SHA256 Mirage_crypto.Hash.SHA256.empty
|> HM.add `SHA512 Mirage_crypto.Hash.SHA512.empty
|> HM.add h H.empty
let feed t data =
|||| (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.feed v data)
let get =
|||| (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.get v)
let content_length_of_string s =
match Int64.of_string s with
| len when len >= 0L -> `Fixed len
| _ | exception _ -> `Bad_response
let body_length headers =
match H2.Headers.get_multi headers "content-length" with
| [] -> `Unknown
| [ x ] -> content_length_of_string x
| hd :: tl ->
(* if there are multiple content-length headers we require them all to be
* exactly equal. *)
if List.for_all (String.equal hd) tl then
content_length_of_string hd
let body_length (response : Http_mirage_client.response) =
if response.status <> `OK then
body_length response.headers
let pending_key (hash, csum) =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
| _ ->
Mirage_kv.Key.(pending / hash_to_string hash / Ohex.encode csum)
let to_delete_key (hash, csum) =
let rand = "random" in (* FIXME: generate random string *)
let encoded_csum =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum
| _ ->
Ohex.encode csum
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
let init_write t csums =
let quux, csums = Archive_checksum.init_write csums in
let swap = Swap.empty t.dev_swap in
quux, Ok (csums, swap)
let write_partial t (hash, csum) url =
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
let key = pending_key (hash, csum) in
let ( >>>= ) = Lwt_result.bind in
fun response r data ->
Lwt.return r >>>= fun (digests, acc) ->
Lwt.return r >>>= fun (digests, swap) ->
let digests = Archive_checksum.update_digests digests data in
match acc with
| `Init ->
begin match body_length response with
| `Bad_response -> Lwt.return (Error `Bad_response)
| `Fixed size ->
KV.allocate key (Optint.Int63.of_int64 size)
|> Lwt_result.map_error (fun e -> `Write_error e)
>>>= fun () ->
KV.set_partial key data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.of_int len in
active_length url len (Int64.to_string size ^ " bytes");
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown ->
active_add_bytes url (String.length data);
let h = Swap.empty t.dev_swap in
Swap.append h data >|= function
| Ok () -> Ok (digests, `Unknown h)
| Error swap_err -> Error (`Swap swap_err)
| `Fixed_body (size, offset) ->
KV.set_partial key ~offset data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.(add offset (of_int len)) in
active_add_bytes url len;
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown h ->
active_add_bytes url (String.length data);
Swap.append h data >|= function
| Ok () -> Ok (digests, `Unknown h)
| Error swap_err -> Error (`Swap swap_err)
active_add_bytes url (String.length data);
Swap.append swap data >|= function
| Ok () -> Ok (digests, swap)
| Error swap_err -> Error (`Swap swap_err)
let check_csums_digests csums digests =
let csums' = Archive_checksum.digests_to_hm digests in
@ -389,6 +322,12 @@ module Make
let set_from_handle dev dest h =
(* TODO: we need a function in tar which
(a) takes a path
(b) takes a function that reads (from the swap) and writes (to the tar)
(c) once the function is finished, it writes the tar header
-> this would allow us to avoid the rename stuff below
let size = Optint.Int63.of_int64 (Swap.size h) in
KV.allocate dev dest size >>= fun r ->
let rec loop offset =
@ -413,34 +352,20 @@ module Make
| Error e ->
Lwt.return (Error (`Write_error e))
let finalize_write t (hash, csum) ~url (body : [ `Unknown of Swap.handle | `Fixed_body of int64 * Optint.Int63.t | `Init ]) csums digests =
let sizes_match, body_size_in_header =
match body with
| `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true
| `Unknown _ -> true, false
| `Init -> assert false
let source = pending_key (hash, csum) in
if check_csums_digests csums digests && sizes_match then
let finalize_write t (hash, csum) ~url swap csums digests =
if check_csums_digests csums digests then
let sha256 = Ohex.encode Digestif.SHA256.(to_raw_string (get digests.sha256))
and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5))
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
let dest = Mirage_kv.Key.v sha256 in
begin match body with
| `Unknown h ->
|||| (fun m -> m "downloaded %s, now writing" url);
(Lwt.finalize (fun () -> set_from_handle source h)
(fun () -> h))
(fun () ->
KV.rename ~source ~dest
|> Lwt_result.map_error (fun e -> `Write_error e))
| `Fixed_body (_reported_size, _actual_size) ->
|||| (fun m -> m "downloaded %s" url);
KV.rename ~source ~dest
|> Lwt_result.map_error (fun e -> `Write_error e)
| `Init -> assert false
end >|= function
|||| (fun m -> m "downloaded %s, now writing" url);
let temp = Mirage_kv.Key.(v "pending" // dest) in
(Lwt.finalize (fun () -> set_from_handle temp swap)
(fun () -> swap))
(fun () -> KV.rename ~source:temp ~dest
|> Lwt_result.map_error (fun e -> `Write_error e))
>|= function
| Ok () ->
remove_active url;
t.md5s <- SM.add md5 sha256 t.md5s;
@ -451,46 +376,17 @@ module Make
| `Swap e -> Swap.pp_error ppf e
Logs.err (fun m -> m "Write failure for %s: %a" url pp_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Write failure for %s: %a" url pp_error e)
match e with
| `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
| `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
else begin
(if sizes_match then begin
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum));
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum))
end else match body with
| `Fixed_body (reported, actual) ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported);
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported)
| `Unknown _ -> assert false
| `Init -> assert false);
if body_size_in_header then
(* if the checksums mismatch we want to delete the file. We are only
able to do so if it was the latest created file, so we expect and
error. Ideally, we want to match for `Append_only or other errors *)
KV.remove source >>= function
| Ok () -> Lwt.return_unit
| Error e ->
(* we failed to delete the file so we mark it for deletion *)
let dest = to_delete_key (hash, csum) in
Logs.warn (fun m -> m "Failed to remove %a: %a. Moving it to %a"
Mirage_kv.Key.pp source KV.pp_write_error e Mirage_kv.Key.pp dest);
KV.rename ~source ~dest >|= function
| Ok () -> ()
| Error e ->
Logs.warn (fun m -> m "Error renaming file %a -> %a: %a"
Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e)
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(`Bad_checksum (hash, Archive_checksum.get digests hash, csum));
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum));
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
@ -516,12 +412,6 @@ module Make
let md5s = SSet.of_list ( snd (SM.bindings t.md5s))
and sha512s = SSet.of_list ( snd (SM.bindings t.sha512s)) in
let idx = ref 1 in
(* XXX: should we do something about pending downloads?? *)
let entries =
List.filter (fun (p, _) ->
not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete))
Lwt_list.iter_s (fun (path, typ) ->
if !idx mod 10 = 0 then Gc.full_major () ;
match typ with
@ -530,19 +420,7 @@ module Make
| `Value ->
let open Digestif in
let sha256_final =
if verify_sha256 then
let f s =
let digest = SHA256.(to_raw_string (get s)) in
if not (String.equal (Mirage_kv.Key.basename path) (Ohex.encode digest)) then
Logs.err (fun m -> m "corrupt SHA256 data for %a, \
computed %s (should remove)"
Mirage_kv.Key.pp path (Ohex.encode digest))
Some f
and md5_final =
let md5_final =
if not (SSet.mem (Mirage_kv.Key.basename path) md5s) then
let f s =
let digest = MD5.(to_raw_string (get s)) in
@ -561,26 +439,53 @@ module Make
match sha256_final, md5_final, sha512_final with
| None, None, None -> Lwt.return_unit
| _ ->
let sha256_final =
let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in
if need_to_compute then
let f s =
let digest = SHA256.(to_raw_string (get s)) in
if not (String.equal (Mirage_kv.Key.basename path) (Ohex.encode digest)) then
Logs.err (fun m -> m "corrupt SHA256 data for %a, \
computed %s (will rename)"
Mirage_kv.Key.pp path (Ohex.encode digest));
end else true
Some f
match sha256_final with
| None -> Lwt.return_unit
| Some f ->
read_chunked t `SHA256 path
(fun (sha256, md5, sha512) data ->
( (fun t -> SHA256.feed_string t data) sha256,
(SHA256.feed_string sha256 data,
|||| (fun t -> MD5.feed_string t data) md5,
|||| (fun t -> SHA512.feed_string t data) sha512))
( (fun _ -> SHA256.empty) sha256_final,
|||| (fun _ -> MD5.empty) md5_final,
|||| (fun _ -> SHA512.empty) sha512_final) >|= function
|||| (fun _ -> SHA512.empty) sha512_final) >>= function
| Error e ->
Logs.err (fun m -> m "error %a of %a while computing digests"
KV.pp_error e Mirage_kv.Key.pp path)
KV.pp_error e Mirage_kv.Key.pp path);
| Ok (sha256, md5, sha512) ->
Option.iter (fun f -> f (Option.get sha256)) sha256_final;
Option.iter (fun f -> f (Option.get md5)) md5_final;
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|||| (fun m -> m "added %a" Mirage_kv.Key.pp path))
if not (f sha256) then
(* bad sha256! *)
KV.rename ~source:path ~dest:(Mirage_kv.Key.(v "delete" // path)) >|= function
| Ok () -> ()
| Error we ->
Logs.err (fun m -> m "error %a while renaming %a" KV.pp_write_error we
Mirage_kv.Key.pp path)
else begin
Option.iter (fun f -> f (Option.get md5)) md5_final;
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|||| (fun m -> m "added %a" Mirage_kv.Key.pp path);
entries >>= fun () ->
update_caches t >|= fun () ->
@ -661,22 +566,23 @@ module Make
then Tar.High (High.inj (Lwt.return_ok None))
else begin closed := true; Tar.High (High.inj (Lwt.return_ok (Some data))) end
let entries_of_git ~mtime store repo =
Git.find_contents store >>= fun paths ->
let entries = Lwt_stream.of_list paths in
let entries_of_git ~mtime store repo urls =
let entries = Git.contents store in
let to_entry path =
Store.get store path >|= function
| Ok data ->
let data =
if Mirage_kv.Key.(equal path (v "repo"))
then repo else data in
then repo else data
let file_mode = 0o644
and mod_time = Int64.of_int mtime
and user_id = 0
and group_id = 0
and size = String.length data in
let hdr = Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
(Mirage_kv.Key.to_string path) (Int64.of_int size) in
(Mirage_kv.Key.to_string path) (Int64.of_int size) in
urls := Git.find_urls !urls path data;
Some (Some Tar.Header.Ustar, hdr, once data)
| Error _ -> None in
let entries = Lwt_stream.filter_map_s to_entry entries in
@ -685,12 +591,13 @@ module Make
let of_git repo store =
let now = Ptime.v (Pclock.now_d_ps ()) in
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
entries_of_git ~mtime store repo >>= fun entries ->
let urls = ref SM.empty in
entries_of_git ~mtime store repo urls >>= fun entries ->
let t = Tar.out ~level:Ustar entries in
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
let buf = Buffer.create 1024 in
to_buffer buf t >|= function
| Ok () -> Buffer.contents buf
| Ok () -> Buffer.contents buf, !urls
| Error (`Msg msg) -> failwith msg
@ -743,8 +650,8 @@ stamp: %S
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index ->
{ commit_id ; modified ; repo ; index }
Tarball.of_git repo git_kv >|= fun (index, urls) ->
{ commit_id ; modified ; repo ; index }, urls
let update_lock = Lwt_mutex.create ()
@ -757,18 +664,18 @@ stamp: %S
Lwt.return None
| Ok [] ->
|||| (fun m -> m "git changes are empty");
Lwt.return (Some [])
Lwt.return (Some ([], SM.empty))
| Ok changes ->
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
|||| (fun m -> m "git: %s" commit_id);
let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index ->
Tarball.of_git repo git_kv >|= fun (index, urls) ->
t.commit_id <- commit_id ;
t.modified <- modified ;
t.repo <- repo ;
t.index <- index;
Some changes)
Some (changes, urls))
let status disk =
(* report status:
@ -781,27 +688,41 @@ stamp: %S
(SM.cardinal disk.Disk.md5s)
let sort_by_ts a b = a b in
let active_downloads =
let header = "<h2>Active downloads</h2><ul>" in
let content =
SM.fold (fun url (ts, bytes_written, length_or_unknown) acc ->
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to disk, " ^ length_or_unknown ^ "</li>")
^ acc)
!active_downloads ""
SM.bindings !active_downloads |>
List.sort (fun (_, (a, _)) (_, (b, _)) -> sort_by_ts a b) |>
|||| (fun (url, (ts, bytes_written)) ->
"<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to swap</li>")
header ^ content ^ "</ul>"
header ^ String.concat "" content ^ "</ul>"
and failed_downloads =
let header = "<h2>Failed downloads</h2><ul>" in
let content =
SM.fold (fun url (ts, reason) acc ->
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ reason ^ "</li>")
^ acc)
!failed_downloads ""
SM.bindings !failed_downloads |>
List.sort (fun (_, (a, reasona)) (_, (b, reasonb)) ->
match compare_failed reasona reasonb with
| 0 -> sort_by_ts a b
| n -> n) |>
|||| (fun (url, (ts, reason)) ->
Fmt.str "<li>%s: %s error %a"
(Ptime.to_rfc3339 ?tz_offset_s:None ts) url pp_failed reason)
header ^ content ^ "</ul>"
header ^ String.concat "" content ^ "</ul>"
and parse_errors =
let header = "<h2>Parse errors</h2><ul>" in
let content =
SM.bindings !parse_errors |>
List.sort (fun (a, _) (b, _) -> a b) |>
|||| (fun (filename, reason) ->
"<li>" ^ filename ^ ": " ^ reason ^ "</li>")
header ^ String.concat "" content ^ "</ul>"
"<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status</h1><div>"
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ]
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ; parse_errors ]
^ "</div></body></html>"
let not_modified request (modified, etag) =
@ -951,17 +872,12 @@ stamp: %S
let bad_archives = SSet.of_list Bad.archives
let download_archives parallel_downloads disk http_client store =
let download_archives parallel_downloads disk http_client urls =
(* FIXME: handle resuming partial downloads *)
Git.find_urls store >>= fun urls ->
let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) ->
Lwt_pool.use pool @@ fun () ->
(* FIXME: check pending and to-delete *)
HM.fold (fun h v r ->
r >>= function
| true -> Disk.exists disk h (hex_to_key v)
@ -974,7 +890,7 @@ stamp: %S
incr idx;
if !idx mod 10 = 0 then Gc.full_major () ;
|||| (fun m -> m "downloading %s" url);
let quux, body_init = Archive_checksum.init_write csums in
let quux, body_init = Disk.init_write disk csums in
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
| Ok (resp, r) ->
@ -983,30 +899,25 @@ stamp: %S
Logs.warn (fun m -> m "%s: %a (reason %s)"
url H2.Status.pp_hum resp.status resp.reason);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason);
(`Bad_response (resp.status, resp.reason));
| Error `Write_error e ->
Logs.err (fun m -> m "%s: write error %a %a"
Logs.err (fun m -> m "%s: write error %a"
Mirage_kv.Key.pp (Disk.pending_key quux)
KV.pp_write_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "write error: %a" KV.pp_write_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
| Error `Swap e ->
Logs.err (fun m -> m "%s: swap error %a %a"
Logs.err (fun m -> m "%s: swap error %a"
Mirage_kv.Key.pp (Disk.pending_key quux)
Swap.pp_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "swap error: %a" Swap.pp_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
| Ok (digests, body) ->
Disk.finalize_write disk quux ~url body csums digests
| Error me ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "mimic error: %a" Mimic.pp_error me);
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
(SM.bindings urls) >>= fun () ->
Disk.update_caches disk >|= fun () ->
@ -1062,14 +973,14 @@ stamp: %S
||| (fun m -> m "Done initializing git state!");
Serve.commit_id git_kv >>= fun commit_id ->
|||| (fun m -> m "git: %s" commit_id);
Serve.create remote git_kv >>= fun serve ->
Serve.create remote git_kv >>= fun (serve, urls) ->
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
let update () =
Serve.update_git ~remote serve git_kv >>= function
| None | Some [] -> Lwt.return_unit
| Some _changes ->
| None | Some ([], _) -> Lwt.return_unit
| Some (_changes, urls) ->
dump_git git_dump git_kv >>= fun () ->
download_archives (K.parallel_downloads ()) disk http_ctx git_kv
download_archives (K.parallel_downloads ()) disk http_ctx urls
let service =
@ -1085,7 +996,7 @@ stamp: %S
go ()
go ());
download_archives (K.parallel_downloads ()) disk http_ctx git_kv >>= fun () ->
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
(th >|= fun _v -> ())
let start block _time _pclock stack git_ctx http_ctx =
Reference in a new issue