diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index 1e1307b..9b53550 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -102,62 +102,67 @@ module Make hm "" module Git = struct - let find_contents store = - let rec go store path acc = - Store.list store path >>= function - | Error e -> - Logs.err (fun m -> m "error %a while listing %a" - Store.pp_error e Mirage_kv.Key.pp path); - Lwt.return acc - | Ok steps -> - Lwt_list.fold_left_s (fun acc (step, _) -> - Store.exists store step >>= function + let contents store = + let explore = ref [ Mirage_kv.Key.empty ] in + let more () = + let rec go () = + match !explore with + | [] -> Lwt.return None + | step :: tl -> + explore := tl; + Store.exists store step >>= function + | Error e -> + Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e + Mirage_kv.Key.pp step); + go () + | Ok None -> + Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step); + go () + | Ok Some `Value -> Lwt.return (Some step) + | Ok Some `Dictionary -> + Store.list store step >>= function | Error e -> - Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e - Mirage_kv.Key.pp step); - Lwt.return acc - | Ok None -> - Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step); - Lwt.return acc - | Ok Some `Value -> Lwt.return (step :: acc) - | Ok Some `Dictionary -> go store step acc) acc steps + Logs.err (fun m -> m "error %a while listing %a" + Store.pp_error e Mirage_kv.Key.pp step); + go () + | Ok steps -> + explore := !explore @ List.map fst steps; + go () + in + go () in - go store Mirage_kv.Key.empty [] + Lwt_stream.from more + + let find_urls acc path data = + if Mirage_kv.Key.basename path = "opam" then + (* TODO: parser errors are logged (should be reported to status page) *) + (try + let url_csums = Opam_file.extract_urls (Mirage_kv.Key.to_string path) data in + List.fold_left (fun acc (url, csums) -> + if HM.cardinal csums = 0 then + (Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc) + else + SM.update url (function + | None -> Some csums + | Some csums' -> + if HM.for_all (fun h v -> + match HM.find_opt h csums with + | None -> true | Some v' -> String.equal v v') + csums' + then + Some (HM.union (fun _h v _v' -> Some v) csums csums') + else begin + Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s" + url (hm_to_s csums') (hm_to_s csums)); + None + end) acc) acc url_csums + with exn -> + Logs.warn (fun m -> m "some error in %a, ignoring %s" + Mirage_kv.Key.pp path (Printexc.to_string exn)); + acc) + else + acc - let find_urls store = - find_contents store >>= fun paths -> - let opam_paths = - List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths - in - Lwt_list.fold_left_s (fun acc path -> - Store.get store path >|= function - | Ok data -> - (* TODO report parser errors *) - (try - let url_csums = Opam_file.extract_urls (Mirage_kv.Key.to_string path) data in - List.fold_left (fun acc (url, csums) -> - if HM.cardinal csums = 0 then - (Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc) - else - SM.update url (function - | None -> Some csums - | Some csums' -> - if HM.for_all (fun h v -> - match HM.find_opt h csums with - | None -> true | Some v' -> String.equal v v') - csums' - then - Some (HM.union (fun _h v _v' -> Some v) csums csums') - else begin - Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s" - url (hm_to_s csums') (hm_to_s csums)); - None - end) acc) acc url_csums - with _ -> - Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path); - acc) - | Error e -> Logs.warn (fun m -> m "Store.get: %a" Store.pp_error e); acc) - SM.empty opam_paths end let active_downloads = ref SM.empty @@ -506,22 +511,23 @@ module Make then Tar.High (High.inj (Lwt.return_ok None)) else begin closed := true; Tar.High (High.inj (Lwt.return_ok (Some data))) end - let entries_of_git ~mtime store repo = - Git.find_contents store >>= fun paths -> - let entries = Lwt_stream.of_list paths in + let entries_of_git ~mtime store repo urls = + let entries = Git.contents store in let to_entry path = Store.get store path >|= function | Ok data -> let data = if Mirage_kv.Key.(equal path (v "repo")) - then repo else data in + then repo else data + in let file_mode = 0o644 and mod_time = Int64.of_int mtime and user_id = 0 and group_id = 0 and size = String.length data in let hdr = Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id - (Mirage_kv.Key.to_string path) (Int64.of_int size) in + (Mirage_kv.Key.to_string path) (Int64.of_int size) in + urls := Git.find_urls !urls path data; Some (Some Tar.Header.Ustar, hdr, once data) | Error _ -> None in let entries = Lwt_stream.filter_map_s to_entry entries in @@ -530,12 +536,13 @@ module Make let of_git repo store = let now = Ptime.v (Pclock.now_d_ps ()) in let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in - entries_of_git ~mtime store repo >>= fun entries -> + let urls = ref SM.empty in + entries_of_git ~mtime store repo urls >>= fun entries -> let t = Tar.out ~level:Ustar entries in let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in let buf = Buffer.create 1024 in to_buffer buf t >|= function - | Ok () -> Buffer.contents buf + | Ok () -> Buffer.contents buf, !urls | Error (`Msg msg) -> failwith msg end @@ -588,8 +595,8 @@ stamp: %S commit_id git_kv >>= fun commit_id -> modified git_kv >>= fun modified -> let repo = repo remote commit_id in - Tarball.of_git repo git_kv >|= fun index -> - { commit_id ; modified ; repo ; index } + Tarball.of_git repo git_kv >|= fun (index, urls) -> + { commit_id ; modified ; repo ; index }, urls let update_lock = Lwt_mutex.create () @@ -602,18 +609,18 @@ stamp: %S Lwt.return None | Ok [] -> Logs.info (fun m -> m "git changes are empty"); - Lwt.return (Some []) + Lwt.return (Some ([], SM.empty)) | Ok changes -> commit_id git_kv >>= fun commit_id -> modified git_kv >>= fun modified -> Logs.info (fun m -> m "git: %s" commit_id); let repo = repo remote commit_id in - Tarball.of_git repo git_kv >|= fun index -> + Tarball.of_git repo git_kv >|= fun (index, urls) -> t.commit_id <- commit_id ; t.modified <- modified ; t.repo <- repo ; t.index <- index; - Some changes) + Some (changes, urls)) let status disk = (* report status: @@ -796,9 +803,8 @@ stamp: %S end - let download_archives parallel_downloads disk http_client store = + let download_archives parallel_downloads disk http_client urls = (* FIXME: handle resuming partial downloads *) - Git.find_urls store >>= fun urls -> let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in let idx = ref 0 in Lwt_list.iter_p (fun (url, csums) -> @@ -901,14 +907,14 @@ stamp: %S Logs.info (fun m -> m "Done initializing git state!"); Serve.commit_id git_kv >>= fun commit_id -> Logs.info (fun m -> m "git: %s" commit_id); - Serve.create remote git_kv >>= fun serve -> + Serve.create remote git_kv >>= fun (serve, urls) -> Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t -> let update () = Serve.update_git ~remote serve git_kv >>= function - | None | Some [] -> Lwt.return_unit - | Some _changes -> + | None | Some ([], _) -> Lwt.return_unit + | Some (_changes, urls) -> dump_git git_dump git_kv >>= fun () -> - download_archives (K.parallel_downloads ()) disk http_ctx git_kv + download_archives (K.parallel_downloads ()) disk http_ctx urls in let service = Paf.http_service @@ -924,7 +930,7 @@ stamp: %S go () in go ()); - download_archives (K.parallel_downloads ()) disk http_ctx git_kv >>= fun () -> + download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () -> (th >|= fun _v -> ()) let start block _time _pclock stack git_ctx http_ctx =