Merge pull request 'revise startup, address urls pointing to same sha256 and support mirrors (upstream and in opam file)' (#24) from startup into main
Reviewed-on: #24
This commit is contained in:
commit
2edc311a33
3 changed files with 237 additions and 102 deletions
|
@ -1,5 +1,3 @@
|
||||||
|
|
||||||
|
|
||||||
module Hash = struct
|
module Hash = struct
|
||||||
type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ]
|
type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ]
|
||||||
|
|
||||||
|
|
|
@ -35,12 +35,31 @@ let extract_url_checksum filename items =
|
||||||
List.find_opt
|
List.find_opt
|
||||||
(function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false)
|
(function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false)
|
||||||
items
|
items
|
||||||
|
and mirrors =
|
||||||
|
List.find_opt
|
||||||
|
(function { pelem = Variable ({ pelem = "mirrors" ; _ }, _); _ } -> true | _ -> false)
|
||||||
|
items
|
||||||
in
|
in
|
||||||
let url =
|
let url =
|
||||||
match url, archive with
|
match url, archive with
|
||||||
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
|
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
|
||||||
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
|
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
|
||||||
| _ -> Error (`Msg "neither 'src' nor 'archive' present")
|
| _ -> Error (`Msg "neither 'src' nor 'archive' present")
|
||||||
|
and mirrors = match mirrors with
|
||||||
|
| None -> []
|
||||||
|
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ } -> [ url ]
|
||||||
|
| Some { pelem = Variable (_, { pelem = List { pelem = urls ; _ } ; _ }) } ->
|
||||||
|
List.fold_left (fun acc -> function
|
||||||
|
| { pelem = String url ; _ } -> url :: acc
|
||||||
|
| v ->
|
||||||
|
Logs.err (fun m -> m "bad mirror data (expected a string in the list): %s"
|
||||||
|
(OpamPrinter.FullPos.value v));
|
||||||
|
acc)
|
||||||
|
[] urls
|
||||||
|
| Some v ->
|
||||||
|
Logs.err (fun m -> m "bad mirror data (expected string or string list): %s"
|
||||||
|
(OpamPrinter.FullPos.items [ v ]));
|
||||||
|
[]
|
||||||
in
|
in
|
||||||
let csum, csum_errs =
|
let csum, csum_errs =
|
||||||
match checksum with
|
match checksum with
|
||||||
|
@ -79,7 +98,7 @@ let extract_url_checksum filename items =
|
||||||
| _ -> Error (`Msg "couldn't find or decode 'checksum'"), []
|
| _ -> Error (`Msg "couldn't find or decode 'checksum'"), []
|
||||||
in
|
in
|
||||||
(match url, csum with
|
(match url, csum with
|
||||||
| Ok url, Ok csum -> Ok (url, csum)
|
| Ok url, Ok csum -> Ok (url, csum, mirrors)
|
||||||
| Error _ as e, _
|
| Error _ as e, _
|
||||||
| _, (Error _ as e) -> e), csum_errs
|
| _, (Error _ as e) -> e), csum_errs
|
||||||
|
|
||||||
|
|
|
@ -9,11 +9,27 @@ module K = struct
|
||||||
let doc = Arg.info ~doc:"Only check the cache" ["check"] in
|
let doc = Arg.info ~doc:"Only check the cache" ["check"] in
|
||||||
Mirage_runtime.register_arg Arg.(value & flag doc)
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||||
|
|
||||||
let verify_sha256 =
|
let skip_download =
|
||||||
|
let doc = Arg.info ~doc:"Skip downloading archives" ["skip-download"] in
|
||||||
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||||
|
|
||||||
|
let upstream_caches =
|
||||||
|
let doc =
|
||||||
|
"Upstream caches (e.g. https://opam.ocaml.org/cache). \
|
||||||
|
For each package first the declared url is attempted. Then, \
|
||||||
|
if any, all the declared mirrors are attempted. \
|
||||||
|
Finally, the upstream caches are attempted. \
|
||||||
|
Note that this does not change the \"archive-mirrors:\" value \
|
||||||
|
in the /repo endpoint."
|
||||||
|
in
|
||||||
|
let doc = Arg.info ~doc ["upstream-cache"] in
|
||||||
|
Mirage_runtime.register_arg Arg.(value & opt_all string [] doc)
|
||||||
|
|
||||||
|
let skip_verify_sha256 =
|
||||||
let doc = Arg.info
|
let doc = Arg.info
|
||||||
~doc:"Verify the SHA256 checksums of the cache contents, and \
|
~doc:"Skip verification of the SHA256 checksums of the cache contents, \
|
||||||
re-build the other checksum caches."
|
and do not re-build the other checksum caches."
|
||||||
["verify-sha256"]
|
["skip-verify-sha256"]
|
||||||
in
|
in
|
||||||
Mirage_runtime.register_arg Arg.(value & flag doc)
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||||
|
|
||||||
|
@ -132,31 +148,69 @@ module Make
|
||||||
in
|
in
|
||||||
Lwt_stream.from more
|
Lwt_stream.from more
|
||||||
|
|
||||||
|
let sha256s = Hashtbl.create 13
|
||||||
|
|
||||||
|
let empty () = Hashtbl.clear sha256s
|
||||||
|
|
||||||
let find_urls acc path data =
|
let find_urls acc path data =
|
||||||
if Mirage_kv.Key.basename path = "opam" then
|
if Mirage_kv.Key.basename path = "opam" then
|
||||||
let path = Mirage_kv.Key.to_string path in
|
let path = Mirage_kv.Key.to_string path in
|
||||||
let url_csums, errs = Opam_file.extract_urls path data in
|
let url_csums, errs = Opam_file.extract_urls path data in
|
||||||
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
|
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
|
||||||
List.fold_left (fun acc (url, csums) ->
|
let upstream hm =
|
||||||
|
HM.fold
|
||||||
|
(fun hash hash_value set ->
|
||||||
|
List.fold_left (fun set cache_url ->
|
||||||
|
let url =
|
||||||
|
cache_url ^ "/" ^ Archive_checksum.Hash.to_string hash ^
|
||||||
|
"/" ^ String.sub hash_value 0 2 ^ "/" ^ hash_value
|
||||||
|
in
|
||||||
|
SSet.add url set)
|
||||||
|
set (K.upstream_caches ()))
|
||||||
|
hm SSet.empty
|
||||||
|
in
|
||||||
|
List.fold_left (fun acc (url, csums, mirrors) ->
|
||||||
if HM.cardinal csums = 0 then
|
if HM.cardinal csums = 0 then
|
||||||
(add_parse_error path ("no checksums for " ^ url);
|
(add_parse_error path ("no checksums for " ^ url);
|
||||||
acc)
|
acc)
|
||||||
else
|
else begin
|
||||||
|
let url' =
|
||||||
|
match HM.find_opt `SHA256 csums with
|
||||||
|
| None -> url
|
||||||
|
| Some hash ->
|
||||||
|
match Hashtbl.find_opt sha256s hash with
|
||||||
|
| None -> Hashtbl.add sha256s hash url; url
|
||||||
|
| Some url' ->
|
||||||
|
if not (String.equal url url') then
|
||||||
|
Logs.debug (fun m -> m "same hash for url %s and %s" url url');
|
||||||
|
url'
|
||||||
|
in
|
||||||
|
let mirrors = SSet.of_list mirrors in
|
||||||
|
let url, mirrors =
|
||||||
|
if String.equal url url' then
|
||||||
|
url, mirrors
|
||||||
|
else
|
||||||
|
url', SSet.add url mirrors
|
||||||
|
in
|
||||||
SM.update url (function
|
SM.update url (function
|
||||||
| None -> Some csums
|
| None -> Some (csums, mirrors, upstream csums)
|
||||||
| Some csums' ->
|
| Some (csums', mirrors', upstream_caches') ->
|
||||||
if HM.for_all (fun h v ->
|
if HM.for_all (fun h v ->
|
||||||
match HM.find_opt h csums with
|
match HM.find_opt h csums with
|
||||||
| None -> true | Some v' -> String.equal v v')
|
| None -> true | Some v' -> String.equal v v')
|
||||||
csums'
|
csums'
|
||||||
then
|
then
|
||||||
Some (HM.union (fun _h v _v' -> Some v) csums csums')
|
Some (HM.union (fun _h v _v' -> Some v) csums csums',
|
||||||
|
SSet.union mirrors mirrors',
|
||||||
|
SSet.union (upstream csums) upstream_caches'
|
||||||
|
)
|
||||||
else begin
|
else begin
|
||||||
add_parse_error path (Fmt.str
|
add_parse_error path (Fmt.str
|
||||||
"mismatching hashes for %s: %s vs %s"
|
"mismatching hashes for %s: %s vs %s"
|
||||||
url (hm_to_s csums') (hm_to_s csums));
|
url (hm_to_s csums') (hm_to_s csums));
|
||||||
None
|
None
|
||||||
end) acc) acc url_csums
|
end) acc
|
||||||
|
end) acc url_csums
|
||||||
else
|
else
|
||||||
acc
|
acc
|
||||||
|
|
||||||
|
@ -238,16 +292,25 @@ module Make
|
||||||
let last_git_status = ref (Error "unknown")
|
let last_git_status = ref (Error "unknown")
|
||||||
|
|
||||||
module Disk = struct
|
module Disk = struct
|
||||||
|
module KS = Set.Make(Mirage_kv.Key)
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
mutable md5s : string SM.t ;
|
mutable md5s : string SM.t ;
|
||||||
mutable sha512s : string SM.t ;
|
mutable sha512s : string SM.t ;
|
||||||
|
mutable checked : KS.t option ;
|
||||||
dev : KV.t ;
|
dev : KV.t ;
|
||||||
dev_md5s : Cache.t ;
|
dev_md5s : Cache.t ;
|
||||||
dev_sha512s : Cache.t ;
|
dev_sha512s : Cache.t ;
|
||||||
dev_swap : Swap.t ;
|
dev_swap : Swap.t ;
|
||||||
}
|
}
|
||||||
|
|
||||||
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
let empty dev dev_md5s dev_sha512s dev_swap =
|
||||||
|
{ md5s = SM.empty ; sha512s = SM.empty ; checked = Some KS.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
||||||
|
|
||||||
|
let add_checked t path =
|
||||||
|
match t.checked with
|
||||||
|
| None -> ()
|
||||||
|
| Some s -> t.checked <- Some (KS.add path s)
|
||||||
|
|
||||||
let marshal_sm (sm : string SM.t) =
|
let marshal_sm (sm : string SM.t) =
|
||||||
let version = char_of_int 1 in
|
let version = char_of_int 1 in
|
||||||
|
@ -272,18 +335,29 @@ module Make
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
let find_key t h key =
|
let find_key t h key =
|
||||||
assert (List.length (Mirage_kv.Key.segments key) = 1);
|
if List.length (Mirage_kv.Key.segments key) <> 1 then begin
|
||||||
match
|
Logs.warn (fun m -> m "find_key with multiple segments: %a" Mirage_kv.Key.pp key);
|
||||||
match h with
|
Error `Not_found
|
||||||
| `MD5 ->
|
end else
|
||||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
|
match
|
||||||
| `SHA512 ->
|
match h with
|
||||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
|
| `MD5 ->
|
||||||
| `SHA256 -> Some key
|
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
|
||||||
| _ -> None
|
| `SHA512 ->
|
||||||
with
|
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
|
||||||
| None -> Error `Not_found
|
| `SHA256 -> Some key
|
||||||
| Some x -> Ok x
|
with
|
||||||
|
| None -> Error `Not_found
|
||||||
|
| Some x -> Ok x
|
||||||
|
|
||||||
|
let ready t h key =
|
||||||
|
match t.checked with
|
||||||
|
| None -> true
|
||||||
|
| Some s -> match find_key t h key with
|
||||||
|
| Ok k -> KS.mem k s
|
||||||
|
| Error _ -> false
|
||||||
|
|
||||||
|
let completely_checked t = t.checked = None
|
||||||
|
|
||||||
let read_chunked t h v f a =
|
let read_chunked t h v f a =
|
||||||
match find_key t h v with
|
match find_key t h v with
|
||||||
|
@ -382,7 +456,8 @@ module Make
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
remove_active url;
|
remove_active url;
|
||||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||||
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
t.sha512s <- SM.add sha512 sha256 t.sha512s;
|
||||||
|
add_checked t dest
|
||||||
| Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
|
| Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
|
||||||
| Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
|
| Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
|
||||||
else begin
|
else begin
|
||||||
|
@ -392,22 +467,23 @@ module Make
|
||||||
end
|
end
|
||||||
|
|
||||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||||
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
|
let check ~skip_verify_sha256 t =
|
||||||
KV.list dev Mirage_kv.Key.empty >>= function
|
KV.list t.dev Mirage_kv.Key.empty >>= function
|
||||||
| Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e)
|
| Error e ->
|
||||||
|
Logs.err (fun m -> m "error %a listing kv" KV.pp_error e);
|
||||||
|
Lwt.return_unit
|
||||||
| Ok entries ->
|
| Ok entries ->
|
||||||
let t = empty dev dev_md5s dev_sha512s dev_swap in
|
|
||||||
Cache.read t.dev_md5s >>= fun r ->
|
Cache.read t.dev_md5s >>= fun r ->
|
||||||
(match r with
|
(match r with
|
||||||
| Ok Some s ->
|
| Ok Some s ->
|
||||||
if not verify_sha256 then
|
if skip_verify_sha256 then
|
||||||
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
|
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
|
||||||
| Ok None -> ()
|
| Ok None -> ()
|
||||||
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
|
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
|
||||||
Cache.read t.dev_sha512s >>= fun r ->
|
Cache.read t.dev_sha512s >>= fun r ->
|
||||||
(match r with
|
(match r with
|
||||||
| Ok Some s ->
|
| Ok Some s ->
|
||||||
if not verify_sha256 then
|
if skip_verify_sha256 then
|
||||||
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
|
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
|
||||||
| Ok None -> ()
|
| Ok None -> ()
|
||||||
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
|
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
|
||||||
|
@ -438,7 +514,7 @@ module Make
|
||||||
None
|
None
|
||||||
in
|
in
|
||||||
let sha256_final =
|
let sha256_final =
|
||||||
let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in
|
let need_to_compute = md5_final <> None || sha512_final <> None || not skip_verify_sha256 in
|
||||||
if need_to_compute then
|
if need_to_compute then
|
||||||
let f s =
|
let f s =
|
||||||
let digest = SHA256.(to_raw_string (get s)) in
|
let digest = SHA256.(to_raw_string (get s)) in
|
||||||
|
@ -455,7 +531,9 @@ module Make
|
||||||
None
|
None
|
||||||
in
|
in
|
||||||
match sha256_final with
|
match sha256_final with
|
||||||
| None -> Lwt.return_unit
|
| None ->
|
||||||
|
add_checked t path;
|
||||||
|
Lwt.return_unit
|
||||||
| Some f ->
|
| Some f ->
|
||||||
read_chunked t `SHA256 path
|
read_chunked t `SHA256 path
|
||||||
(fun (sha256, md5, sha512) data ->
|
(fun (sha256, md5, sha512) data ->
|
||||||
|
@ -481,11 +559,12 @@ module Make
|
||||||
else begin
|
else begin
|
||||||
Option.iter (fun f -> f (Option.get md5)) md5_final;
|
Option.iter (fun f -> f (Option.get md5)) md5_final;
|
||||||
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|
||||||
|
add_checked t path;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end)
|
end)
|
||||||
entries >>= fun () ->
|
entries >>= fun () ->
|
||||||
update_caches t >|= fun () ->
|
update_caches t >|= fun () ->
|
||||||
t
|
t.checked <- None
|
||||||
|
|
||||||
let exists t h v =
|
let exists t h v =
|
||||||
match find_key t h v with
|
match find_key t h v with
|
||||||
|
@ -578,6 +657,7 @@ module Make
|
||||||
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
|
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
|
||||||
let urls = ref SM.empty in
|
let urls = ref SM.empty in
|
||||||
entries_of_git ~mtime store repo urls >>= fun entries ->
|
entries_of_git ~mtime store repo urls >>= fun entries ->
|
||||||
|
Git.empty ();
|
||||||
let t = Tar.out ~level:Ustar entries in
|
let t = Tar.out ~level:Ustar entries in
|
||||||
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
|
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
|
||||||
let buf = Buffer.create 1024 in
|
let buf = Buffer.create 1024 in
|
||||||
|
@ -841,38 +921,43 @@ stamp: %S
|
||||||
| Ok h ->
|
| Ok h ->
|
||||||
let hash = Mirage_kv.Key.v hash in
|
let hash = Mirage_kv.Key.v hash in
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
(Disk.last_modified store h hash >|= function
|
if Disk.ready store h hash then
|
||||||
| Error _ -> t.modified
|
(Disk.last_modified store h hash >|= function
|
||||||
| Ok v -> ptime_to_http_date v) >>= fun last_modified ->
|
| Error _ -> t.modified
|
||||||
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
|
| Ok v -> ptime_to_http_date v) >>= fun last_modified ->
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
|
||||||
respond_with_empty reqd resp;
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
Lwt.return_unit
|
respond_with_empty reqd resp;
|
||||||
else
|
|
||||||
Disk.size store h hash >>= function
|
|
||||||
| Error _ ->
|
|
||||||
not_found reqd request.Httpaf.Request.target;
|
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok size ->
|
else
|
||||||
let size = Optint.Int63.to_string size in
|
Disk.size store h hash >>= function
|
||||||
let mime_type = "application/octet-stream" in
|
| Error _ ->
|
||||||
let headers = [
|
not_found reqd request.Httpaf.Request.target;
|
||||||
"content-type", mime_type ;
|
Lwt.return_unit
|
||||||
"etag", Mirage_kv.Key.basename hash ;
|
| Ok size ->
|
||||||
"last-modified", last_modified ;
|
let size = Optint.Int63.to_string size in
|
||||||
"content-length", size ;
|
let mime_type = "application/octet-stream" in
|
||||||
]
|
let headers = [
|
||||||
in
|
"content-type", mime_type ;
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
"etag", Mirage_kv.Key.basename hash ;
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
"last-modified", last_modified ;
|
||||||
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
"content-length", size ;
|
||||||
Disk.read_chunked store h hash (fun () chunk ->
|
]
|
||||||
let wait, wakeup = Lwt.task () in
|
in
|
||||||
(* FIXME: catch exception when body is closed *)
|
let headers = Httpaf.Headers.of_list headers in
|
||||||
Httpaf.Body.write_string body chunk;
|
let resp = Httpaf.Response.create ~headers `OK in
|
||||||
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
||||||
wait) () >|= fun _ ->
|
Disk.read_chunked store h hash (fun () chunk ->
|
||||||
Httpaf.Body.close_writer body)
|
let wait, wakeup = Lwt.task () in
|
||||||
|
(* FIXME: catch exception when body is closed *)
|
||||||
|
Httpaf.Body.write_string body chunk;
|
||||||
|
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
||||||
|
wait) () >|= fun _ ->
|
||||||
|
Httpaf.Body.close_writer body
|
||||||
|
else begin
|
||||||
|
not_found reqd request.Httpaf.Request.target;
|
||||||
|
Lwt.return_unit
|
||||||
|
end)
|
||||||
end
|
end
|
||||||
| _ ->
|
| _ ->
|
||||||
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
||||||
|
@ -881,12 +966,11 @@ stamp: %S
|
||||||
end
|
end
|
||||||
|
|
||||||
let download_archives parallel_downloads disk http_client urls =
|
let download_archives parallel_downloads disk http_client urls =
|
||||||
(* FIXME: handle resuming partial downloads *)
|
|
||||||
reset_failed_downloads ();
|
reset_failed_downloads ();
|
||||||
remaining_downloads := SM.cardinal urls;
|
remaining_downloads := SM.cardinal urls;
|
||||||
archives := SM.cardinal urls;
|
archives := SM.cardinal urls;
|
||||||
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
|
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
|
||||||
Lwt_list.iter_p (fun (url, csums) ->
|
Lwt_list.iter_p (fun (url, (csums, mirrors, upstream_caches)) ->
|
||||||
Lwt_pool.use pool @@ fun () ->
|
Lwt_pool.use pool @@ fun () ->
|
||||||
HM.fold (fun h v r ->
|
HM.fold (fun h v r ->
|
||||||
r >>= function
|
r >>= function
|
||||||
|
@ -897,29 +981,51 @@ stamp: %S
|
||||||
decr remaining_downloads;
|
decr remaining_downloads;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| false ->
|
| false ->
|
||||||
let quux, body_init = Disk.init_write disk csums in
|
let rec download url mirrors upstream_caches =
|
||||||
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
let retry () =
|
||||||
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
if SSet.is_empty mirrors && SSet.is_empty upstream_caches then begin
|
||||||
| Ok (resp, r) ->
|
decr remaining_downloads;
|
||||||
decr remaining_downloads;
|
|
||||||
begin match r with
|
|
||||||
| Error `Bad_response ->
|
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(`Bad_response (resp.status, resp.reason));
|
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error `Write_error e ->
|
end else if SSet.is_empty mirrors then
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
|
let elt, upstream_caches =
|
||||||
Lwt.return_unit
|
let e = SSet.min_elt upstream_caches in
|
||||||
| Error `Swap e ->
|
e, SSet.remove e upstream_caches
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
|
in
|
||||||
Lwt.return_unit
|
download elt mirrors upstream_caches
|
||||||
| Ok (digests, body) ->
|
else
|
||||||
Disk.finalize_write disk quux ~url body csums digests
|
let elt, mirrors =
|
||||||
end
|
let e = SSet.min_elt mirrors in
|
||||||
| Error me ->
|
e, SSet.remove e mirrors
|
||||||
decr remaining_downloads;
|
in
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
|
download elt mirrors upstream_caches
|
||||||
Lwt.return_unit)
|
in
|
||||||
|
let quux, body_init = Disk.init_write disk csums in
|
||||||
|
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
||||||
|
if not (K.skip_download ()) then
|
||||||
|
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
||||||
|
| Ok (resp, r) ->
|
||||||
|
begin match r with
|
||||||
|
| Error `Bad_response ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(`Bad_response (resp.status, resp.reason));
|
||||||
|
retry ()
|
||||||
|
| Error `Write_error e ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
|
||||||
|
retry ()
|
||||||
|
| Error `Swap e ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
|
||||||
|
retry ()
|
||||||
|
| Ok (digests, body) ->
|
||||||
|
decr remaining_downloads;
|
||||||
|
Disk.finalize_write disk quux ~url body csums digests
|
||||||
|
end
|
||||||
|
| Error me ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
|
||||||
|
retry ()
|
||||||
|
else
|
||||||
|
retry ()
|
||||||
|
in
|
||||||
|
download url mirrors upstream_caches)
|
||||||
(SM.bindings urls) >>= fun () ->
|
(SM.bindings urls) >>= fun () ->
|
||||||
Disk.update_caches disk >|= fun () ->
|
Disk.update_caches disk >|= fun () ->
|
||||||
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
||||||
|
@ -957,10 +1063,10 @@ stamp: %S
|
||||||
Cache.connect sha512s >>= fun sha512s ->
|
Cache.connect sha512s >>= fun sha512s ->
|
||||||
Swap.connect swap >>= fun swap ->
|
Swap.connect swap >>= fun swap ->
|
||||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk ->
|
let disk = Disk.empty kv md5s sha512s swap in
|
||||||
let remote = K.remote () in
|
let remote = K.remote () in
|
||||||
if K.check () then
|
if K.check () then
|
||||||
Lwt.return_unit
|
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
||||||
else
|
else
|
||||||
begin
|
begin
|
||||||
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
||||||
|
@ -968,23 +1074,27 @@ stamp: %S
|
||||||
Lwt.return (Error ())
|
Lwt.return (Error ())
|
||||||
else
|
else
|
||||||
restore_git ~remote git_dump git_ctx) >>= function
|
restore_git ~remote git_dump git_ctx) >>= function
|
||||||
| Ok git_kv -> Lwt.return git_kv
|
| Ok git_kv -> Lwt.return (false, git_kv)
|
||||||
| Error () ->
|
| Error () ->
|
||||||
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
||||||
dump_git git_dump git_kv >|= fun () ->
|
Lwt.return (true, git_kv)
|
||||||
git_kv
|
end >>= fun (need_dump, git_kv) ->
|
||||||
end >>= fun git_kv ->
|
|
||||||
Logs.info (fun m -> m "Done initializing git state!");
|
Logs.info (fun m -> m "Done initializing git state!");
|
||||||
Serve.commit_id git_kv >>= fun commit_id ->
|
Serve.commit_id git_kv >>= fun commit_id ->
|
||||||
Logs.info (fun m -> m "git: %s" commit_id);
|
Logs.info (fun m -> m "git: %s" commit_id);
|
||||||
Serve.create remote git_kv >>= fun (serve, urls) ->
|
Serve.create remote git_kv >>= fun (serve, urls) ->
|
||||||
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
||||||
let update () =
|
let update () =
|
||||||
Serve.update_git ~remote serve git_kv >>= function
|
if Disk.completely_checked disk then
|
||||||
| None | Some ([], _) -> Lwt.return_unit
|
Serve.update_git ~remote serve git_kv >>= function
|
||||||
| Some (_changes, urls) ->
|
| None | Some ([], _) -> Lwt.return_unit
|
||||||
dump_git git_dump git_kv >>= fun () ->
|
| Some (_changes, urls) ->
|
||||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
dump_git git_dump git_kv >>= fun () ->
|
||||||
|
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||||
|
else begin
|
||||||
|
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
|
||||||
|
Lwt.return_unit
|
||||||
|
end
|
||||||
in
|
in
|
||||||
let service =
|
let service =
|
||||||
Paf.http_service
|
Paf.http_service
|
||||||
|
@ -993,6 +1103,14 @@ stamp: %S
|
||||||
in
|
in
|
||||||
let `Initialized th = Paf.serve service t in
|
let `Initialized th = Paf.serve service t in
|
||||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||||
|
Lwt.join [
|
||||||
|
(if need_dump then begin
|
||||||
|
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||||
|
dump_git git_dump git_kv
|
||||||
|
end else
|
||||||
|
Lwt.return_unit) ;
|
||||||
|
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
||||||
|
] >>= fun () ->
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
let rec go () =
|
let rec go () =
|
||||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||||
|
|
Loading…
Reference in a new issue