take mirrors into account (#13) and allow upstream-caches (#5)

This is done by introducing a set of alternative download locations.
This commit is contained in:
Hannes Mehnert 2024-11-14 17:40:22 +01:00
parent c376a4b70e
commit 6a0ae2bcab
3 changed files with 77 additions and 30 deletions

View file

@ -1,5 +1,3 @@
module Hash = struct module Hash = struct
type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ] type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ]

View file

@ -35,12 +35,31 @@ let extract_url_checksum filename items =
List.find_opt List.find_opt
(function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false) (function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false)
items items
and mirrors =
List.find_opt
(function { pelem = Variable ({ pelem = "mirrors" ; _ }, _); _ } -> true | _ -> false)
items
in in
let url = let url =
match url, archive with match url, archive with
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url | Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url | None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
| _ -> Error (`Msg "neither 'src' nor 'archive' present") | _ -> Error (`Msg "neither 'src' nor 'archive' present")
and mirrors = match mirrors with
| None -> []
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ } -> [ url ]
| Some { pelem = Variable (_, { pelem = List { pelem = urls ; _ } ; _ }) } ->
List.fold_left (fun acc -> function
| { pelem = String url ; _ } -> url :: acc
| v ->
Logs.err (fun m -> m "bad mirror data (expected a string in the list): %s"
(OpamPrinter.FullPos.value v));
acc)
[] urls
| Some v ->
Logs.err (fun m -> m "bad mirror data (expected string or string list): %s"
(OpamPrinter.FullPos.items [ v ]));
[]
in in
let csum, csum_errs = let csum, csum_errs =
match checksum with match checksum with
@ -79,7 +98,7 @@ let extract_url_checksum filename items =
| _ -> Error (`Msg "couldn't find or decode 'checksum'"), [] | _ -> Error (`Msg "couldn't find or decode 'checksum'"), []
in in
(match url, csum with (match url, csum with
| Ok url, Ok csum -> Ok (url, csum) | Ok url, Ok csum -> Ok (url, csum, mirrors)
| Error _ as e, _ | Error _ as e, _
| _, (Error _ as e) -> e), csum_errs | _, (Error _ as e) -> e), csum_errs

View file

@ -9,6 +9,10 @@ module K = struct
let doc = Arg.info ~doc:"Only check the cache" ["check"] in let doc = Arg.info ~doc:"Only check the cache" ["check"] in
Mirage_runtime.register_arg Arg.(value & flag doc) Mirage_runtime.register_arg Arg.(value & flag doc)
let upstream_caches =
let doc = Arg.info ~doc:"Upstream caches (e.g. https://opam.ocaml.org/cache)" ["upstream-cache"] in
Mirage_runtime.register_arg Arg.(value & opt_all string [] doc)
let skip_verify_sha256 = let skip_verify_sha256 =
let doc = Arg.info let doc = Arg.info
~doc:"Skip verification of the SHA256 checksums of the cache contents, \ ~doc:"Skip verification of the SHA256 checksums of the cache contents, \
@ -137,20 +141,33 @@ module Make
let path = Mirage_kv.Key.to_string path in let path = Mirage_kv.Key.to_string path in
let url_csums, errs = Opam_file.extract_urls path data in let url_csums, errs = Opam_file.extract_urls path data in
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs; List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
List.fold_left (fun acc (url, csums) -> let upstream hm =
List.fold_left (fun set (hash, hash_value) ->
List.fold_left (fun set cache_url ->
let url =
cache_url ^ "/" ^ Archive_checksum.Hash.to_string hash ^
"/" ^ String.sub hash_value 0 2 ^ "/" ^ hash_value
in
SSet.add url set)
set (K.upstream_caches ()))
SSet.empty (HM.bindings hm)
in
List.fold_left (fun acc (url, csums, mirrors) ->
if HM.cardinal csums = 0 then if HM.cardinal csums = 0 then
(add_parse_error path ("no checksums for " ^ url); (add_parse_error path ("no checksums for " ^ url);
acc) acc)
else else
let mirrors = SSet.of_list mirrors in
SM.update url (function SM.update url (function
| None -> Some csums | None -> Some (csums, SSet.union mirrors (upstream csums))
| Some csums' -> | Some (csums', mirrors') ->
if HM.for_all (fun h v -> if HM.for_all (fun h v ->
match HM.find_opt h csums with match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v') | None -> true | Some v' -> String.equal v v')
csums' csums'
then then
Some (HM.union (fun _h v _v' -> Some v) csums csums') Some (HM.union (fun _h v _v' -> Some v) csums csums',
SSet.union mirrors mirrors')
else begin else begin
add_parse_error path (Fmt.str add_parse_error path (Fmt.str
"mismatching hashes for %s: %s vs %s" "mismatching hashes for %s: %s vs %s"
@ -915,7 +932,7 @@ stamp: %S
remaining_downloads := SM.cardinal urls; remaining_downloads := SM.cardinal urls;
archives := SM.cardinal urls; archives := SM.cardinal urls;
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
Lwt_list.iter_p (fun (url, csums) -> Lwt_list.iter_p (fun (url, (csums, mirrors)) ->
Lwt_pool.use pool @@ fun () -> Lwt_pool.use pool @@ fun () ->
HM.fold (fun h v r -> HM.fold (fun h v r ->
r >>= function r >>= function
@ -926,29 +943,42 @@ stamp: %S
decr remaining_downloads; decr remaining_downloads;
Lwt.return_unit Lwt.return_unit
| false -> | false ->
let quux, body_init = Disk.init_write disk csums in let rec download url mirrors =
add_to_active url (Ptime.v (Pclock.now_d_ps ())); let retry () =
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function if SSet.is_empty mirrors then begin
| Ok (resp, r) -> decr remaining_downloads;
decr remaining_downloads;
begin match r with
| Error `Bad_response ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(`Bad_response (resp.status, resp.reason));
Lwt.return_unit Lwt.return_unit
| Error `Write_error e -> end else
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e); let elt, mirrors =
Lwt.return_unit let e = SSet.min_elt mirrors in
| Error `Swap e -> e, SSet.remove e mirrors
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e); in
Lwt.return_unit download elt mirrors
| Ok (digests, body) -> in
Disk.finalize_write disk quux ~url body csums digests let quux, body_init = Disk.init_write disk csums in
end add_to_active url (Ptime.v (Pclock.now_d_ps ()));
| Error me -> Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
decr remaining_downloads; | Ok (resp, r) ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me); begin match r with
Lwt.return_unit) | Error `Bad_response ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(`Bad_response (resp.status, resp.reason));
retry ()
| Error `Write_error e ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
retry ()
| Error `Swap e ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
retry ()
| Ok (digests, body) ->
decr remaining_downloads;
Disk.finalize_write disk quux ~url body csums digests
end
| Error me ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
retry ()
in
download url mirrors)
(SM.bindings urls) >>= fun () -> (SM.bindings urls) >>= fun () ->
Disk.update_caches disk >|= fun () -> Disk.update_caches disk >|= fun () ->
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls)) Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))