diff --git a/mirage/archive_checksum.ml b/mirage/archive_checksum.ml index 9e308b3..ba415bd 100644 --- a/mirage/archive_checksum.ml +++ b/mirage/archive_checksum.ml @@ -1,5 +1,3 @@ - - module Hash = struct type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ] diff --git a/mirage/opam_file.ml b/mirage/opam_file.ml index dea9bb6..7fbfa09 100644 --- a/mirage/opam_file.ml +++ b/mirage/opam_file.ml @@ -35,12 +35,31 @@ let extract_url_checksum filename items = List.find_opt (function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false) items + and mirrors = + List.find_opt + (function { pelem = Variable ({ pelem = "mirrors" ; _ }, _); _ } -> true | _ -> false) + items in let url = match url, archive with | Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url | None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url | _ -> Error (`Msg "neither 'src' nor 'archive' present") + and mirrors = match mirrors with + | None -> [] + | Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ } -> [ url ] + | Some { pelem = Variable (_, { pelem = List { pelem = urls ; _ } ; _ }) } -> + List.fold_left (fun acc -> function + | { pelem = String url ; _ } -> url :: acc + | v -> + Logs.err (fun m -> m "bad mirror data (expected a string in the list): %s" + (OpamPrinter.FullPos.value v)); + acc) + [] urls + | Some v -> + Logs.err (fun m -> m "bad mirror data (expected string or string list): %s" + (OpamPrinter.FullPos.items [ v ])); + [] in let csum, csum_errs = match checksum with @@ -79,7 +98,7 @@ let extract_url_checksum filename items = | _ -> Error (`Msg "couldn't find or decode 'checksum'"), [] in (match url, csum with - | Ok url, Ok csum -> Ok (url, csum) + | Ok url, Ok csum -> Ok (url, csum, mirrors) | Error _ as e, _ | _, (Error _ as e) -> e), csum_errs diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index b100ede..3d62810 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -9,11 +9,27 @@ module K = struct let doc = Arg.info ~doc:"Only check the cache" ["check"] in Mirage_runtime.register_arg Arg.(value & flag doc) - let verify_sha256 = + let skip_download = + let doc = Arg.info ~doc:"Skip downloading archives" ["skip-download"] in + Mirage_runtime.register_arg Arg.(value & flag doc) + + let upstream_caches = + let doc = + "Upstream caches (e.g. https://opam.ocaml.org/cache). \ + For each package first the declared url is attempted. Then, \ + if any, all the declared mirrors are attempted. \ + Finally, the upstream caches are attempted. \ + Note that this does not change the \"archive-mirrors:\" value \ + in the /repo endpoint." + in + let doc = Arg.info ~doc ["upstream-cache"] in + Mirage_runtime.register_arg Arg.(value & opt_all string [] doc) + + let skip_verify_sha256 = let doc = Arg.info - ~doc:"Verify the SHA256 checksums of the cache contents, and \ - re-build the other checksum caches." - ["verify-sha256"] + ~doc:"Skip verification of the SHA256 checksums of the cache contents, \ + and do not re-build the other checksum caches." + ["skip-verify-sha256"] in Mirage_runtime.register_arg Arg.(value & flag doc) @@ -132,31 +148,69 @@ module Make in Lwt_stream.from more + let sha256s = Hashtbl.create 13 + + let empty () = Hashtbl.clear sha256s + let find_urls acc path data = if Mirage_kv.Key.basename path = "opam" then let path = Mirage_kv.Key.to_string path in let url_csums, errs = Opam_file.extract_urls path data in List.iter (fun (`Msg msg) -> add_parse_error path msg) errs; - List.fold_left (fun acc (url, csums) -> + let upstream hm = + HM.fold + (fun hash hash_value set -> + List.fold_left (fun set cache_url -> + let url = + cache_url ^ "/" ^ Archive_checksum.Hash.to_string hash ^ + "/" ^ String.sub hash_value 0 2 ^ "/" ^ hash_value + in + SSet.add url set) + set (K.upstream_caches ())) + hm SSet.empty + in + List.fold_left (fun acc (url, csums, mirrors) -> if HM.cardinal csums = 0 then (add_parse_error path ("no checksums for " ^ url); acc) - else + else begin + let url' = + match HM.find_opt `SHA256 csums with + | None -> url + | Some hash -> + match Hashtbl.find_opt sha256s hash with + | None -> Hashtbl.add sha256s hash url; url + | Some url' -> + if not (String.equal url url') then + Logs.debug (fun m -> m "same hash for url %s and %s" url url'); + url' + in + let mirrors = SSet.of_list mirrors in + let url, mirrors = + if String.equal url url' then + url, mirrors + else + url', SSet.add url mirrors + in SM.update url (function - | None -> Some csums - | Some csums' -> + | None -> Some (csums, mirrors, upstream csums) + | Some (csums', mirrors', upstream_caches') -> if HM.for_all (fun h v -> match HM.find_opt h csums with | None -> true | Some v' -> String.equal v v') csums' then - Some (HM.union (fun _h v _v' -> Some v) csums csums') + Some (HM.union (fun _h v _v' -> Some v) csums csums', + SSet.union mirrors mirrors', + SSet.union (upstream csums) upstream_caches' + ) else begin add_parse_error path (Fmt.str "mismatching hashes for %s: %s vs %s" url (hm_to_s csums') (hm_to_s csums)); None - end) acc) acc url_csums + end) acc + end) acc url_csums else acc @@ -238,16 +292,25 @@ module Make let last_git_status = ref (Error "unknown") module Disk = struct + module KS = Set.Make(Mirage_kv.Key) + type t = { mutable md5s : string SM.t ; mutable sha512s : string SM.t ; + mutable checked : KS.t option ; dev : KV.t ; dev_md5s : Cache.t ; dev_sha512s : Cache.t ; dev_swap : Swap.t ; } - let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap } + let empty dev dev_md5s dev_sha512s dev_swap = + { md5s = SM.empty ; sha512s = SM.empty ; checked = Some KS.empty ; dev; dev_md5s; dev_sha512s ; dev_swap } + + let add_checked t path = + match t.checked with + | None -> () + | Some s -> t.checked <- Some (KS.add path s) let marshal_sm (sm : string SM.t) = let version = char_of_int 1 in @@ -272,18 +335,29 @@ module Make Lwt.return_unit let find_key t h key = - assert (List.length (Mirage_kv.Key.segments key) = 1); - match - match h with - | `MD5 -> - Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s) - | `SHA512 -> - Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s) - | `SHA256 -> Some key - | _ -> None - with - | None -> Error `Not_found - | Some x -> Ok x + if List.length (Mirage_kv.Key.segments key) <> 1 then begin + Logs.warn (fun m -> m "find_key with multiple segments: %a" Mirage_kv.Key.pp key); + Error `Not_found + end else + match + match h with + | `MD5 -> + Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s) + | `SHA512 -> + Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s) + | `SHA256 -> Some key + with + | None -> Error `Not_found + | Some x -> Ok x + + let ready t h key = + match t.checked with + | None -> true + | Some s -> match find_key t h key with + | Ok k -> KS.mem k s + | Error _ -> false + + let completely_checked t = t.checked = None let read_chunked t h v f a = match find_key t h v with @@ -382,7 +456,8 @@ module Make | Ok () -> remove_active url; t.md5s <- SM.add md5 sha256 t.md5s; - t.sha512s <- SM.add sha512 sha256 t.sha512s + t.sha512s <- SM.add sha512 sha256 t.sha512s; + add_checked t dest | Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e) | Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e) else begin @@ -392,22 +467,23 @@ module Make end (* on disk, we use a flat file system where the filename is the sha256 of the data *) - let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap = - KV.list dev Mirage_kv.Key.empty >>= function - | Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e) + let check ~skip_verify_sha256 t = + KV.list t.dev Mirage_kv.Key.empty >>= function + | Error e -> + Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); + Lwt.return_unit | Ok entries -> - let t = empty dev dev_md5s dev_sha512s dev_swap in Cache.read t.dev_md5s >>= fun r -> (match r with | Ok Some s -> - if not verify_sha256 then + if skip_verify_sha256 then Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s) | Ok None -> () | Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e)); Cache.read t.dev_sha512s >>= fun r -> (match r with | Ok Some s -> - if not verify_sha256 then + if skip_verify_sha256 then Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s) | Ok None -> () | Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e)); @@ -438,7 +514,7 @@ module Make None in let sha256_final = - let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in + let need_to_compute = md5_final <> None || sha512_final <> None || not skip_verify_sha256 in if need_to_compute then let f s = let digest = SHA256.(to_raw_string (get s)) in @@ -455,7 +531,9 @@ module Make None in match sha256_final with - | None -> Lwt.return_unit + | None -> + add_checked t path; + Lwt.return_unit | Some f -> read_chunked t `SHA256 path (fun (sha256, md5, sha512) data -> @@ -481,11 +559,12 @@ module Make else begin Option.iter (fun f -> f (Option.get md5)) md5_final; Option.iter (fun f -> f (Option.get sha512)) sha512_final; + add_checked t path; Lwt.return_unit end) entries >>= fun () -> update_caches t >|= fun () -> - t + t.checked <- None let exists t h v = match find_key t h v with @@ -578,6 +657,7 @@ module Make let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in let urls = ref SM.empty in entries_of_git ~mtime store repo urls >>= fun entries -> + Git.empty (); let t = Tar.out ~level:Ustar entries in let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in let buf = Buffer.create 1024 in @@ -841,38 +921,43 @@ stamp: %S | Ok h -> let hash = Mirage_kv.Key.v hash in Lwt.async (fun () -> - (Disk.last_modified store h hash >|= function - | Error _ -> t.modified - | Ok v -> ptime_to_http_date v) >>= fun last_modified -> - if not_modified request (last_modified, Mirage_kv.Key.basename hash) then - let resp = Httpaf.Response.create `Not_modified in - respond_with_empty reqd resp; - Lwt.return_unit - else - Disk.size store h hash >>= function - | Error _ -> - not_found reqd request.Httpaf.Request.target; + if Disk.ready store h hash then + (Disk.last_modified store h hash >|= function + | Error _ -> t.modified + | Ok v -> ptime_to_http_date v) >>= fun last_modified -> + if not_modified request (last_modified, Mirage_kv.Key.basename hash) then + let resp = Httpaf.Response.create `Not_modified in + respond_with_empty reqd resp; Lwt.return_unit - | Ok size -> - let size = Optint.Int63.to_string size in - let mime_type = "application/octet-stream" in - let headers = [ - "content-type", mime_type ; - "etag", Mirage_kv.Key.basename hash ; - "last-modified", last_modified ; - "content-length", size ; - ] - in - let headers = Httpaf.Headers.of_list headers in - let resp = Httpaf.Response.create ~headers `OK in - let body = Httpaf.Reqd.respond_with_streaming reqd resp in - Disk.read_chunked store h hash (fun () chunk -> - let wait, wakeup = Lwt.task () in - (* FIXME: catch exception when body is closed *) - Httpaf.Body.write_string body chunk; - Httpaf.Body.flush body (Lwt.wakeup wakeup); - wait) () >|= fun _ -> - Httpaf.Body.close_writer body) + else + Disk.size store h hash >>= function + | Error _ -> + not_found reqd request.Httpaf.Request.target; + Lwt.return_unit + | Ok size -> + let size = Optint.Int63.to_string size in + let mime_type = "application/octet-stream" in + let headers = [ + "content-type", mime_type ; + "etag", Mirage_kv.Key.basename hash ; + "last-modified", last_modified ; + "content-length", size ; + ] + in + let headers = Httpaf.Headers.of_list headers in + let resp = Httpaf.Response.create ~headers `OK in + let body = Httpaf.Reqd.respond_with_streaming reqd resp in + Disk.read_chunked store h hash (fun () chunk -> + let wait, wakeup = Lwt.task () in + (* FIXME: catch exception when body is closed *) + Httpaf.Body.write_string body chunk; + Httpaf.Body.flush body (Lwt.wakeup wakeup); + wait) () >|= fun _ -> + Httpaf.Body.close_writer body + else begin + not_found reqd request.Httpaf.Request.target; + Lwt.return_unit + end) end | _ -> Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target); @@ -881,12 +966,11 @@ stamp: %S end let download_archives parallel_downloads disk http_client urls = - (* FIXME: handle resuming partial downloads *) reset_failed_downloads (); remaining_downloads := SM.cardinal urls; archives := SM.cardinal urls; let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in - Lwt_list.iter_p (fun (url, csums) -> + Lwt_list.iter_p (fun (url, (csums, mirrors, upstream_caches)) -> Lwt_pool.use pool @@ fun () -> HM.fold (fun h v r -> r >>= function @@ -897,29 +981,51 @@ stamp: %S decr remaining_downloads; Lwt.return_unit | false -> - let quux, body_init = Disk.init_write disk csums in - add_to_active url (Ptime.v (Pclock.now_d_ps ())); - Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function - | Ok (resp, r) -> - decr remaining_downloads; - begin match r with - | Error `Bad_response -> - add_failed url (Ptime.v (Pclock.now_d_ps ())) - (`Bad_response (resp.status, resp.reason)); + let rec download url mirrors upstream_caches = + let retry () = + if SSet.is_empty mirrors && SSet.is_empty upstream_caches then begin + decr remaining_downloads; Lwt.return_unit - | Error `Write_error e -> - add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e); - Lwt.return_unit - | Error `Swap e -> - add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e); - Lwt.return_unit - | Ok (digests, body) -> - Disk.finalize_write disk quux ~url body csums digests - end - | Error me -> - decr remaining_downloads; - add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me); - Lwt.return_unit) + end else if SSet.is_empty mirrors then + let elt, upstream_caches = + let e = SSet.min_elt upstream_caches in + e, SSet.remove e upstream_caches + in + download elt mirrors upstream_caches + else + let elt, mirrors = + let e = SSet.min_elt mirrors in + e, SSet.remove e mirrors + in + download elt mirrors upstream_caches + in + let quux, body_init = Disk.init_write disk csums in + add_to_active url (Ptime.v (Pclock.now_d_ps ())); + if not (K.skip_download ()) then + Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function + | Ok (resp, r) -> + begin match r with + | Error `Bad_response -> + add_failed url (Ptime.v (Pclock.now_d_ps ())) + (`Bad_response (resp.status, resp.reason)); + retry () + | Error `Write_error e -> + add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e); + retry () + | Error `Swap e -> + add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e); + retry () + | Ok (digests, body) -> + decr remaining_downloads; + Disk.finalize_write disk quux ~url body csums digests + end + | Error me -> + add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me); + retry () + else + retry () + in + download url mirrors upstream_caches) (SM.bindings urls) >>= fun () -> Disk.update_caches disk >|= fun () -> Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls)) @@ -957,10 +1063,10 @@ stamp: %S Cache.connect sha512s >>= fun sha512s -> Swap.connect swap >>= fun swap -> Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv)); - Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk -> + let disk = Disk.empty kv md5s sha512s swap in let remote = K.remote () in if K.check () then - Lwt.return_unit + Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk else begin Logs.info (fun m -> m "Initializing git state. This may take a while..."); @@ -968,23 +1074,27 @@ stamp: %S Lwt.return (Error ()) else restore_git ~remote git_dump git_ctx) >>= function - | Ok git_kv -> Lwt.return git_kv + | Ok git_kv -> Lwt.return (false, git_kv) | Error () -> Git_kv.connect git_ctx remote >>= fun git_kv -> - dump_git git_dump git_kv >|= fun () -> - git_kv - end >>= fun git_kv -> + Lwt.return (true, git_kv) + end >>= fun (need_dump, git_kv) -> Logs.info (fun m -> m "Done initializing git state!"); Serve.commit_id git_kv >>= fun commit_id -> Logs.info (fun m -> m "git: %s" commit_id); Serve.create remote git_kv >>= fun (serve, urls) -> Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t -> let update () = - Serve.update_git ~remote serve git_kv >>= function - | None | Some ([], _) -> Lwt.return_unit - | Some (_changes, urls) -> - dump_git git_dump git_kv >>= fun () -> - download_archives (K.parallel_downloads ()) disk http_ctx urls + if Disk.completely_checked disk then + Serve.update_git ~remote serve git_kv >>= function + | None | Some ([], _) -> Lwt.return_unit + | Some (_changes, urls) -> + dump_git git_dump git_kv >>= fun () -> + download_archives (K.parallel_downloads ()) disk http_ctx urls + else begin + Logs.warn (fun m -> m "disk is not ready yet, thus not updating"); + Lwt.return_unit + end in let service = Paf.http_service @@ -993,6 +1103,14 @@ stamp: %S in let `Initialized th = Paf.serve service t in Logs.info (fun f -> f "listening on %d/HTTP" (K.port ())); + Lwt.join [ + (if need_dump then begin + Logs.info (fun m -> m "dumping git state %s" commit_id); + dump_git git_dump git_kv + end else + Lwt.return_unit) ; + (Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk) + ] >>= fun () -> Lwt.async (fun () -> let rec go () = Time.sleep_ns (Duration.of_hour 1) >>= fun () ->