revise startup, address urls pointing to same sha256 and support mirrors (upstream and in opam file) #24

Merged
hannes merged 7 commits from startup into main 2024-11-20 10:38:22 +00:00
3 changed files with 237 additions and 102 deletions

View file

@ -1,5 +1,3 @@
module Hash = struct module Hash = struct
type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ] type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ]

View file

@ -35,12 +35,31 @@ let extract_url_checksum filename items =
List.find_opt List.find_opt
(function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false) (function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false)
items items
and mirrors =
List.find_opt
(function { pelem = Variable ({ pelem = "mirrors" ; _ }, _); _ } -> true | _ -> false)
items
in in
let url = let url =
match url, archive with match url, archive with
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url | Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url | None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
| _ -> Error (`Msg "neither 'src' nor 'archive' present") | _ -> Error (`Msg "neither 'src' nor 'archive' present")
and mirrors = match mirrors with
| None -> []
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ } -> [ url ]
| Some { pelem = Variable (_, { pelem = List { pelem = urls ; _ } ; _ }) } ->
List.fold_left (fun acc -> function
| { pelem = String url ; _ } -> url :: acc
Review

I don't think the order is significant, but it's worth noting we reverse the order here.

I don't think the order is significant, but it's worth noting we reverse the order here.
| v ->
Logs.err (fun m -> m "bad mirror data (expected a string in the list): %s"
(OpamPrinter.FullPos.value v));
acc)
[] urls
| Some v ->
Logs.err (fun m -> m "bad mirror data (expected string or string list): %s"
(OpamPrinter.FullPos.items [ v ]));
[]
in in
let csum, csum_errs = let csum, csum_errs =
match checksum with match checksum with
@ -79,7 +98,7 @@ let extract_url_checksum filename items =
| _ -> Error (`Msg "couldn't find or decode 'checksum'"), [] | _ -> Error (`Msg "couldn't find or decode 'checksum'"), []
in in
(match url, csum with (match url, csum with
| Ok url, Ok csum -> Ok (url, csum) | Ok url, Ok csum -> Ok (url, csum, mirrors)
Review

The opam source code does url :: mirrors FWIW. Just an observation; not a request for change.

The opam source code does `url :: mirrors` FWIW. Just an observation; not a request for change.
Review

We keep the url in the map as key, and just carry around a set of other urls for the same artifact.

Not sure whether it is worth to have the url as key in the map anymore, though we discovered some opam packages that used the same archive with different checksums...

We keep the url in the map as key, and just carry around a set of other urls for the same artifact. Not sure whether it is worth to have the url as key in the map anymore, though we discovered some opam packages that used the same archive with different checksums...
| Error _ as e, _ | Error _ as e, _
| _, (Error _ as e) -> e), csum_errs | _, (Error _ as e) -> e), csum_errs

View file

@ -9,11 +9,27 @@ module K = struct
let doc = Arg.info ~doc:"Only check the cache" ["check"] in let doc = Arg.info ~doc:"Only check the cache" ["check"] in
Mirage_runtime.register_arg Arg.(value & flag doc) Mirage_runtime.register_arg Arg.(value & flag doc)
let verify_sha256 = let skip_download =
let doc = Arg.info ~doc:"Skip downloading archives" ["skip-download"] in
Mirage_runtime.register_arg Arg.(value & flag doc)
let upstream_caches =
let doc =
"Upstream caches (e.g. https://opam.ocaml.org/cache). \
For each package first the declared url is attempted. Then, \
if any, all the declared mirrors are attempted. \
Finally, the upstream caches are attempted. \
Note that this does not change the \"archive-mirrors:\" value \
in the /repo endpoint."
in
let doc = Arg.info ~doc ["upstream-cache"] in
Mirage_runtime.register_arg Arg.(value & opt_all string [] doc)
let skip_verify_sha256 =
let doc = Arg.info let doc = Arg.info
~doc:"Verify the SHA256 checksums of the cache contents, and \ ~doc:"Skip verification of the SHA256 checksums of the cache contents, \
re-build the other checksum caches." and do not re-build the other checksum caches."
["verify-sha256"] ["skip-verify-sha256"]
in in
Mirage_runtime.register_arg Arg.(value & flag doc) Mirage_runtime.register_arg Arg.(value & flag doc)
@ -132,31 +148,69 @@ module Make
in in
Lwt_stream.from more Lwt_stream.from more
let sha256s = Hashtbl.create 13
let empty () = Hashtbl.clear sha256s
let find_urls acc path data = let find_urls acc path data =
if Mirage_kv.Key.basename path = "opam" then if Mirage_kv.Key.basename path = "opam" then
let path = Mirage_kv.Key.to_string path in let path = Mirage_kv.Key.to_string path in
let url_csums, errs = Opam_file.extract_urls path data in let url_csums, errs = Opam_file.extract_urls path data in
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs; List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
List.fold_left (fun acc (url, csums) -> let upstream hm =
HM.fold
(fun hash hash_value set ->
List.fold_left (fun set cache_url ->
let url =
cache_url ^ "/" ^ Archive_checksum.Hash.to_string hash ^
"/" ^ String.sub hash_value 0 2 ^ "/" ^ hash_value
in
SSet.add url set)
set (K.upstream_caches ()))
hm SSet.empty
in
List.fold_left (fun acc (url, csums, mirrors) ->
if HM.cardinal csums = 0 then if HM.cardinal csums = 0 then
(add_parse_error path ("no checksums for " ^ url); (add_parse_error path ("no checksums for " ^ url);
acc) acc)
else begin
let url' =
match HM.find_opt `SHA256 csums with
| None -> url
| Some hash ->
match Hashtbl.find_opt sha256s hash with
| None -> Hashtbl.add sha256s hash url; url
| Some url' ->
if not (String.equal url url') then
Logs.debug (fun m -> m "same hash for url %s and %s" url url');
url'
in
let mirrors = SSet.of_list mirrors in
let url, mirrors =
if String.equal url url' then
url, mirrors
else else
url', SSet.add url mirrors
in
SM.update url (function SM.update url (function
| None -> Some csums | None -> Some (csums, mirrors, upstream csums)
| Some csums' -> | Some (csums', mirrors', upstream_caches') ->
if HM.for_all (fun h v -> if HM.for_all (fun h v ->
match HM.find_opt h csums with match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v') | None -> true | Some v' -> String.equal v v')
csums' csums'
then then
Some (HM.union (fun _h v _v' -> Some v) csums csums') Some (HM.union (fun _h v _v' -> Some v) csums csums',
SSet.union mirrors mirrors',
SSet.union (upstream csums) upstream_caches'
)
else begin else begin
add_parse_error path (Fmt.str add_parse_error path (Fmt.str
"mismatching hashes for %s: %s vs %s" "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums)); url (hm_to_s csums') (hm_to_s csums));
None None
end) acc) acc url_csums end) acc
end) acc url_csums
else else
acc acc
@ -238,16 +292,25 @@ module Make
let last_git_status = ref (Error "unknown") let last_git_status = ref (Error "unknown")
module Disk = struct module Disk = struct
module KS = Set.Make(Mirage_kv.Key)
type t = { type t = {
mutable md5s : string SM.t ; mutable md5s : string SM.t ;
mutable sha512s : string SM.t ; mutable sha512s : string SM.t ;
mutable checked : KS.t option ;
dev : KV.t ; dev : KV.t ;
dev_md5s : Cache.t ; dev_md5s : Cache.t ;
dev_sha512s : Cache.t ; dev_sha512s : Cache.t ;
dev_swap : Swap.t ; dev_swap : Swap.t ;
} }
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap } let empty dev dev_md5s dev_sha512s dev_swap =
{ md5s = SM.empty ; sha512s = SM.empty ; checked = Some KS.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
let add_checked t path =
match t.checked with
| None -> ()
| Some s -> t.checked <- Some (KS.add path s)
let marshal_sm (sm : string SM.t) = let marshal_sm (sm : string SM.t) =
let version = char_of_int 1 in let version = char_of_int 1 in
@ -272,7 +335,10 @@ module Make
Lwt.return_unit Lwt.return_unit
let find_key t h key = let find_key t h key =
assert (List.length (Mirage_kv.Key.segments key) = 1); if List.length (Mirage_kv.Key.segments key) <> 1 then begin
Logs.warn (fun m -> m "find_key with multiple segments: %a" Mirage_kv.Key.pp key);
Error `Not_found
end else
match match
match h with match h with
| `MD5 -> | `MD5 ->
@ -280,11 +346,19 @@ module Make
| `SHA512 -> | `SHA512 ->
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s) Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
| `SHA256 -> Some key | `SHA256 -> Some key
| _ -> None
with with
| None -> Error `Not_found | None -> Error `Not_found
| Some x -> Ok x | Some x -> Ok x
let ready t h key =
match t.checked with
| None -> true
| Some s -> match find_key t h key with
| Ok k -> KS.mem k s
| Error _ -> false
let completely_checked t = t.checked = None
let read_chunked t h v f a = let read_chunked t h v f a =
match find_key t h v with match find_key t h v with
| Error `Not_found -> | Error `Not_found ->
@ -382,7 +456,8 @@ module Make
| Ok () -> | Ok () ->
remove_active url; remove_active url;
t.md5s <- SM.add md5 sha256 t.md5s; t.md5s <- SM.add md5 sha256 t.md5s;
t.sha512s <- SM.add sha512 sha256 t.sha512s t.sha512s <- SM.add sha512 sha256 t.sha512s;
add_checked t dest
| Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e) | Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
| Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e) | Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
else begin else begin
@ -392,22 +467,23 @@ module Make
end end
(* on disk, we use a flat file system where the filename is the sha256 of the data *) (* on disk, we use a flat file system where the filename is the sha256 of the data *)
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap = let check ~skip_verify_sha256 t =
KV.list dev Mirage_kv.Key.empty >>= function KV.list t.dev Mirage_kv.Key.empty >>= function
| Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e) | Error e ->
Logs.err (fun m -> m "error %a listing kv" KV.pp_error e);
Lwt.return_unit
| Ok entries -> | Ok entries ->
let t = empty dev dev_md5s dev_sha512s dev_swap in
Cache.read t.dev_md5s >>= fun r -> Cache.read t.dev_md5s >>= fun r ->
(match r with (match r with
| Ok Some s -> | Ok Some s ->
if not verify_sha256 then if skip_verify_sha256 then
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s) Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
| Ok None -> () | Ok None -> ()
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e)); | Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
Cache.read t.dev_sha512s >>= fun r -> Cache.read t.dev_sha512s >>= fun r ->
(match r with (match r with
| Ok Some s -> | Ok Some s ->
if not verify_sha256 then if skip_verify_sha256 then
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s) Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
| Ok None -> () | Ok None -> ()
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e)); | Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
@ -438,7 +514,7 @@ module Make
None None
in in
let sha256_final = let sha256_final =
let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in let need_to_compute = md5_final <> None || sha512_final <> None || not skip_verify_sha256 in
if need_to_compute then if need_to_compute then
let f s = let f s =
let digest = SHA256.(to_raw_string (get s)) in let digest = SHA256.(to_raw_string (get s)) in
@ -455,7 +531,9 @@ module Make
None None
in in
match sha256_final with match sha256_final with
| None -> Lwt.return_unit | None ->
add_checked t path;
Lwt.return_unit
| Some f -> | Some f ->
read_chunked t `SHA256 path read_chunked t `SHA256 path
(fun (sha256, md5, sha512) data -> (fun (sha256, md5, sha512) data ->
@ -481,11 +559,12 @@ module Make
else begin else begin
Option.iter (fun f -> f (Option.get md5)) md5_final; Option.iter (fun f -> f (Option.get md5)) md5_final;
Option.iter (fun f -> f (Option.get sha512)) sha512_final; Option.iter (fun f -> f (Option.get sha512)) sha512_final;
add_checked t path;
Lwt.return_unit Lwt.return_unit
end) end)
entries >>= fun () -> entries >>= fun () ->
update_caches t >|= fun () -> update_caches t >|= fun () ->
t t.checked <- None
let exists t h v = let exists t h v =
match find_key t h v with match find_key t h v with
@ -578,6 +657,7 @@ module Make
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
let urls = ref SM.empty in let urls = ref SM.empty in
entries_of_git ~mtime store repo urls >>= fun entries -> entries_of_git ~mtime store repo urls >>= fun entries ->
Git.empty ();
let t = Tar.out ~level:Ustar entries in let t = Tar.out ~level:Ustar entries in
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
let buf = Buffer.create 1024 in let buf = Buffer.create 1024 in
@ -841,6 +921,7 @@ stamp: %S
| Ok h -> | Ok h ->
let hash = Mirage_kv.Key.v hash in let hash = Mirage_kv.Key.v hash in
Lwt.async (fun () -> Lwt.async (fun () ->
if Disk.ready store h hash then
(Disk.last_modified store h hash >|= function (Disk.last_modified store h hash >|= function
| Error _ -> t.modified | Error _ -> t.modified
| Ok v -> ptime_to_http_date v) >>= fun last_modified -> | Ok v -> ptime_to_http_date v) >>= fun last_modified ->
@ -872,7 +953,11 @@ stamp: %S
Httpaf.Body.write_string body chunk; Httpaf.Body.write_string body chunk;
Httpaf.Body.flush body (Lwt.wakeup wakeup); Httpaf.Body.flush body (Lwt.wakeup wakeup);
wait) () >|= fun _ -> wait) () >|= fun _ ->
Httpaf.Body.close_writer body) Httpaf.Body.close_writer body
else begin
not_found reqd request.Httpaf.Request.target;
Lwt.return_unit
end)
end end
| _ -> | _ ->
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target); Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
@ -881,12 +966,11 @@ stamp: %S
end end
let download_archives parallel_downloads disk http_client urls = let download_archives parallel_downloads disk http_client urls =
(* FIXME: handle resuming partial downloads *)
reset_failed_downloads (); reset_failed_downloads ();
remaining_downloads := SM.cardinal urls; remaining_downloads := SM.cardinal urls;
archives := SM.cardinal urls; archives := SM.cardinal urls;
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
Lwt_list.iter_p (fun (url, csums) -> Lwt_list.iter_p (fun (url, (csums, mirrors, upstream_caches)) ->
Lwt_pool.use pool @@ fun () -> Lwt_pool.use pool @@ fun () ->
HM.fold (fun h v r -> HM.fold (fun h v r ->
r >>= function r >>= function
@ -897,29 +981,51 @@ stamp: %S
decr remaining_downloads; decr remaining_downloads;
Lwt.return_unit Lwt.return_unit
| false -> | false ->
let rec download url mirrors upstream_caches =
let retry () =
if SSet.is_empty mirrors && SSet.is_empty upstream_caches then begin
decr remaining_downloads;
Lwt.return_unit
end else if SSet.is_empty mirrors then
let elt, upstream_caches =
let e = SSet.min_elt upstream_caches in
e, SSet.remove e upstream_caches
in
download elt mirrors upstream_caches
else
let elt, mirrors =
let e = SSet.min_elt mirrors in
e, SSet.remove e mirrors
in
download elt mirrors upstream_caches
in
let quux, body_init = Disk.init_write disk csums in let quux, body_init = Disk.init_write disk csums in
add_to_active url (Ptime.v (Pclock.now_d_ps ())); add_to_active url (Ptime.v (Pclock.now_d_ps ()));
if not (K.skip_download ()) then
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
| Ok (resp, r) -> | Ok (resp, r) ->
decr remaining_downloads;
begin match r with begin match r with
| Error `Bad_response -> | Error `Bad_response ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) add_failed url (Ptime.v (Pclock.now_d_ps ()))
(`Bad_response (resp.status, resp.reason)); (`Bad_response (resp.status, resp.reason));
Lwt.return_unit retry ()
| Error `Write_error e -> | Error `Write_error e ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
Lwt.return_unit retry ()
| Error `Swap e -> | Error `Swap e ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
Lwt.return_unit retry ()
| Ok (digests, body) -> | Ok (digests, body) ->
decr remaining_downloads;
Disk.finalize_write disk quux ~url body csums digests Disk.finalize_write disk quux ~url body csums digests
end end
| Error me -> | Error me ->
decr remaining_downloads;
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me); add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
Lwt.return_unit) retry ()
else
retry ()
in
download url mirrors upstream_caches)
(SM.bindings urls) >>= fun () -> (SM.bindings urls) >>= fun () ->
Disk.update_caches disk >|= fun () -> Disk.update_caches disk >|= fun () ->
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls)) Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
@ -957,10 +1063,10 @@ stamp: %S
Cache.connect sha512s >>= fun sha512s -> Cache.connect sha512s >>= fun sha512s ->
Swap.connect swap >>= fun swap -> Swap.connect swap >>= fun swap ->
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv)); Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk -> let disk = Disk.empty kv md5s sha512s swap in
let remote = K.remote () in let remote = K.remote () in
if K.check () then if K.check () then
Lwt.return_unit Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
else else
begin begin
Logs.info (fun m -> m "Initializing git state. This may take a while..."); Logs.info (fun m -> m "Initializing git state. This may take a while...");
@ -968,23 +1074,27 @@ stamp: %S
Lwt.return (Error ()) Lwt.return (Error ())
else else
restore_git ~remote git_dump git_ctx) >>= function restore_git ~remote git_dump git_ctx) >>= function
| Ok git_kv -> Lwt.return git_kv | Ok git_kv -> Lwt.return (false, git_kv)
| Error () -> | Error () ->
Git_kv.connect git_ctx remote >>= fun git_kv -> Git_kv.connect git_ctx remote >>= fun git_kv ->
dump_git git_dump git_kv >|= fun () -> Lwt.return (true, git_kv)
git_kv end >>= fun (need_dump, git_kv) ->
end >>= fun git_kv ->
Logs.info (fun m -> m "Done initializing git state!"); Logs.info (fun m -> m "Done initializing git state!");
Serve.commit_id git_kv >>= fun commit_id -> Serve.commit_id git_kv >>= fun commit_id ->
Logs.info (fun m -> m "git: %s" commit_id); Logs.info (fun m -> m "git: %s" commit_id);
Serve.create remote git_kv >>= fun (serve, urls) -> Serve.create remote git_kv >>= fun (serve, urls) ->
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t -> Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
let update () = let update () =
if Disk.completely_checked disk then
Serve.update_git ~remote serve git_kv >>= function Serve.update_git ~remote serve git_kv >>= function
| None | Some ([], _) -> Lwt.return_unit | None | Some ([], _) -> Lwt.return_unit
| Some (_changes, urls) -> | Some (_changes, urls) ->
dump_git git_dump git_kv >>= fun () -> dump_git git_dump git_kv >>= fun () ->
download_archives (K.parallel_downloads ()) disk http_ctx urls download_archives (K.parallel_downloads ()) disk http_ctx urls
else begin
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
Lwt.return_unit
end
in in
let service = let service =
Paf.http_service Paf.http_service
@ -993,6 +1103,14 @@ stamp: %S
in in
let `Initialized th = Paf.serve service t in let `Initialized th = Paf.serve service t in
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ())); Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
Lwt.join [
(if need_dump then begin
Logs.info (fun m -> m "dumping git state %s" commit_id);
dump_git git_dump git_kv
end else
Lwt.return_unit) ;
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
] >>= fun () ->
Lwt.async (fun () -> Lwt.async (fun () ->
let rec go () = let rec go () =
Time.sleep_ns (Duration.of_hour 1) >>= fun () -> Time.sleep_ns (Duration.of_hour 1) >>= fun () ->