stream git contents, also make the tarball and the find_urls in one go

This commit is contained in:
Hannes Mehnert 2024-11-04 17:17:52 +01:00
parent 2312092e42
commit a9b8f18192

View file

@ -102,62 +102,67 @@ module Make
hm ""
module Git = struct
let find_contents store =
let rec go store path acc =
Store.list store path >>= function
| Error e ->
Logs.err (fun m -> m "error %a while listing %a"
Store.pp_error e Mirage_kv.Key.pp path);
Lwt.return acc
| Ok steps ->
Lwt_list.fold_left_s (fun acc (step, _) ->
Store.exists store step >>= function
let contents store =
let explore = ref [ Mirage_kv.Key.empty ] in
let more () =
let rec go () =
match !explore with
| [] -> Lwt.return None
| step :: tl ->
explore := tl;
Store.exists store step >>= function
| Error e ->
Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e
Mirage_kv.Key.pp step);
go ()
| Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step);
go ()
| Ok Some `Value -> Lwt.return (Some step)
| Ok Some `Dictionary ->
Store.list store step >>= function
| Error e ->
Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e
Mirage_kv.Key.pp step);
Lwt.return acc
| Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step);
Lwt.return acc
| Ok Some `Value -> Lwt.return (step :: acc)
| Ok Some `Dictionary -> go store step acc) acc steps
Logs.err (fun m -> m "error %a while listing %a"
Store.pp_error e Mirage_kv.Key.pp step);
go ()
| Ok steps ->
explore := !explore @ List.map fst steps;
go ()
in
go ()
in
go store Mirage_kv.Key.empty []
Lwt_stream.from more
let find_urls acc path data =
if Mirage_kv.Key.basename path = "opam" then
(* TODO: parser errors are logged (should be reported to status page) *)
(try
let url_csums = Opam_file.extract_urls (Mirage_kv.Key.to_string path) data in
List.fold_left (fun acc (url, csums) ->
if HM.cardinal csums = 0 then
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
else
SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
csums'
then
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
None
end) acc) acc url_csums
with exn ->
Logs.warn (fun m -> m "some error in %a, ignoring %s"
Mirage_kv.Key.pp path (Printexc.to_string exn));
acc)
else
acc
let find_urls store =
find_contents store >>= fun paths ->
let opam_paths =
List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths
in
Lwt_list.fold_left_s (fun acc path ->
Store.get store path >|= function
| Ok data ->
(* TODO report parser errors *)
(try
let url_csums = Opam_file.extract_urls (Mirage_kv.Key.to_string path) data in
List.fold_left (fun acc (url, csums) ->
if HM.cardinal csums = 0 then
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
else
SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
csums'
then
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
None
end) acc) acc url_csums
with _ ->
Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path);
acc)
| Error e -> Logs.warn (fun m -> m "Store.get: %a" Store.pp_error e); acc)
SM.empty opam_paths
end
let active_downloads = ref SM.empty
@ -506,22 +511,23 @@ module Make
then Tar.High (High.inj (Lwt.return_ok None))
else begin closed := true; Tar.High (High.inj (Lwt.return_ok (Some data))) end
let entries_of_git ~mtime store repo =
Git.find_contents store >>= fun paths ->
let entries = Lwt_stream.of_list paths in
let entries_of_git ~mtime store repo urls =
let entries = Git.contents store in
let to_entry path =
Store.get store path >|= function
| Ok data ->
let data =
if Mirage_kv.Key.(equal path (v "repo"))
then repo else data in
then repo else data
in
let file_mode = 0o644
and mod_time = Int64.of_int mtime
and user_id = 0
and group_id = 0
and size = String.length data in
let hdr = Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
(Mirage_kv.Key.to_string path) (Int64.of_int size) in
(Mirage_kv.Key.to_string path) (Int64.of_int size) in
urls := Git.find_urls !urls path data;
Some (Some Tar.Header.Ustar, hdr, once data)
| Error _ -> None in
let entries = Lwt_stream.filter_map_s to_entry entries in
@ -530,12 +536,13 @@ module Make
let of_git repo store =
let now = Ptime.v (Pclock.now_d_ps ()) in
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
entries_of_git ~mtime store repo >>= fun entries ->
let urls = ref SM.empty in
entries_of_git ~mtime store repo urls >>= fun entries ->
let t = Tar.out ~level:Ustar entries in
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
let buf = Buffer.create 1024 in
to_buffer buf t >|= function
| Ok () -> Buffer.contents buf
| Ok () -> Buffer.contents buf, !urls
| Error (`Msg msg) -> failwith msg
end
@ -588,8 +595,8 @@ stamp: %S
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index ->
{ commit_id ; modified ; repo ; index }
Tarball.of_git repo git_kv >|= fun (index, urls) ->
{ commit_id ; modified ; repo ; index }, urls
let update_lock = Lwt_mutex.create ()
@ -602,18 +609,18 @@ stamp: %S
Lwt.return None
| Ok [] ->
Logs.info (fun m -> m "git changes are empty");
Lwt.return (Some [])
Lwt.return (Some ([], SM.empty))
| Ok changes ->
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
Logs.info (fun m -> m "git: %s" commit_id);
let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index ->
Tarball.of_git repo git_kv >|= fun (index, urls) ->
t.commit_id <- commit_id ;
t.modified <- modified ;
t.repo <- repo ;
t.index <- index;
Some changes)
Some (changes, urls))
let status disk =
(* report status:
@ -796,9 +803,8 @@ stamp: %S
end
let download_archives parallel_downloads disk http_client store =
let download_archives parallel_downloads disk http_client urls =
(* FIXME: handle resuming partial downloads *)
Git.find_urls store >>= fun urls ->
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) ->
@ -901,14 +907,14 @@ stamp: %S
Logs.info (fun m -> m "Done initializing git state!");
Serve.commit_id git_kv >>= fun commit_id ->
Logs.info (fun m -> m "git: %s" commit_id);
Serve.create remote git_kv >>= fun serve ->
Serve.create remote git_kv >>= fun (serve, urls) ->
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
let update () =
Serve.update_git ~remote serve git_kv >>= function
| None | Some [] -> Lwt.return_unit
| Some _changes ->
| None | Some ([], _) -> Lwt.return_unit
| Some (_changes, urls) ->
dump_git git_dump git_kv >>= fun () ->
download_archives (K.parallel_downloads ()) disk http_ctx git_kv
download_archives (K.parallel_downloads ()) disk http_ctx urls
in
let service =
Paf.http_service
@ -924,7 +930,7 @@ stamp: %S
go ()
in
go ());
download_archives (K.parallel_downloads ()) disk http_ctx git_kv >>= fun () ->
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
(th >|= fun _v -> ())
let start block _time _pclock stack git_ctx http_ctx =