revise startup, address urls pointing to same sha256 and support mirrors (upstream and in opam file) #24
|
@ -1,5 +1,3 @@
|
||||||
|
|
||||||
|
|
||||||
module Hash = struct
|
module Hash = struct
|
||||||
type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ]
|
type t = (* OpamHash.kind = *) [ `MD5 | `SHA256 | `SHA512 ]
|
||||||
|
|
||||||
|
|
|
@ -35,12 +35,31 @@ let extract_url_checksum filename items =
|
||||||
List.find_opt
|
List.find_opt
|
||||||
(function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false)
|
(function { pelem = Variable ({ pelem = "checksum" ; _ }, _); _ } -> true | _ -> false)
|
||||||
items
|
items
|
||||||
|
and mirrors =
|
||||||
|
List.find_opt
|
||||||
|
(function { pelem = Variable ({ pelem = "mirrors" ; _ }, _); _ } -> true | _ -> false)
|
||||||
|
items
|
||||||
in
|
in
|
||||||
let url =
|
let url =
|
||||||
match url, archive with
|
match url, archive with
|
||||||
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
|
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
|
||||||
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
|
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
|
||||||
| _ -> Error (`Msg "neither 'src' nor 'archive' present")
|
| _ -> Error (`Msg "neither 'src' nor 'archive' present")
|
||||||
|
and mirrors = match mirrors with
|
||||||
|
| None -> []
|
||||||
|
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ } -> [ url ]
|
||||||
|
| Some { pelem = Variable (_, { pelem = List { pelem = urls ; _ } ; _ }) } ->
|
||||||
|
List.fold_left (fun acc -> function
|
||||||
|
| { pelem = String url ; _ } -> url :: acc
|
||||||
|
|||||||
|
| v ->
|
||||||
|
Logs.err (fun m -> m "bad mirror data (expected a string in the list): %s"
|
||||||
|
(OpamPrinter.FullPos.value v));
|
||||||
|
acc)
|
||||||
|
[] urls
|
||||||
|
| Some v ->
|
||||||
|
Logs.err (fun m -> m "bad mirror data (expected string or string list): %s"
|
||||||
|
(OpamPrinter.FullPos.items [ v ]));
|
||||||
|
[]
|
||||||
in
|
in
|
||||||
let csum, csum_errs =
|
let csum, csum_errs =
|
||||||
match checksum with
|
match checksum with
|
||||||
|
@ -79,7 +98,7 @@ let extract_url_checksum filename items =
|
||||||
| _ -> Error (`Msg "couldn't find or decode 'checksum'"), []
|
| _ -> Error (`Msg "couldn't find or decode 'checksum'"), []
|
||||||
in
|
in
|
||||||
(match url, csum with
|
(match url, csum with
|
||||||
| Ok url, Ok csum -> Ok (url, csum)
|
| Ok url, Ok csum -> Ok (url, csum, mirrors)
|
||||||
reynir
commented
The opam source code does The opam source code does `url :: mirrors` FWIW. Just an observation; not a request for change.
hannes
commented
We keep the url in the map as key, and just carry around a set of other urls for the same artifact. Not sure whether it is worth to have the url as key in the map anymore, though we discovered some opam packages that used the same archive with different checksums... We keep the url in the map as key, and just carry around a set of other urls for the same artifact.
Not sure whether it is worth to have the url as key in the map anymore, though we discovered some opam packages that used the same archive with different checksums...
|
|||||||
| Error _ as e, _
|
| Error _ as e, _
|
||||||
| _, (Error _ as e) -> e), csum_errs
|
| _, (Error _ as e) -> e), csum_errs
|
||||||
|
|
||||||
|
|
|
@ -9,11 +9,27 @@ module K = struct
|
||||||
let doc = Arg.info ~doc:"Only check the cache" ["check"] in
|
let doc = Arg.info ~doc:"Only check the cache" ["check"] in
|
||||||
Mirage_runtime.register_arg Arg.(value & flag doc)
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||||
|
|
||||||
let verify_sha256 =
|
let skip_download =
|
||||||
|
let doc = Arg.info ~doc:"Skip downloading archives" ["skip-download"] in
|
||||||
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||||
|
|
||||||
|
let upstream_caches =
|
||||||
|
let doc =
|
||||||
hannes marked this conversation as resolved
Outdated
reynir
commented
"Upstream caches to use internally (e.g. https://opam.ocaml.org/cache). This makes opam-mirror try the cache(s) before going to the source and mirrors. This does not change the published "archive-mirrors:" value in the /repo endpoint." "Upstream caches to use internally (e.g. https://opam.ocaml.org/cache). This makes opam-mirror try the cache(s) before going to the source and mirrors. This does *not* change the published "archive-mirrors:" value in the /repo endpoint."
hannes
commented
actually, if I'm not misguided, we first go to the source, and only thereafter to mirror(s). Is this a good semantics? actually, if I'm not misguided, we first go to the source, and only thereafter to mirror(s). Is this a good semantics?
hannes
commented
And please feel free to push the documentation updates directly to the branch. And please feel free to push the documentation updates directly to the branch.
reynir
commented
Hm it turns out the exact semantics is a bit complicated to explain. You are right that we first to go the source! Then we go to the mirrors or the cache depending on how the URLs sort as they both go into a string set. I will give this some thought and suggest a change. Hm it turns out the exact semantics is a bit complicated to explain. You are right that we first to go the source! Then we go to the mirrors or the cache **depending on how the URLs sort** as they both go into a string set.
I will give this some thought and suggest a change.
reynir
commented
Latest commit splits mirror URLs from upstream cache URLs so the mirror URLs are tried first before the upstream caches, and adds a longer description of the option. Latest commit splits mirror URLs from upstream cache URLs so the mirror URLs are tried first before the upstream caches, and adds a longer description of the option.
|
|||||||
|
"Upstream caches (e.g. https://opam.ocaml.org/cache). \
|
||||||
|
For each package first the declared url is attempted. Then, \
|
||||||
|
if any, all the declared mirrors are attempted. \
|
||||||
|
Finally, the upstream caches are attempted. \
|
||||||
|
Note that this does not change the \"archive-mirrors:\" value \
|
||||||
|
in the /repo endpoint."
|
||||||
|
in
|
||||||
|
let doc = Arg.info ~doc ["upstream-cache"] in
|
||||||
|
Mirage_runtime.register_arg Arg.(value & opt_all string [] doc)
|
||||||
|
|
||||||
|
let skip_verify_sha256 =
|
||||||
let doc = Arg.info
|
let doc = Arg.info
|
||||||
~doc:"Verify the SHA256 checksums of the cache contents, and \
|
~doc:"Skip verification of the SHA256 checksums of the cache contents, \
|
||||||
re-build the other checksum caches."
|
and do not re-build the other checksum caches."
|
||||||
["verify-sha256"]
|
["skip-verify-sha256"]
|
||||||
in
|
in
|
||||||
Mirage_runtime.register_arg Arg.(value & flag doc)
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||||
|
|
||||||
|
@ -132,31 +148,69 @@ module Make
|
||||||
in
|
in
|
||||||
Lwt_stream.from more
|
Lwt_stream.from more
|
||||||
|
|
||||||
|
let sha256s = Hashtbl.create 13
|
||||||
|
|
||||||
|
let empty () = Hashtbl.clear sha256s
|
||||||
hannes marked this conversation as resolved
Outdated
reynir
commented
Maybe worth using Maybe worth using `HM.fold`
hannes
commented
Indeed, currently it is a List.fold_left of "HM.bindings hm". So yes, this could be nicer code. :) Indeed, currently it is a List.fold_left of "HM.bindings hm". So yes, this could be nicer code. :)
|
|||||||
|
|
||||||
let find_urls acc path data =
|
let find_urls acc path data =
|
||||||
if Mirage_kv.Key.basename path = "opam" then
|
if Mirage_kv.Key.basename path = "opam" then
|
||||||
let path = Mirage_kv.Key.to_string path in
|
let path = Mirage_kv.Key.to_string path in
|
||||||
let url_csums, errs = Opam_file.extract_urls path data in
|
let url_csums, errs = Opam_file.extract_urls path data in
|
||||||
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
|
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
|
||||||
List.fold_left (fun acc (url, csums) ->
|
let upstream hm =
|
||||||
|
HM.fold
|
||||||
|
(fun hash hash_value set ->
|
||||||
|
List.fold_left (fun set cache_url ->
|
||||||
|
let url =
|
||||||
|
cache_url ^ "/" ^ Archive_checksum.Hash.to_string hash ^
|
||||||
|
"/" ^ String.sub hash_value 0 2 ^ "/" ^ hash_value
|
||||||
|
in
|
||||||
|
SSet.add url set)
|
||||||
|
set (K.upstream_caches ()))
|
||||||
|
hm SSet.empty
|
||||||
|
in
|
||||||
|
List.fold_left (fun acc (url, csums, mirrors) ->
|
||||||
if HM.cardinal csums = 0 then
|
if HM.cardinal csums = 0 then
|
||||||
(add_parse_error path ("no checksums for " ^ url);
|
(add_parse_error path ("no checksums for " ^ url);
|
||||||
acc)
|
acc)
|
||||||
else
|
else begin
|
||||||
|
let url' =
|
||||||
|
match HM.find_opt `SHA256 csums with
|
||||||
|
| None -> url
|
||||||
|
| Some hash ->
|
||||||
|
match Hashtbl.find_opt sha256s hash with
|
||||||
|
| None -> Hashtbl.add sha256s hash url; url
|
||||||
|
| Some url' ->
|
||||||
|
if not (String.equal url url') then
|
||||||
|
Logs.debug (fun m -> m "same hash for url %s and %s" url url');
|
||||||
|
url'
|
||||||
|
in
|
||||||
|
let mirrors = SSet.of_list mirrors in
|
||||||
|
let url, mirrors =
|
||||||
|
if String.equal url url' then
|
||||||
|
url, mirrors
|
||||||
|
else
|
||||||
|
url', SSet.add url mirrors
|
||||||
|
in
|
||||||
SM.update url (function
|
SM.update url (function
|
||||||
hannes
commented
not doing not doing `upstream csums` here since we did that for the previous entry already. downside is if there's an artifact that has some hashes in some file, and more hashes in another file, we won't get the other hashes.
|
|||||||
| None -> Some csums
|
| None -> Some (csums, mirrors, upstream csums)
|
||||||
| Some csums' ->
|
| Some (csums', mirrors', upstream_caches') ->
|
||||||
if HM.for_all (fun h v ->
|
if HM.for_all (fun h v ->
|
||||||
match HM.find_opt h csums with
|
match HM.find_opt h csums with
|
||||||
| None -> true | Some v' -> String.equal v v')
|
| None -> true | Some v' -> String.equal v v')
|
||||||
csums'
|
csums'
|
||||||
then
|
then
|
||||||
Some (HM.union (fun _h v _v' -> Some v) csums csums')
|
Some (HM.union (fun _h v _v' -> Some v) csums csums',
|
||||||
|
SSet.union mirrors mirrors',
|
||||||
|
SSet.union (upstream csums) upstream_caches'
|
||||||
|
)
|
||||||
else begin
|
else begin
|
||||||
add_parse_error path (Fmt.str
|
add_parse_error path (Fmt.str
|
||||||
"mismatching hashes for %s: %s vs %s"
|
"mismatching hashes for %s: %s vs %s"
|
||||||
url (hm_to_s csums') (hm_to_s csums));
|
url (hm_to_s csums') (hm_to_s csums));
|
||||||
None
|
None
|
||||||
end) acc) acc url_csums
|
end) acc
|
||||||
|
end) acc url_csums
|
||||||
else
|
else
|
||||||
acc
|
acc
|
||||||
|
|
||||||
|
@ -238,16 +292,25 @@ module Make
|
||||||
let last_git_status = ref (Error "unknown")
|
let last_git_status = ref (Error "unknown")
|
||||||
|
|
||||||
module Disk = struct
|
module Disk = struct
|
||||||
|
module KS = Set.Make(Mirage_kv.Key)
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
mutable md5s : string SM.t ;
|
mutable md5s : string SM.t ;
|
||||||
mutable sha512s : string SM.t ;
|
mutable sha512s : string SM.t ;
|
||||||
|
mutable checked : KS.t option ;
|
||||||
dev : KV.t ;
|
dev : KV.t ;
|
||||||
dev_md5s : Cache.t ;
|
dev_md5s : Cache.t ;
|
||||||
dev_sha512s : Cache.t ;
|
dev_sha512s : Cache.t ;
|
||||||
dev_swap : Swap.t ;
|
dev_swap : Swap.t ;
|
||||||
}
|
}
|
||||||
|
|
||||||
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
let empty dev dev_md5s dev_sha512s dev_swap =
|
||||||
|
{ md5s = SM.empty ; sha512s = SM.empty ; checked = Some KS.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
||||||
|
|
||||||
|
let add_checked t path =
|
||||||
|
match t.checked with
|
||||||
|
| None -> ()
|
||||||
|
| Some s -> t.checked <- Some (KS.add path s)
|
||||||
|
|
||||||
let marshal_sm (sm : string SM.t) =
|
let marshal_sm (sm : string SM.t) =
|
||||||
let version = char_of_int 1 in
|
let version = char_of_int 1 in
|
||||||
|
@ -272,18 +335,29 @@ module Make
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
let find_key t h key =
|
let find_key t h key =
|
||||||
assert (List.length (Mirage_kv.Key.segments key) = 1);
|
if List.length (Mirage_kv.Key.segments key) <> 1 then begin
|
||||||
match
|
Logs.warn (fun m -> m "find_key with multiple segments: %a" Mirage_kv.Key.pp key);
|
||||||
match h with
|
Error `Not_found
|
||||||
| `MD5 ->
|
end else
|
||||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
|
match
|
||||||
| `SHA512 ->
|
match h with
|
||||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
|
| `MD5 ->
|
||||||
| `SHA256 -> Some key
|
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
|
||||||
| _ -> None
|
| `SHA512 ->
|
||||||
with
|
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
|
||||||
| None -> Error `Not_found
|
| `SHA256 -> Some key
|
||||||
| Some x -> Ok x
|
with
|
||||||
|
| None -> Error `Not_found
|
||||||
|
| Some x -> Ok x
|
||||||
|
|
||||||
|
let ready t h key =
|
||||||
|
match t.checked with
|
||||||
|
| None -> true
|
||||||
|
| Some s -> match find_key t h key with
|
||||||
|
| Ok k -> KS.mem k s
|
||||||
|
| Error _ -> false
|
||||||
|
|
||||||
|
let completely_checked t = t.checked = None
|
||||||
|
|
||||||
let read_chunked t h v f a =
|
let read_chunked t h v f a =
|
||||||
match find_key t h v with
|
match find_key t h v with
|
||||||
|
@ -382,7 +456,8 @@ module Make
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
remove_active url;
|
remove_active url;
|
||||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||||
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
t.sha512s <- SM.add sha512 sha256 t.sha512s;
|
||||||
|
add_checked t dest
|
||||||
| Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
|
| Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
|
||||||
| Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
|
| Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
|
||||||
else begin
|
else begin
|
||||||
|
@ -392,22 +467,23 @@ module Make
|
||||||
end
|
end
|
||||||
|
|
||||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||||
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
|
let check ~skip_verify_sha256 t =
|
||||||
KV.list dev Mirage_kv.Key.empty >>= function
|
KV.list t.dev Mirage_kv.Key.empty >>= function
|
||||||
| Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e)
|
| Error e ->
|
||||||
|
Logs.err (fun m -> m "error %a listing kv" KV.pp_error e);
|
||||||
|
Lwt.return_unit
|
||||||
| Ok entries ->
|
| Ok entries ->
|
||||||
let t = empty dev dev_md5s dev_sha512s dev_swap in
|
|
||||||
Cache.read t.dev_md5s >>= fun r ->
|
Cache.read t.dev_md5s >>= fun r ->
|
||||||
(match r with
|
(match r with
|
||||||
| Ok Some s ->
|
| Ok Some s ->
|
||||||
if not verify_sha256 then
|
if skip_verify_sha256 then
|
||||||
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
|
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
|
||||||
| Ok None -> ()
|
| Ok None -> ()
|
||||||
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
|
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
|
||||||
Cache.read t.dev_sha512s >>= fun r ->
|
Cache.read t.dev_sha512s >>= fun r ->
|
||||||
(match r with
|
(match r with
|
||||||
| Ok Some s ->
|
| Ok Some s ->
|
||||||
if not verify_sha256 then
|
if skip_verify_sha256 then
|
||||||
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
|
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
|
||||||
| Ok None -> ()
|
| Ok None -> ()
|
||||||
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
|
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
|
||||||
|
@ -438,7 +514,7 @@ module Make
|
||||||
None
|
None
|
||||||
in
|
in
|
||||||
let sha256_final =
|
let sha256_final =
|
||||||
let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in
|
let need_to_compute = md5_final <> None || sha512_final <> None || not skip_verify_sha256 in
|
||||||
if need_to_compute then
|
if need_to_compute then
|
||||||
let f s =
|
let f s =
|
||||||
let digest = SHA256.(to_raw_string (get s)) in
|
let digest = SHA256.(to_raw_string (get s)) in
|
||||||
|
@ -455,7 +531,9 @@ module Make
|
||||||
None
|
None
|
||||||
in
|
in
|
||||||
match sha256_final with
|
match sha256_final with
|
||||||
| None -> Lwt.return_unit
|
| None ->
|
||||||
|
add_checked t path;
|
||||||
|
Lwt.return_unit
|
||||||
| Some f ->
|
| Some f ->
|
||||||
read_chunked t `SHA256 path
|
read_chunked t `SHA256 path
|
||||||
(fun (sha256, md5, sha512) data ->
|
(fun (sha256, md5, sha512) data ->
|
||||||
|
@ -481,11 +559,12 @@ module Make
|
||||||
else begin
|
else begin
|
||||||
Option.iter (fun f -> f (Option.get md5)) md5_final;
|
Option.iter (fun f -> f (Option.get md5)) md5_final;
|
||||||
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|
||||||
|
add_checked t path;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end)
|
end)
|
||||||
entries >>= fun () ->
|
entries >>= fun () ->
|
||||||
update_caches t >|= fun () ->
|
update_caches t >|= fun () ->
|
||||||
t
|
t.checked <- None
|
||||||
|
|
||||||
let exists t h v =
|
let exists t h v =
|
||||||
match find_key t h v with
|
match find_key t h v with
|
||||||
|
@ -578,6 +657,7 @@ module Make
|
||||||
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
|
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
|
||||||
let urls = ref SM.empty in
|
let urls = ref SM.empty in
|
||||||
entries_of_git ~mtime store repo urls >>= fun entries ->
|
entries_of_git ~mtime store repo urls >>= fun entries ->
|
||||||
|
Git.empty ();
|
||||||
let t = Tar.out ~level:Ustar entries in
|
let t = Tar.out ~level:Ustar entries in
|
||||||
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
|
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
|
||||||
let buf = Buffer.create 1024 in
|
let buf = Buffer.create 1024 in
|
||||||
|
@ -841,38 +921,43 @@ stamp: %S
|
||||||
| Ok h ->
|
| Ok h ->
|
||||||
let hash = Mirage_kv.Key.v hash in
|
let hash = Mirage_kv.Key.v hash in
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
(Disk.last_modified store h hash >|= function
|
if Disk.ready store h hash then
|
||||||
| Error _ -> t.modified
|
(Disk.last_modified store h hash >|= function
|
||||||
| Ok v -> ptime_to_http_date v) >>= fun last_modified ->
|
| Error _ -> t.modified
|
||||||
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
|
| Ok v -> ptime_to_http_date v) >>= fun last_modified ->
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
|
||||||
respond_with_empty reqd resp;
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
Lwt.return_unit
|
respond_with_empty reqd resp;
|
||||||
else
|
|
||||||
Disk.size store h hash >>= function
|
|
||||||
| Error _ ->
|
|
||||||
not_found reqd request.Httpaf.Request.target;
|
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok size ->
|
else
|
||||||
let size = Optint.Int63.to_string size in
|
Disk.size store h hash >>= function
|
||||||
let mime_type = "application/octet-stream" in
|
| Error _ ->
|
||||||
let headers = [
|
not_found reqd request.Httpaf.Request.target;
|
||||||
"content-type", mime_type ;
|
Lwt.return_unit
|
||||||
"etag", Mirage_kv.Key.basename hash ;
|
| Ok size ->
|
||||||
"last-modified", last_modified ;
|
let size = Optint.Int63.to_string size in
|
||||||
"content-length", size ;
|
let mime_type = "application/octet-stream" in
|
||||||
]
|
let headers = [
|
||||||
in
|
"content-type", mime_type ;
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
"etag", Mirage_kv.Key.basename hash ;
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
"last-modified", last_modified ;
|
||||||
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
"content-length", size ;
|
||||||
Disk.read_chunked store h hash (fun () chunk ->
|
]
|
||||||
let wait, wakeup = Lwt.task () in
|
in
|
||||||
(* FIXME: catch exception when body is closed *)
|
let headers = Httpaf.Headers.of_list headers in
|
||||||
Httpaf.Body.write_string body chunk;
|
let resp = Httpaf.Response.create ~headers `OK in
|
||||||
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
||||||
wait) () >|= fun _ ->
|
Disk.read_chunked store h hash (fun () chunk ->
|
||||||
Httpaf.Body.close_writer body)
|
let wait, wakeup = Lwt.task () in
|
||||||
|
(* FIXME: catch exception when body is closed *)
|
||||||
|
Httpaf.Body.write_string body chunk;
|
||||||
|
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
||||||
|
wait) () >|= fun _ ->
|
||||||
|
Httpaf.Body.close_writer body
|
||||||
|
else begin
|
||||||
|
not_found reqd request.Httpaf.Request.target;
|
||||||
|
Lwt.return_unit
|
||||||
|
end)
|
||||||
end
|
end
|
||||||
| _ ->
|
| _ ->
|
||||||
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
||||||
|
@ -881,12 +966,11 @@ stamp: %S
|
||||||
end
|
end
|
||||||
|
|
||||||
let download_archives parallel_downloads disk http_client urls =
|
let download_archives parallel_downloads disk http_client urls =
|
||||||
(* FIXME: handle resuming partial downloads *)
|
|
||||||
reset_failed_downloads ();
|
reset_failed_downloads ();
|
||||||
remaining_downloads := SM.cardinal urls;
|
remaining_downloads := SM.cardinal urls;
|
||||||
archives := SM.cardinal urls;
|
archives := SM.cardinal urls;
|
||||||
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
|
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
|
||||||
Lwt_list.iter_p (fun (url, csums) ->
|
Lwt_list.iter_p (fun (url, (csums, mirrors, upstream_caches)) ->
|
||||||
Lwt_pool.use pool @@ fun () ->
|
Lwt_pool.use pool @@ fun () ->
|
||||||
HM.fold (fun h v r ->
|
HM.fold (fun h v r ->
|
||||||
r >>= function
|
r >>= function
|
||||||
|
@ -897,29 +981,51 @@ stamp: %S
|
||||||
decr remaining_downloads;
|
decr remaining_downloads;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| false ->
|
| false ->
|
||||||
let quux, body_init = Disk.init_write disk csums in
|
let rec download url mirrors upstream_caches =
|
||||||
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
let retry () =
|
||||||
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
if SSet.is_empty mirrors && SSet.is_empty upstream_caches then begin
|
||||||
| Ok (resp, r) ->
|
decr remaining_downloads;
|
||||||
decr remaining_downloads;
|
|
||||||
begin match r with
|
|
||||||
| Error `Bad_response ->
|
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(`Bad_response (resp.status, resp.reason));
|
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error `Write_error e ->
|
end else if SSet.is_empty mirrors then
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
|
let elt, upstream_caches =
|
||||||
Lwt.return_unit
|
let e = SSet.min_elt upstream_caches in
|
||||||
| Error `Swap e ->
|
e, SSet.remove e upstream_caches
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
|
in
|
||||||
Lwt.return_unit
|
download elt mirrors upstream_caches
|
||||||
| Ok (digests, body) ->
|
else
|
||||||
Disk.finalize_write disk quux ~url body csums digests
|
let elt, mirrors =
|
||||||
end
|
let e = SSet.min_elt mirrors in
|
||||||
| Error me ->
|
e, SSet.remove e mirrors
|
||||||
decr remaining_downloads;
|
in
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
|
download elt mirrors upstream_caches
|
||||||
Lwt.return_unit)
|
in
|
||||||
|
let quux, body_init = Disk.init_write disk csums in
|
||||||
|
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
||||||
|
if not (K.skip_download ()) then
|
||||||
|
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
||||||
|
| Ok (resp, r) ->
|
||||||
|
begin match r with
|
||||||
|
| Error `Bad_response ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(`Bad_response (resp.status, resp.reason));
|
||||||
|
retry ()
|
||||||
|
| Error `Write_error e ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
|
||||||
|
retry ()
|
||||||
|
| Error `Swap e ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
|
||||||
|
retry ()
|
||||||
|
| Ok (digests, body) ->
|
||||||
|
decr remaining_downloads;
|
||||||
|
Disk.finalize_write disk quux ~url body csums digests
|
||||||
|
end
|
||||||
|
| Error me ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
|
||||||
|
retry ()
|
||||||
|
else
|
||||||
|
retry ()
|
||||||
|
in
|
||||||
|
download url mirrors upstream_caches)
|
||||||
(SM.bindings urls) >>= fun () ->
|
(SM.bindings urls) >>= fun () ->
|
||||||
Disk.update_caches disk >|= fun () ->
|
Disk.update_caches disk >|= fun () ->
|
||||||
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
||||||
|
@ -957,10 +1063,10 @@ stamp: %S
|
||||||
Cache.connect sha512s >>= fun sha512s ->
|
Cache.connect sha512s >>= fun sha512s ->
|
||||||
Swap.connect swap >>= fun swap ->
|
Swap.connect swap >>= fun swap ->
|
||||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk ->
|
let disk = Disk.empty kv md5s sha512s swap in
|
||||||
let remote = K.remote () in
|
let remote = K.remote () in
|
||||||
if K.check () then
|
if K.check () then
|
||||||
Lwt.return_unit
|
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
||||||
else
|
else
|
||||||
begin
|
begin
|
||||||
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
||||||
|
@ -968,23 +1074,27 @@ stamp: %S
|
||||||
Lwt.return (Error ())
|
Lwt.return (Error ())
|
||||||
else
|
else
|
||||||
restore_git ~remote git_dump git_ctx) >>= function
|
restore_git ~remote git_dump git_ctx) >>= function
|
||||||
| Ok git_kv -> Lwt.return git_kv
|
| Ok git_kv -> Lwt.return (false, git_kv)
|
||||||
| Error () ->
|
| Error () ->
|
||||||
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
||||||
dump_git git_dump git_kv >|= fun () ->
|
Lwt.return (true, git_kv)
|
||||||
git_kv
|
end >>= fun (need_dump, git_kv) ->
|
||||||
end >>= fun git_kv ->
|
|
||||||
Logs.info (fun m -> m "Done initializing git state!");
|
Logs.info (fun m -> m "Done initializing git state!");
|
||||||
Serve.commit_id git_kv >>= fun commit_id ->
|
Serve.commit_id git_kv >>= fun commit_id ->
|
||||||
Logs.info (fun m -> m "git: %s" commit_id);
|
Logs.info (fun m -> m "git: %s" commit_id);
|
||||||
Serve.create remote git_kv >>= fun (serve, urls) ->
|
Serve.create remote git_kv >>= fun (serve, urls) ->
|
||||||
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
||||||
let update () =
|
let update () =
|
||||||
Serve.update_git ~remote serve git_kv >>= function
|
if Disk.completely_checked disk then
|
||||||
| None | Some ([], _) -> Lwt.return_unit
|
Serve.update_git ~remote serve git_kv >>= function
|
||||||
| Some (_changes, urls) ->
|
| None | Some ([], _) -> Lwt.return_unit
|
||||||
dump_git git_dump git_kv >>= fun () ->
|
| Some (_changes, urls) ->
|
||||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
dump_git git_dump git_kv >>= fun () ->
|
||||||
|
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||||
|
else begin
|
||||||
|
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
|
||||||
|
Lwt.return_unit
|
||||||
|
end
|
||||||
in
|
in
|
||||||
let service =
|
let service =
|
||||||
Paf.http_service
|
Paf.http_service
|
||||||
|
@ -993,6 +1103,14 @@ stamp: %S
|
||||||
in
|
in
|
||||||
let `Initialized th = Paf.serve service t in
|
let `Initialized th = Paf.serve service t in
|
||||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||||
|
Lwt.join [
|
||||||
|
(if need_dump then begin
|
||||||
|
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||||
|
dump_git git_dump git_kv
|
||||||
|
end else
|
||||||
|
Lwt.return_unit) ;
|
||||||
|
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
||||||
|
] >>= fun () ->
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
let rec go () =
|
let rec go () =
|
||||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||||
|
|
I don't think the order is significant, but it's worth noting we reverse the order here.