Compare commits

...

17 commits

Author SHA1 Message Date
96368097e9 Merge pull request 'Use swapfs' (#16) from swap into main
Reviewed-on: #16
2024-11-08 12:54:01 +00:00
1f9e3e6e23 log less, now that status is around 2024-11-08 13:51:33 +01:00
1e75be6900 remove logging from opam_file (now reported to status) 2024-11-08 13:46:05 +01:00
c3d5c74075 use K.remote, not the entire repo 2024-11-06 13:15:10 +01:00
be87d19797 reset errors, and sort failures 2024-11-06 13:08:28 +01:00
37008e81f3 organize failures into sections 2024-11-06 12:57:43 +01:00
8ba4cfae00 group download failures by error 2024-11-04 18:43:50 +01:00
9c50538877 record and preserve opam file parsing issues 2024-11-04 18:43:21 +01:00
a47193f147 tweaks 2024-11-04 17:33:50 +01:00
1e35bfefbd initialization: potentially rename bad data 2024-11-04 17:33:33 +01:00
a9b8f18192 stream git contents, also make the tarball and the find_urls in one go 2024-11-04 17:17:52 +01:00
2312092e42 first write to a temporary filename, and rename later 2024-11-04 17:17:37 +01:00
4bec3bfbd8 restore checksum failure error 2024-11-04 16:50:42 +01:00
f48cc19fc4 drop superfluous 'unknown' 2024-11-04 16:46:01 +01:00
7689397ac3 remove bad archive list 2024-11-04 16:14:09 +01:00
e59f02a16f always use swap, remove the pending / to_delete stuff 2024-11-04 16:13:36 +01:00
456340562d Use swapfs 2024-11-01 14:35:08 +01:00
6 changed files with 374 additions and 608 deletions

View file

@ -52,7 +52,7 @@ let update_digests { md5; sha256; sha512 } data =
let init_write csums = let init_write csums =
let hash, csum = HM.max_binding csums in let hash, csum = HM.max_binding csums in
(hash, csum), Ok (empty_digests, `Init) (hash, csum), empty_digests
let digests_to_hm digests = let digests_to_hm digests =
HM.empty HM.empty

View file

@ -1,195 +0,0 @@
let archives =
let too_big =
[ "https://github.com/Opsian/opsian-ocaml/releases/download/0.1/0.1.tar.gz" ]
and hash_mismatch = [
"http://cdn.skylable.com/source/libres3-1.3.tar.gz" ;
"http://cdn.skylable.com/source/libres3-0.3.tar.gz" ;
"http://cdn.skylable.com/source/libres3-1.2.tar.gz" ;
"http://cdn.skylable.com/source/libres3-0.9.tar.gz" ;
"http://cdn.skylable.com/source/libres3-0.2.tar.gz" ;
"http://cdn.skylable.com/source/libres3-1.0.tar.gz" ;
"http://cdn.skylable.com/source/libres3-1.1.tar.gz" ;
"http://cdn.skylable.com/source/libres3-0.1.tar.gz" ;
"https://github.com/lemaetech/http-cookie/releases/download/v3.0.0/http-cookie-v3.0.0.tbz" ;
"http://oqamldebug.forge.ocamlcore.org/oqamldebug-0.9.4.tar.gz" ;
"http://oqamldebug.forge.ocamlcore.org/oqamldebug-0.9.2.tar.gz" ;
"http://oqamldebug.forge.ocamlcore.org/oqamldebug-0.9.3.tar.gz" ;
"http://oqamldebug.forge.ocamlcore.org/oqamldebug-0.9.5.tar.gz" ;
"http://oqamldebug.forge.ocamlcore.org/oqamldebug-0.9.1.tar.gz" ;
"https://github.com/OCamlPro/ezjs_fetch/archive/0.1.tar.gz" ;
"http://github.com/OCamlPro/typerex-build/archive/1.99.13-beta.tar.gz" ;
"https://github.com/mirage/dyntype/tarball/dyntype-0.8.5" ;
"https://github.com/mirage/dyntype/tarball/dyntype-0.8.3" ;
"https://github.com/mirage/dyntype/tarball/dyntype-0.8.2" ;
"https://github.com/mirage/dyntype/tarball/dyntype-0.8.4" ;
"https://github.com/mirage/mirage-http-unix/archive/v1.0.0.tar.gz" ;
"http://github.com/OCamlPro/typerex-build/archive/1.99.15-beta.tar.gz" ;
"http://github.com/OCamlPro/typerex-build/archive/1.99.14-beta.tar.gz" ;
"https://github.com/paulpatault/ocamlog/archive/v0.1.tar.gz" ;
"https://github.com/pveber/OCaml-R/archive/pre-nyc-refactoring.tar.gz" ;
"https://github.com/paulpatault/ocamlog/archive/v0.2.tar.gz" ;
"http://github.com/OCamlPro/typerex-build/archive/1.99.16-beta.tar.gz" ;
"https://github.com/FStarLang/kremlin/archive/v0.9.6.0.zip" ;
"https://gitlab.com/dailambda/plebeia/-/archive/2.0.2/plebeia-2.0.2.tar.gz" ;
"https://github.com/mirleft/ocaml-tls/archive/0.5.0.tar.gz" ;
"https://github.com/eth-sri/ELINA/archive/1.3.tar.gz" ;
"https://gitlab.com/trustworthy-refactoring/refactorer/-/archive/0.1/refactorer-0.1.zip" ;
"https://github.com/completium/archetype-lang/archive/1.3.3.tar.gz" ;
"https://github.com/chetmurthy/pa_ppx/archive/0.01.tar.gz" ;
"https://github.com/chambart/ocaml-1/archive/lto.tar.gz" ;
"https://github.com/Kappa-Dev/KaSim/archive/v3.5-250915.tar.gz" ;
"https://github.com/bsansouci/bsb-native/archive/1.9.4.tar.gz";
"https://github.com/sanette/oplot/archive/0.7.tar.gz";
"https://github.com/ulrikstrid/ocaml-cookie/releases/download/0.1.8/session-cookie-lwt-0.1.8.tbz";
]
and bad_request = [
"http://cgit.freedesktop.org/cairo-ocaml/snapshot/cairo-ocaml-1.2.0.tar.gz"
]
and not_found = [
"http://pw374.github.io/distrib/frag/frag-0.1.0.tar.gz" ;
"http://pw374.github.io/distrib/glical/glical-0.0.3.tar.gz" ;
"http://pw374.github.io/distrib/glical/glical-0.0.4.tar.gz" ;
"http://pw374.github.io/distrib/glical/glical-0.0.1.tar.gz" ;
"http://pw374.github.io/distrib/glical/glical-0.0.2.tar.gz" ;
"http://pw374.github.io/distrib/glical/glical-0.0.5.tar.gz" ;
"http://pw374.github.io/distrib/glical/glical-0.0.7.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.1.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.0.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.2.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.7.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.3.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.8.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.4.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.1.5.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.2.0.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.2.1.tar.gz" ;
"http://pw374.github.io/distrib/mpp/mpp-0.3.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.3.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.4.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.5.4.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.5.5.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.5.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.6.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.6.2.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.6.3.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.6.4.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.6.5.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.7.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.7.1.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.7.2.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.7.4.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.7.3.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.7.5.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.8.2.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.8.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.8.1.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.9.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.9.1.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.9.7.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.0.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.0.1.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.1.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.1.1.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.1.2.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.2.0.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.2.1.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.2.2.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.2.4.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.2.5.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.2.6.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.2.3.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.3.0.tar.gz" ;
"http://zoggy.github.com/ocamldot/ocamldot-1.0.tar.gz" ;
"http://zoggy.github.io/stog/stog-0.4.tar.gz" ;
"http://zoggy.github.io/genet/genet-0.6.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.6.1.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.9.4.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.9.6.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.9.5.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-0.9.3.tar.gz" ;
"http://pw374.github.io/distrib/omd/omd-1.1.3.tar.gz" ;
"http://coccinelle.lip6.fr/distrib/coccinelle-1.0.0-rc22.tgz" ;
"http://coccinelle.lip6.fr/distrib/coccinelle-1.0.0-rc21.tgz" ;
"http://coccinelle.lip6.fr/distrib/coccinelle-1.0.0.tgz" ;
"http://proverif.inria.fr/proverif1.96pl1.tar.gz" ;
"http://proverif.inria.fr/proverif1.97.tar.gz" ;
"http://proverif.inria.fr/proverif1.98.tar.gz" ;
"http://proverif.inria.fr/proverif1.97pl3.tar.gz" ;
"http://proverif.inria.fr/proverif1.98pl1.tar.gz" ;
"http://proverif.inria.fr/proverif1.97pl1.tar.gz" ;
"https://github.com/jrochel/eliom/archive/6.4.0.tar.gz" ;
"https://github.com/drjdn/ocaml_lua_parser/archive/1.0.1.tar.gz" ;
"https://github.com/sagotch/To.ml/archive/v1.0.0.tar.gz" ;
"https://zoggy.github.io/ocaml-rdf/ocaml-rdf-0.9.0.tar.gz" ;
"https://github.com/sagotch/To.ml/archive/v2.1.0.tar.gz" ;
"https://github.com/sagotch/To.ml/archive/v2.0.0.tar.gz" ;
"https://zoggy.github.io/ocaml-taglog/taglog-0.1.0.tar.gz" ;
"https://zoggy.github.io/ocaml-taglog/taglog-0.2.0.tar.gz" ;
"https://zoggy.github.io/ocf/ocf-0.3.0.tar.gz" ;
"https://zoggy.github.io/ojs-base/ojs-base-0.1.0.tar.gz" ;
"https://zoggy.github.io/stog/plugins/stog-writing-0.8.0.tar.gz" ;
"https://zoggy.github.io/stog/stog-0.13.0.tar.gz" ;
"https://zoggy.github.io/ocaml-taglog/taglog-0.3.0.tar.gz" ;
"https://zoggy.github.io/ocf/ocf-0.1.0.tar.gz" ;
"https://opam.ocaml.org/cache/md5/24/24b163eb77e6832747dccd6cc8a5d57c" ;
]
and forbidden = [
"https://gforge.inria.fr/frs/download.php/33440/heptagon-1.00.06.tar.gz" ;
"https://gforge.inria.fr/frs/download.php/file/33677/dose3-3.2.2.tar.gz" ;
"https://gforge.inria.fr/frs/download.php/file/34920/javalib-2.3.1.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/36092/javalib-2.3.2.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/36093/sawja-1.5.2.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/37154/javalib-2.3.4.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/37403/sawja-1.5.3.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/36307/javalib-2.3.3.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/37655/javalib-2.3.5.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/37656/sawja-1.5.4.tar.bz2" ;
"https://gforge.inria.fr/frs/download.php/file/34921/sawja-1.5.1.tar.bz2" ;
]
and three_o_o = [
"https://github.com/Gbury/dolmen/archive/v0.4.tar.gz" ;
"https://github.com/Stevendeo/Pilat/archive/1.3.tar.gz" ;
"https://github.com/OCamlPro/ocp-indent/archive/1.5.tar.gz" ;
"https://github.com/backtracking/combine/archive/release-0.6.zip" ;
"https://github.com/cakeplus/pa_comprehension/archive/0.4.tar.gz" ;
"https://github.com/cakeplus/mparser/archive/1.0.tar.gz" ;
"https://github.com/chenyukang/rubytt/archive/v0.1.tar.gz" ;
"https://github.com/cakeplus/pa_where/archive/0.4.tar.gz" ;
"https://github.com/metaocaml/ber-metaocaml/archive/ber-n102.tar.gz" ;
"https://github.com/cakeplus/pa_solution/archive/0.5.tar.gz" ;
"https://github.com/cakeplus/mparser/archive/1.2.1.tar.gz" ;
"https://github.com/cakeplus/pa_solution/archive/0.7.tar.gz" ;
"https://github.com/cakeplus/pa_solution/archive/0.6.tar.gz" ;
"https://github.com/mirage/mirage-tcpip/archive/v2.8.1.tar.gz" ;
"https://github.com/modlfo/pla/archive/v1.4.tar.gz" ;
"https://github.com/murmour/pa_qualified/archive/0.5.tar.gz" ;
"https://github.com/ocaml-ppx/ocamlformat/archive/v0.2.tar.gz" ;
"https://github.com/murmour/pa_qualified/archive/0.6.tar.gz" ;
"https://github.com/ocaml-ppx/ocamlformat/archive/support.0.2.tar.gz" ;
"https://github.com/ocaml/oloop/archive/0.1.2.tar.gz" ;
"https://github.com/cakeplus/mparser/archive/1.0.1.tar.gz" ;
"https://github.com/cakeplus/mparser/archive/1.1.tar.gz" ;
"https://github.com/savonet/ocaml-ffmpeg/archive/v1.0.0-rc1.tar.gz" ;
"https://github.com/ocaml/opam2web/archive/2.0.tar.gz" ;
"https://github.com/savonet/ocaml-ffmpeg/archive/v1.0.0.tar.gz" ;
]
and five_o_three = [ "https://gitlab.com/gasche/build_path_prefix_map/repository/0.2/archive.tar.gz" ]
and is_ftp = [ "ftp://ftp.netbsd.org/pub/pkgsrc/distfiles/wyrd-1.4.6.tar.gz" ]
and connect_fails = [
"http://godi.0ok.org/godi-backup/shcaml-0.1.3.tar.gz" ;
"http://www.first.in-berlin.de/software/tools/apalogretrieve/apalogretrieve-0-9-6_4.tgz" ;
"https://cavale.enseeiht.fr/osdp/osdp-0.5.4.tgz" ;
"https://cavale.enseeiht.fr/osdp/osdp-0.6.0.tgz" ;
"https://cavale.enseeiht.fr/osdp/osdp-1.0.0.tgz" ;
]
in
too_big @ hash_mismatch @ bad_request @ not_found @ forbidden @ three_o_o @ five_o_three @ is_ftp @ connect_fails

View file

@ -18,6 +18,7 @@ let mirror =
package "gptar" ; package "gptar" ;
package "oneffs" ; package "oneffs" ;
package "digestif" ; package "digestif" ;
package "swapfs" ;
] ]
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> alpn_client @-> job) (block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> alpn_client @-> job)

View file

@ -1,6 +1,3 @@
let src = Logs.Src.create "opam-file.opam-mirror" ~doc:"Opam file decoding in opam-mirror"
module Log = (val Logs.src_log src : Logs.LOG)
module HM = Archive_checksum.HM module HM = Archive_checksum.HM
let hash_to_string = Archive_checksum.Hash.to_string let hash_to_string = Archive_checksum.Hash.to_string
@ -13,17 +10,16 @@ let hex_of_string s =
let decode_digest filename str = let decode_digest filename str =
let hex h s = let hex h s =
match hex_of_string s with match hex_of_string s with
| Ok d -> Some (h, d) | Ok d -> Ok (h, d)
| Error `Msg msg -> | Error _ as e -> e
Log.warn (fun m -> m "%s invalid hex (%s) %s" filename msg s); None
in in
match String.split_on_char '=' str with match String.split_on_char '=' str with
| [ data ] -> hex `MD5 data | [ data ] -> hex `MD5 data
| [ "md5" ; data ] -> hex `MD5 data | [ "md5" ; data ] -> hex `MD5 data
| [ "sha256" ; data ] -> hex `SHA256 data | [ "sha256" ; data ] -> hex `SHA256 data
| [ "sha512" ; data ] -> hex `SHA512 data | [ "sha512" ; data ] -> hex `SHA512 data
| [ hash ; _ ] -> Log.warn (fun m -> m "%s unknown hash %s" filename hash); None | [ hash ; _ ] -> Error (`Msg ("unknown hash " ^ hash))
| _ -> Log.warn (fun m -> m "%s unexpected hash format %S" filename str); None | _ -> Error (`Msg ("unexpected hash format " ^ str))
let extract_url_checksum filename items = let extract_url_checksum filename items =
let open OpamParserTypes.FullPos in let open OpamParserTypes.FullPos in
@ -42,64 +38,68 @@ let extract_url_checksum filename items =
in in
let url = let url =
match url, archive with match url, archive with
| Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Some url | Some { pelem = Variable (_, { pelem = String url ; _ }) ; _ }, None -> Ok url
| None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Some url | None, Some { pelem = Variable (_, { pelem = String url ; _ }); _ } -> Ok url
| _ -> | _ -> Error (`Msg "neither 'src' nor 'archive' present")
Log.warn (fun m -> m "%s neither src nor archive present" filename); None
in in
let csum = let csum, csum_errs =
match checksum with match checksum with
| Some { pelem = Variable (_, { pelem = List { pelem = csums ; _ } ; _ }); _ } -> | Some { pelem = Variable (_, { pelem = List { pelem = csums ; _ } ; _ }); _ } ->
let csums = let csums, errs =
List.fold_left (fun acc -> List.fold_left (fun (csums, errs) ->
function function
| { pelem = String csum ; _ } -> | { pelem = String csum ; _ } ->
begin match decode_digest filename csum with begin match decode_digest filename csum with
| None -> acc | Error e -> csums, e :: errs
| Some (h, v) -> | Ok (h, v) ->
HM.update h (function HM.update h (function
| None -> Some v | None -> Some v
| Some v' when String.equal v v' -> None | Some v' when String.equal v v' -> None
| Some v' -> | Some v' ->
Log.warn (fun m -> m "for %s, hash %s, multiple keys are present: %s %s" Logs.warn (fun m -> m "for %s, hash %s, multiple keys are present: %s %s"
(Option.value ~default:"NONE" url) (hash_to_string h) (Ohex.encode v) (Ohex.encode v')); (Result.value ~default:"NONE" url) (hash_to_string h) (Ohex.encode v) (Ohex.encode v'));
None) None)
acc csums, errs
end end
| _ -> acc) HM.empty csums | v ->
csums, `Msg (Fmt.str "bad checksum data: %s" (OpamPrinter.FullPos.value v)) :: errs)
(HM.empty, []) csums
in in
Some csums if HM.is_empty csums then
match errs with
| hd :: tl -> Error hd, tl
| [] -> Error (`Msg "empty checksums"), []
else
Ok csums, errs
| Some { pelem = Variable (_, { pelem = String csum ; _ }) ; _ } -> | Some { pelem = Variable (_, { pelem = String csum ; _ }) ; _ } ->
begin match decode_digest filename csum with begin match decode_digest filename csum with
| None -> None | Error _ as e -> e, []
| Some (h, v) -> Some (HM.singleton h v) | Ok (h, v) -> Ok (HM.singleton h v), []
end end
| _ -> | _ -> Error (`Msg "couldn't find or decode 'checksum'"), []
Log.warn (fun m -> m "couldn't decode checksum in %s" filename);
None
in in
match url, csum with (match url, csum with
| Some url, Some cs -> Some (url, cs) | Ok url, Ok csum -> Ok (url, csum)
| _ -> None | Error _ as e, _
| _, (Error _ as e) -> e), csum_errs
let extract_checksums_and_urls filename opam = let extract_checksums_and_urls filename opam =
let open OpamParserTypes.FullPos in let open OpamParserTypes.FullPos in
List.fold_left (fun acc -> List.fold_left (fun (csum_urls, errs) ->
function function
| { pelem = Section ({ section_kind = { pelem = "url" ; _ } ; section_items = { pelem = items ; _ } ; _ }) ; _} -> | { pelem = Section ({ section_kind = { pelem = "url" ; _ } ; section_items = { pelem = items ; _ } ; _ }) ; _} ->
begin match extract_url_checksum filename items with begin match extract_url_checksum filename items with
| None -> acc | Error `Msg msg, errs' -> csum_urls, `Msg ("url: " ^ msg) :: errs' @ errs
| Some url -> url :: acc | Ok url, errs' -> url :: csum_urls, errs' @ errs
end end
| { pelem = Section ({ section_kind = { pelem = "extra-source" ; _ } ; section_name = Some { pelem ; _ } ; section_items = { pelem = items ; _ }; _ }) ; _} -> | { pelem = Section ({ section_kind = { pelem = "extra-source" ; _ } ; section_name = Some { pelem ; _ } ; section_items = { pelem = items ; _ }; _ }) ; _} ->
begin begin
Log.debug (fun m -> m "extracting for extra-source %s in %s" filename pelem);
match extract_url_checksum filename items with match extract_url_checksum filename items with
| None -> acc | Error `Msg msg, errs' -> csum_urls, `Msg ("extra-source " ^ pelem ^ " " ^ msg) :: errs' @ errs
| Some url -> url :: acc | Ok url, errs' -> url :: csum_urls, errs' @ errs
end end
| _ -> acc) | _ -> csum_urls, errs)
[] opam.file_contents ([], []) opam.file_contents
let extract_urls filename str = let extract_urls filename str =
(* in an opam file, there may be: (* in an opam file, there may be:
@ -120,7 +120,6 @@ let extract_urls filename str =
opamfile.file_contents opamfile.file_contents
in in
if unavailable then if unavailable then
(Log.debug (fun m -> m "%s is marked unavailable, skipping" filename); [], []
[])
else else
extract_checksums_and_urls filename opamfile extract_checksums_and_urls filename opamfile

View file

@ -7,12 +7,14 @@ module Make(BLOCK : Mirage_block.S) = struct
type partitions = { type partitions = {
tar : Part.t ; tar : Part.t ;
swap : Part.t ;
git_dump : Part.t ; git_dump : Part.t ;
md5s : Part.t ; md5s : Part.t ;
sha512s : Part.t ; sha512s : Part.t ;
} }
(* I just made these ones up... *) (* I just made these ones up... *)
let swap_guid = Uuidm.of_string "76515dc1-953f-4c59-8b41-90011bdddfcd" |> Option.get
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
@ -54,33 +56,37 @@ module Make(BLOCK : Mirage_block.S) = struct
let connect block = let connect block =
let* info = BLOCK.get_info block in let* info = BLOCK.get_info block in
let* gpt = read_partition_table info block in let* gpt = read_partition_table info block in
let tar, git_dump, md5s, sha512s = let tar, swap, git_dump, md5s, sha512s =
match match
List.fold_left List.fold_left
(fun (tar, git_dump, md5s, sha512s) p -> (fun (tar, swap, git_dump, md5s, sha512s) p ->
if String.equal p.Gpt.Partition.name if String.equal p.Gpt.Partition.name
(utf16be_of_ascii "tar") (utf16be_of_ascii "tar")
then then
(Some p, git_dump, md5s, sha512s) (Some p, swap, git_dump, md5s, sha512s)
else if String.equal p.name else if String.equal p.name
(utf16be_of_ascii "git_dump") (utf16be_of_ascii "git_dump")
then then
(tar, Some p, md5s, sha512s) (tar, swap, Some p, md5s, sha512s)
else if String.equal p.name else if String.equal p.name
(utf16be_of_ascii "md5s") (utf16be_of_ascii "md5s")
then then
(tar, git_dump, Some p, sha512s) (tar, swap, git_dump, Some p, sha512s)
else if String.equal p.name else if String.equal p.name
(utf16be_of_ascii "sha512s") (utf16be_of_ascii "sha512s")
then then
(tar, git_dump, md5s, Some p) (tar, swap, git_dump, md5s, Some p)
else if String.equal p.name
(utf16be_of_ascii "swap")
then
(tar, Some p, git_dump, md5s, sha512s)
else else
Format.kasprintf failwith "Unknown partition %S" p.name) Format.kasprintf failwith "Unknown partition %S" p.name)
(None, None, None, None) (None, None, None, None, None)
gpt.partitions gpt.partitions
with with
| (Some tar, Some git_dump, Some md5s, Some sha512s) -> | (Some tar, Some swap, Some git_dump, Some md5s, Some sha512s) ->
(tar, git_dump, md5s, sha512s) (tar, swap, git_dump, md5s, sha512s)
| _ -> | _ ->
failwith "not all partitions found :(" failwith "not all partitions found :("
in in
@ -91,11 +97,11 @@ module Make(BLOCK : Mirage_block.S) = struct
let (part, _after) = Part.subpartition len after in let (part, _after) = Part.subpartition len after in
part part
in in
let tar = get_part tar and git_dump = get_part git_dump let tar = get_part tar and swap = get_part swap and git_dump = get_part git_dump
and md5s = get_part md5s and sha512s = get_part sha512s in and md5s = get_part md5s and sha512s = get_part sha512s in
{ tar ; git_dump ; md5s ; sha512s } { tar ; swap; git_dump ; md5s ; sha512s }
let format block ~sectors_cache ~sectors_git = let format block ~sectors_cache ~sectors_git ~sectors_swap =
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
let ( let*? ) = Lwt_result.bind in let ( let*? ) = Lwt_result.bind in
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT (* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
@ -144,18 +150,27 @@ module Make(BLOCK : Mirage_block.S) = struct
(Int64.pred md5s.starting_lba) (Int64.pred md5s.starting_lba)
|> Result.get_ok |> Result.get_ok
in in
let swap =
Gpt.Partition.make
~name:(utf16be_of_ascii "swap")
~type_guid:swap_guid
~attributes
(Int64.sub git_dump.starting_lba sectors_swap)
(Int64.pred git_dump.starting_lba)
|> Result.get_ok
in
let tar = let tar =
Gpt.Partition.make Gpt.Partition.make
~name:(utf16be_of_ascii "tar") ~name:(utf16be_of_ascii "tar")
~type_guid:tar_guid ~type_guid:tar_guid
~attributes ~attributes
empty.first_usable_lba empty.first_usable_lba
(Int64.pred git_dump.starting_lba) (Int64.pred swap.starting_lba)
|> Result.get_ok |> Result.get_ok
in in
let gpt = let gpt =
let partitions = let partitions =
[ tar; git_dump; md5s; sha512s ] [ tar; swap; git_dump; md5s; sha512s ]
in in
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|> Result.get_ok |> Result.get_ok

View file

@ -53,6 +53,11 @@ module K = struct
let doc = Arg.info ~doc ["sectors-git"] in let doc = Arg.info ~doc ["sectors-git"] in
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 40L (mul 2L 1024L)) doc) Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 40L (mul 2L 1024L)) doc)
let sectors_swap =
let doc = "Number of sectors reserved for swap. Only used with --initialize-disk" in
let doc = Arg.info ~doc ["sectors-swap"] in
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 1024L 2048L) doc)
let initialize_disk = let initialize_disk =
let doc = "Initialize the disk with a partition table. THIS IS DESTRUCTIVE!" in let doc = "Initialize the disk with a partition table. THIS IS DESTRUCTIVE!" in
let doc = Arg.info ~doc ["initialize-disk"] in let doc = Arg.info ~doc ["initialize-disk"] in
@ -75,6 +80,7 @@ module Make
module Part = Partitions.Make(BLOCK) module Part = Partitions.Make(BLOCK)
module KV = Tar_mirage.Make_KV_RW(Pclock)(Part) module KV = Tar_mirage.Make_KV_RW(Pclock)(Part)
module Cache = OneFFS.Make(Part) module Cache = OneFFS.Make(Part)
module Swap = Swapfs.Make(Part)
module Store = Git_kv.Make(Pclock) module Store = Git_kv.Make(Pclock)
module SM = Map.Make(String) module SM = Map.Make(String)
@ -95,93 +101,134 @@ module Make
hash_to_string h ^ "=" ^ Ohex.encode v ^ "\n" ^ acc) hash_to_string h ^ "=" ^ Ohex.encode v ^ "\n" ^ acc)
hm "" hm ""
module Git = struct let parse_errors = ref SM.empty
let find_contents store =
let rec go store path acc =
Store.list store path >>= function
| Error e ->
Logs.err (fun m -> m "error %a while listing %a"
Store.pp_error e Mirage_kv.Key.pp path);
Lwt.return acc
| Ok steps ->
Lwt_list.fold_left_s (fun acc (step, _) ->
Store.exists store step >>= function
| Error e ->
Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e
Mirage_kv.Key.pp step);
Lwt.return acc
| Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step);
Lwt.return acc
| Ok Some `Value -> Lwt.return (step :: acc)
| Ok Some `Dictionary -> go store step acc) acc steps
in
go store Mirage_kv.Key.empty []
let find_urls store = let reset_parse_errors () = parse_errors := SM.empty
find_contents store >>= fun paths ->
let opam_paths = let add_parse_error filename error =
List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths parse_errors := SM.add filename error !parse_errors
module Git = struct
let contents store =
let explore = ref [ Mirage_kv.Key.empty ] in
let more () =
let rec go () =
match !explore with
| [] -> Lwt.return None
| step :: tl ->
explore := tl;
Store.exists store step >>= function
| Error e -> go ()
| Ok None -> go ()
| Ok Some `Value -> Lwt.return (Some step)
| Ok Some `Dictionary ->
Store.list store step >>= function
| Error e -> go ()
| Ok steps ->
explore := List.map fst steps @ !explore;
go ()
in
go ()
in in
Lwt_list.fold_left_s (fun acc path -> Lwt_stream.from more
Store.get store path >|= function
| Ok data -> let find_urls acc path data =
(* TODO report parser errors *) if Mirage_kv.Key.basename path = "opam" then
(try let path = Mirage_kv.Key.to_string path in
let url_csums = Opam_file.extract_urls (Mirage_kv.Key.to_string path) data in let url_csums, errs = Opam_file.extract_urls path data in
List.fold_left (fun acc (url, csums) -> List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
if HM.cardinal csums = 0 then List.fold_left (fun acc (url, csums) ->
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc) if HM.cardinal csums = 0 then
else (add_parse_error path ("no checksums for " ^ url);
SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
csums'
then
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
None
end) acc) acc url_csums
with _ ->
Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path);
acc) acc)
| Error e -> Logs.warn (fun m -> m "Store.get: %a" Store.pp_error e); acc) else
SM.empty opam_paths SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
csums'
then
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
add_parse_error path (Fmt.str
"mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
None
end) acc) acc url_csums
else
acc
end end
let active_downloads = ref SM.empty let active_downloads = ref SM.empty
let add_to_active url ts = let add_to_active url ts =
active_downloads := SM.add url (ts, 0, "unknown size") !active_downloads active_downloads := SM.add url (ts, 0) !active_downloads
let remove_active url = let remove_active url =
active_downloads := SM.remove url !active_downloads active_downloads := SM.remove url !active_downloads
let active_length url written length =
match SM.find_opt url !active_downloads with
| None -> ()
| Some (ts, written', _) ->
active_downloads := SM.add url (ts, written + written', length)
!active_downloads
let active_add_bytes url written = let active_add_bytes url written =
match SM.find_opt url !active_downloads with match SM.find_opt url !active_downloads with
| None -> () | None -> ()
| Some (ts, written', l) -> | Some (ts, written') ->
active_downloads := SM.add url (ts, written + written', l) active_downloads := SM.add url (ts, written + written')
!active_downloads !active_downloads
let failed_downloads = ref SM.empty let failed_downloads = ref SM.empty
let reset_failed_downloads () = failed_downloads := SM.empty
let add_failed url ts reason = let add_failed url ts reason =
remove_active url; remove_active url;
failed_downloads := SM.add url (ts, reason) !failed_downloads failed_downloads := SM.add url (ts, reason) !failed_downloads
let pp_failed ppf = function
| `Write_error e ->
KV.pp_write_error ppf e
| `Swap e ->
Swap.pp_error ppf e
| `Bad_checksum (hash, computed, expected) ->
Fmt.pf ppf "%s checksum: computed %s expected %s"
(hash_to_string hash)
(Ohex.encode computed)
(Ohex.encode expected)
| `Bad_response (status, reason) ->
Fmt.pf ppf "%a %s" H2.Status.pp_hum status reason
| `Mimic me ->
Mimic.pp_error ppf me
let key_of_failed = function
| `Write_error _ -> `Write_error
| `Swap _ -> `Swap
| `Bad_checksum _ -> `Bad_checksum
| `Bad_response _ -> `Bad_response
| `Mimic _ -> `Mimic
let compare_failed_key a b = match a, b with
| `Write_error, `Write_error -> 0
| `Write_error, _ -> -1
| _, `Write_error -> 1
| `Swap, `Swap -> 0
| `Swap, _ -> -1
| _, `Swap -> 1
| `Bad_checksum, `Bad_checksum -> 0
| `Bad_checksum, _ -> -1
| _, `Bad_checksum -> 1
| `Bad_response, `Bad_response -> 0
| `Bad_response, _ -> -1
| _, `Bad_response -> 1
| `Mimic, `Mimic -> 0
let pp_key ppf = function
| `Write_error -> Fmt.pf ppf "Write error"
| `Swap -> Fmt.pf ppf "Swap error"
| `Bad_checksum -> Fmt.pf ppf "Bad checksum"
| `Bad_response -> Fmt.pf ppf "Bad response"
| `Mimic -> Fmt.pf ppf "Mimic"
module Disk = struct module Disk = struct
type t = { type t = {
mutable md5s : string SM.t ; mutable md5s : string SM.t ;
@ -189,13 +236,10 @@ module Make
dev : KV.t ; dev : KV.t ;
dev_md5s : Cache.t ; dev_md5s : Cache.t ;
dev_sha512s : Cache.t ; dev_sha512s : Cache.t ;
dev_swap : Swap.t ;
} }
let pending = Mirage_kv.Key.v "pending" let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
let to_delete = Mirage_kv.Key.v "to-delete"
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
let marshal_sm (sm : string SM.t) = let marshal_sm (sm : string SM.t) =
let version = char_of_int 1 in let version = char_of_int 1 in
@ -210,11 +254,11 @@ module Make
let update_caches t = let update_caches t =
Cache.write t.dev_md5s (marshal_sm t.md5s) >>= fun r -> Cache.write t.dev_md5s (marshal_sm t.md5s) >>= fun r ->
(match r with (match r with
| Ok () -> Logs.info (fun m -> m "Set 'md5s'") | Ok () -> ()
| Error e -> Logs.warn (fun m -> m "Failed to write 'md5s': %a" Cache.pp_write_error e)); | Error e -> Logs.warn (fun m -> m "Failed to write 'md5s': %a" Cache.pp_write_error e));
Cache.write t.dev_sha512s (marshal_sm t.sha512s) >>= fun r -> Cache.write t.dev_sha512s (marshal_sm t.sha512s) >>= fun r ->
match r with match r with
| Ok () -> Logs.info (fun m -> m "Set 'sha512s'"); Lwt.return_unit | Ok () -> Lwt.return_unit
| Error e -> | Error e ->
Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e); Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e);
Lwt.return_unit Lwt.return_unit
@ -240,8 +284,6 @@ module Make
| Ok key -> | Ok key ->
KV.size t.dev key >>= function KV.size t.dev key >>= function
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while reading %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Lwt.return (Error (`Not_found key)) Lwt.return (Error (`Not_found key))
| Ok len -> | Ok len ->
let chunk_size = 4096 in let chunk_size = 4096 in
@ -252,121 +294,27 @@ module Make
f a data >>= fun a -> f a data >>= fun a ->
read_more a Optint.Int63.(add offset (of_int chunk_size)) read_more a Optint.Int63.(add offset (of_int chunk_size))
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while reading %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Lwt.return (Error e) Lwt.return (Error e)
else else
Lwt.return (Ok a) Lwt.return (Ok a)
in in
read_more a Optint.Int63.zero read_more a Optint.Int63.zero
(* let init_write t csums =
module HM_running = struct let quux, csums = Archive_checksum.init_write csums in
let swap = Swap.empty t.dev_swap in
let empty h = quux, Ok (csums, swap)
let module H = (val Mirage_crypto.Hash.module_of h) in
(* We need MD5, SHA256 and SHA512. [h] is likely one of the
aforementioned and in that case we don't compute the same hash twice
*)
HM.empty
|> HM.add `MD5 Mirage_crypto.Hash.MD5.empty
|> HM.add `SHA256 Mirage_crypto.Hash.SHA256.empty
|> HM.add `SHA512 Mirage_crypto.Hash.SHA512.empty
|> HM.add h H.empty
let feed t data =
HM.map (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.feed v data)
t
let get =
HM.map (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.get v)
end
*)
let content_length_of_string s =
match Int64.of_string s with
| len when len >= 0L -> `Fixed len
| _ | exception _ -> `Bad_response
let body_length headers =
match H2.Headers.get_multi headers "content-length" with
| [] -> `Unknown
| [ x ] -> content_length_of_string x
| hd :: tl ->
(* if there are multiple content-length headers we require them all to be
* exactly equal. *)
if List.for_all (String.equal hd) tl then
content_length_of_string hd
else
`Bad_response
let body_length (response : Http_mirage_client.response) =
if response.status <> `OK then
`Bad_response
else
body_length response.headers
let pending_key (hash, csum) =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
| _ ->
Mirage_kv.Key.(pending / hash_to_string hash / Ohex.encode csum)
let to_delete_key (hash, csum) =
let rand = "random" in (* FIXME: generate random string *)
let encoded_csum =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum
| _ ->
Ohex.encode csum
in
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
let write_partial t (hash, csum) url = let write_partial t (hash, csum) url =
(* XXX: we may be in trouble if different hash functions are used for the same archive *) (* XXX: we may be in trouble if different hash functions are used for the same archive *)
let key = pending_key (hash, csum) in
let ( >>>= ) = Lwt_result.bind in let ( >>>= ) = Lwt_result.bind in
fun response r data -> fun response r data ->
Lwt.return r >>>= fun (digests, acc) -> Lwt.return r >>>= fun (digests, swap) ->
let digests = Archive_checksum.update_digests digests data in let digests = Archive_checksum.update_digests digests data in
match acc with active_add_bytes url (String.length data);
| `Init -> Swap.append swap data >|= function
begin match body_length response with | Ok () -> Ok (digests, swap)
| `Bad_response -> Lwt.return (Error `Bad_response) | Error swap_err -> Error (`Swap swap_err)
| `Fixed size ->
KV.allocate t.dev key (Optint.Int63.of_int64 size)
|> Lwt_result.map_error (fun e -> `Write_error e)
>>>= fun () ->
KV.set_partial t.dev key ~offset:Optint.Int63.zero data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.of_int len in
active_length url len (Int64.to_string size ^ " bytes");
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown ->
active_add_bytes url (String.length data);
Lwt.return_ok (digests, `Unknown data)
end
| `Fixed_body (size, offset) ->
KV.set_partial t.dev key ~offset data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.(add offset (of_int len)) in
active_add_bytes url len;
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown body ->
active_add_bytes url (String.length data);
Lwt.return_ok (digests, `Unknown (body ^ data))
let check_csums_digests csums digests = let check_csums_digests csums digests =
let csums' = Archive_checksum.digests_to_hm digests in let csums' = Archive_checksum.digests_to_hm digests in
@ -376,126 +324,90 @@ module Make
(fun (h, csum) -> String.equal csum (HM.find h csums)) (fun (h, csum) -> String.equal csum (HM.find h csums))
common_bindings common_bindings
let finalize_write t (hash, csum) ~url (body : [ `Unknown of string | `Fixed_body of int64 * Optint.Int63.t | `Init ]) csums digests = let set_from_handle dev dest h =
let sizes_match, body_size_in_header = (* TODO: we need a function in tar which
match body with (a) takes a path
| `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true (b) takes a function that reads (from the swap) and writes (to the tar)
| `Unknown _ -> true, false (c) once the function is finished, it writes the tar header
| `Init -> assert false -> this would allow us to avoid the rename stuff below
*)
let size = Optint.Int63.of_int64 (Swap.size h) in
KV.allocate dev dest size >>= fun r ->
let rec loop offset =
if offset = Swap.size h then
Lwt.return_ok ()
else
let length = Int64.(to_int (min 4096L (sub (Swap.size h) offset))) in
Swap.get_partial h ~offset ~length >>= fun r ->
match r with
| Error e -> Lwt.return (Error (`Swap e))
| Ok data ->
KV.set_partial dev dest ~offset:(Optint.Int63.of_int64 offset) data
>>= fun r ->
match r with
| Error e -> Lwt.return (Error (`Write_error e))
| Ok () ->
loop Int64.(add offset (of_int length))
in in
let source = pending_key (hash, csum) in match r with
if check_csums_digests csums digests && sizes_match then | Ok () ->
loop 0L
| Error e ->
Lwt.return (Error (`Write_error e))
let finalize_write t (hash, csum) ~url swap csums digests =
if check_csums_digests csums digests then
let sha256 = Ohex.encode Digestif.SHA256.(to_raw_string (get digests.sha256)) let sha256 = Ohex.encode Digestif.SHA256.(to_raw_string (get digests.sha256))
and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5)) and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5))
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
let dest = Mirage_kv.Key.v sha256 in let dest = Mirage_kv.Key.v sha256 in
begin match body with let temp = Mirage_kv.Key.(v "pending" // dest) in
| `Unknown body -> Lwt_result.bind
Logs.info (fun m -> m "downloaded %s, now writing" url); (Lwt.finalize (fun () -> set_from_handle t.dev temp swap)
KV.set t.dev dest body (fun () -> Swap.free swap))
| `Fixed_body (_reported_size, _actual_size) -> (fun () -> KV.rename t.dev ~source:temp ~dest
Logs.info (fun m -> m "downloaded %s" url); |> Lwt_result.map_error (fun e -> `Write_error e))
KV.rename t.dev ~source ~dest >|= function
| `Init -> assert false
end >|= function
| Ok () -> | Ok () ->
remove_active url; remove_active url;
t.md5s <- SM.add md5 sha256 t.md5s; t.md5s <- SM.add md5 sha256 t.md5s;
t.sha512s <- SM.add sha512 sha256 t.sha512s t.sha512s <- SM.add sha512 sha256 t.sha512s
| Error e -> | Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e); | Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Write failure for %s: %a" url KV.pp_write_error e)
else begin else begin
(if sizes_match then begin add_failed url (Ptime.v (Pclock.now_d_ps ()))
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Bad_checksum (hash, Archive_checksum.get digests hash, csum));
(Fmt.str "Bad checksum %s:%s: computed %s expected %s" url Lwt.return_unit
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum));
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum))
end else match body with
| `Fixed_body (reported, actual) ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported);
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported)
| `Unknown _ -> assert false
| `Init -> assert false);
if body_size_in_header then
(* if the checksums mismatch we want to delete the file. We are only
able to do so if it was the latest created file, so we expect and
error. Ideally, we want to match for `Append_only or other errors *)
KV.remove t.dev source >>= function
| Ok () -> Lwt.return_unit
| Error e ->
(* we failed to delete the file so we mark it for deletion *)
let dest = to_delete_key (hash, csum) in
Logs.warn (fun m -> m "Failed to remove %a: %a. Moving it to %a"
Mirage_kv.Key.pp source KV.pp_write_error e Mirage_kv.Key.pp dest);
KV.rename t.dev ~source ~dest >|= function
| Ok () -> ()
| Error e ->
Logs.warn (fun m -> m "Error renaming file %a -> %a: %a"
Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e)
else
Lwt.return_unit
end end
(* on disk, we use a flat file system where the filename is the sha256 of the data *) (* on disk, we use a flat file system where the filename is the sha256 of the data *)
let init ~verify_sha256 dev dev_md5s dev_sha512s = let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
KV.list dev Mirage_kv.Key.empty >>= function KV.list dev Mirage_kv.Key.empty >>= function
| Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false | Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e)
| Ok entries -> | Ok entries ->
let t = empty dev dev_md5s dev_sha512s in let t = empty dev dev_md5s dev_sha512s dev_swap in
Cache.read t.dev_md5s >>= fun r -> Cache.read t.dev_md5s >>= fun r ->
(match r with (match r with
| Ok Some s -> | Ok Some s ->
if not verify_sha256 then if not verify_sha256 then
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s) Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
| Ok None -> Logs.debug (fun m -> m "No md5s cached") | Ok None -> ()
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e)); | Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
Cache.read t.dev_sha512s >>= fun r -> Cache.read t.dev_sha512s >>= fun r ->
(match r with (match r with
| Ok Some s -> | Ok Some s ->
if not verify_sha256 then if not verify_sha256 then
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s) Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
| Ok None -> Logs.debug (fun m -> m "No sha512s cached") | Ok None -> ()
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e)); | Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s)) let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
let idx = ref 1 in
(* XXX: should we do something about pending downloads?? *)
let entries =
List.filter (fun (p, _) ->
not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete))
entries
in
Lwt_list.iter_s (fun (path, typ) -> Lwt_list.iter_s (fun (path, typ) ->
if !idx mod 10 = 0 then Gc.full_major () ;
match typ with match typ with
| `Dictionary -> | `Dictionary -> Lwt.return_unit
Logs.warn (fun m -> m "unexpected dictionary at %a" Mirage_kv.Key.pp path);
Lwt.return_unit
| `Value -> | `Value ->
let open Digestif in let open Digestif in
let sha256_final = let md5_final =
if verify_sha256 then
let f s =
let digest = SHA256.(to_raw_string (get s)) in
if not (String.equal (Mirage_kv.Key.basename path) (Ohex.encode digest)) then
Logs.err (fun m -> m "corrupt SHA256 data for %a, \
computed %s (should remove)"
Mirage_kv.Key.pp path (Ohex.encode digest))
in
Some f
else
None
and md5_final =
if not (SSet.mem (Mirage_kv.Key.basename path) md5s) then if not (SSet.mem (Mirage_kv.Key.basename path) md5s) then
let f s = let f s =
let digest = MD5.(to_raw_string (get s)) in let digest = MD5.(to_raw_string (get s)) in
@ -514,26 +426,52 @@ module Make
else else
None None
in in
match sha256_final, md5_final, sha512_final with let sha256_final =
| None, None, None -> Lwt.return_unit let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in
| _ -> if need_to_compute then
let f s =
let digest = SHA256.(to_raw_string (get s)) in
if not (String.equal (Mirage_kv.Key.basename path) (Ohex.encode digest)) then
begin
Logs.err (fun m -> m "corrupt SHA256 data for %a, \
computed %s (will rename)"
Mirage_kv.Key.pp path (Ohex.encode digest));
false
end else true
in
Some f
else
None
in
match sha256_final with
| None -> Lwt.return_unit
| Some f ->
read_chunked t `SHA256 path read_chunked t `SHA256 path
(fun (sha256, md5, sha512) data -> (fun (sha256, md5, sha512) data ->
Lwt.return Lwt.return
(Option.map (fun t -> SHA256.feed_string t data) sha256, (SHA256.feed_string sha256 data,
Option.map (fun t -> MD5.feed_string t data) md5, Option.map (fun t -> MD5.feed_string t data) md5,
Option.map (fun t -> SHA512.feed_string t data) sha512)) Option.map (fun t -> SHA512.feed_string t data) sha512))
(Option.map (fun _ -> SHA256.empty) sha256_final, (SHA256.empty,
Option.map (fun _ -> MD5.empty) md5_final, Option.map (fun _ -> MD5.empty) md5_final,
Option.map (fun _ -> SHA512.empty) sha512_final) >|= function Option.map (fun _ -> SHA512.empty) sha512_final) >>= function
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a of %a while computing digests" Logs.err (fun m -> m "error %a of %a while computing digests"
KV.pp_error e Mirage_kv.Key.pp path) KV.pp_error e Mirage_kv.Key.pp path);
Lwt.return_unit
| Ok (sha256, md5, sha512) -> | Ok (sha256, md5, sha512) ->
Option.iter (fun f -> f (Option.get sha256)) sha256_final; if not (f sha256) then
Option.iter (fun f -> f (Option.get md5)) md5_final; (* bad sha256! *)
Option.iter (fun f -> f (Option.get sha512)) sha512_final; KV.rename t.dev ~source:path ~dest:(Mirage_kv.Key.(v "delete" // path)) >|= function
Logs.info (fun m -> m "added %a" Mirage_kv.Key.pp path)) | Ok () -> ()
| Error we ->
Logs.err (fun m -> m "error %a while renaming %a" KV.pp_write_error we
Mirage_kv.Key.pp path)
else begin
Option.iter (fun f -> f (Option.get md5)) md5_final;
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
Lwt.return_unit
end)
entries >>= fun () -> entries >>= fun () ->
update_caches t >|= fun () -> update_caches t >|= fun () ->
t t
@ -544,15 +482,9 @@ module Make
| Ok x -> | Ok x ->
KV.exists t.dev x >|= function KV.exists t.dev x >|= function
| Ok Some `Value -> true | Ok Some `Value -> true
| Ok Some `Dictionary -> | Ok Some `Dictionary -> false
Logs.err (fun m -> m "unexpected dictionary for %s %a"
(hash_to_string h) Mirage_kv.Key.pp v);
false
| Ok None -> false | Ok None -> false
| Error e -> | Error _ -> false
Logs.err (fun m -> m "exists %s %a returned %a"
(hash_to_string h) Mirage_kv.Key.pp v KV.pp_error e);
false
let last_modified t h v = let last_modified t h v =
match find_key t h v with match find_key t h v with
@ -560,10 +492,7 @@ module Make
| Ok x -> | Ok x ->
KV.last_modified t.dev x >|= function KV.last_modified t.dev x >|= function
| Ok data -> Ok data | Ok data -> Ok data
| Error e -> | Error _ -> Error `Not_found
Logs.err (fun m -> m "error %a while last_modified %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Error `Not_found
let size t h v = let size t h v =
match find_key t h v with match find_key t h v with
@ -571,10 +500,7 @@ module Make
| Ok x -> | Ok x ->
KV.size t.dev x >|= function KV.size t.dev x >|= function
| Ok s -> Ok s | Ok s -> Ok s
| Error e -> | Error _ -> Error `Not_found
Logs.err (fun m -> m "error %a while size %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Error `Not_found
end end
module Tarball = struct module Tarball = struct
@ -614,22 +540,23 @@ module Make
then Tar.High (High.inj (Lwt.return_ok None)) then Tar.High (High.inj (Lwt.return_ok None))
else begin closed := true; Tar.High (High.inj (Lwt.return_ok (Some data))) end else begin closed := true; Tar.High (High.inj (Lwt.return_ok (Some data))) end
let entries_of_git ~mtime store repo = let entries_of_git ~mtime store repo urls =
Git.find_contents store >>= fun paths -> let entries = Git.contents store in
let entries = Lwt_stream.of_list paths in
let to_entry path = let to_entry path =
Store.get store path >|= function Store.get store path >|= function
| Ok data -> | Ok data ->
let data = let data =
if Mirage_kv.Key.(equal path (v "repo")) if Mirage_kv.Key.(equal path (v "repo"))
then repo else data in then repo else data
in
let file_mode = 0o644 let file_mode = 0o644
and mod_time = Int64.of_int mtime and mod_time = Int64.of_int mtime
and user_id = 0 and user_id = 0
and group_id = 0 and group_id = 0
and size = String.length data in and size = String.length data in
let hdr = Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id let hdr = Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
(Mirage_kv.Key.to_string path) (Int64.of_int size) in (Mirage_kv.Key.to_string path) (Int64.of_int size) in
urls := Git.find_urls !urls path data;
Some (Some Tar.Header.Ustar, hdr, once data) Some (Some Tar.Header.Ustar, hdr, once data)
| Error _ -> None in | Error _ -> None in
let entries = Lwt_stream.filter_map_s to_entry entries in let entries = Lwt_stream.filter_map_s to_entry entries in
@ -638,12 +565,13 @@ module Make
let of_git repo store = let of_git repo store =
let now = Ptime.v (Pclock.now_d_ps ()) in let now = Ptime.v (Pclock.now_d_ps ()) in
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
entries_of_git ~mtime store repo >>= fun entries -> let urls = ref SM.empty in
entries_of_git ~mtime store repo urls >>= fun entries ->
let t = Tar.out ~level:Ustar entries in let t = Tar.out ~level:Ustar entries in
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
let buf = Buffer.create 1024 in let buf = Buffer.create 1024 in
to_buffer buf t >|= function to_buffer buf t >|= function
| Ok () -> Buffer.contents buf | Ok () -> Buffer.contents buf, !urls
| Error (`Msg msg) -> failwith msg | Error (`Msg msg) -> failwith msg
end end
@ -696,8 +624,8 @@ stamp: %S
commit_id git_kv >>= fun commit_id -> commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified -> modified git_kv >>= fun modified ->
let repo = repo remote commit_id in let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index -> Tarball.of_git repo git_kv >|= fun (index, urls) ->
{ commit_id ; modified ; repo ; index } { commit_id ; modified ; repo ; index }, urls
let update_lock = Lwt_mutex.create () let update_lock = Lwt_mutex.create ()
@ -710,51 +638,87 @@ stamp: %S
Lwt.return None Lwt.return None
| Ok [] -> | Ok [] ->
Logs.info (fun m -> m "git changes are empty"); Logs.info (fun m -> m "git changes are empty");
Lwt.return (Some []) Lwt.return (Some ([], SM.empty))
| Ok changes -> | Ok changes ->
commit_id git_kv >>= fun commit_id -> commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified -> modified git_kv >>= fun modified ->
Logs.info (fun m -> m "git: %s" commit_id); Logs.info (fun m -> m "git: %s" commit_id);
let repo = repo remote commit_id in let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index -> reset_parse_errors ();
Tarball.of_git repo git_kv >|= fun (index, urls) ->
t.commit_id <- commit_id ; t.commit_id <- commit_id ;
t.modified <- modified ; t.modified <- modified ;
t.repo <- repo ; t.repo <- repo ;
t.index <- index; t.index <- index;
Some changes) Some (changes, urls))
let status disk = let status t disk =
(* report status: (* report status:
- archive size (can we easily measure?) and number of "good" elements - archive size (can we easily measure?) and number of "good" elements
- list of current downloads - list of current downloads
- list of failed downloads - list of failed downloads
*) *)
let archive_stats = let archive_stats =
Fmt.str "<ul><li>%u validated archives on disk</li><li>%Lu bytes free</li></ul>" Fmt.str "<ul><li>commit %s</li><li>last modified %s</li><li>repo %s</li><li>%u validated archives on disk</li><li>%Lu bytes free</li></ul>"
t.commit_id t.modified (K.remote ())
(SM.cardinal disk.Disk.md5s) (SM.cardinal disk.Disk.md5s)
(KV.free disk.Disk.dev) (KV.free disk.Disk.dev)
in in
let sort_by_ts a b = Ptime.compare a b in
let active_downloads = let active_downloads =
let header = "<h2>Active downloads</h2><ul>" in let header = "<h2>Active downloads</h2><ul>" in
let content = let content =
SM.fold (fun url (ts, bytes_written, length_or_unknown) acc -> SM.bindings !active_downloads |>
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to disk, " ^ length_or_unknown ^ "</li>") List.sort (fun (_, (a, _)) (_, (b, _)) -> sort_by_ts a b) |>
^ acc) List.map (fun (url, (ts, bytes_written)) ->
!active_downloads "" "<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to swap</li>")
in in
header ^ content ^ "</ul>" header ^ String.concat "" content ^ "</ul>"
and failed_downloads = and failed_downloads =
let header = "<h2>Failed downloads</h2><ul>" in let header = "<h2>Failed downloads</h2>" in
let content = let group_by xs =
SM.fold (fun url (ts, reason) acc -> let t = Hashtbl.create 7 in
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ reason ^ "</li>") List.iter (fun ((_, (_, reason)) as e) ->
^ acc) let k = key_of_failed reason in
!failed_downloads "" let els = Option.value ~default:[] (Hashtbl.find_opt t k) in
Hashtbl.replace t k (e :: els))
xs;
Hashtbl.fold (fun k els acc ->
let sorted =
List.sort (fun (_, (tsa, _)) (_, (tsb, _)) ->
sort_by_ts tsa tsb)
els
in
(k, sorted) :: acc)
t []
in in
header ^ content ^ "</ul>" let content =
SM.bindings !failed_downloads |>
group_by |>
List.sort (fun (a, _) (b, _) -> compare_failed_key a b) |>
List.map (fun (key, els) ->
let header = Fmt.str "<h3>%a</h3><ul>" pp_key key in
let content =
List.map (fun (url, (ts, reason)) ->
Fmt.str "<li>%s: %s error %a"
(Ptime.to_rfc3339 ?tz_offset_s:None ts) url pp_failed reason)
els
in
header ^ String.concat "" content ^ "</ul>")
in
header ^ String.concat "" content
and parse_errors =
let header = "<h2>Parse errors</h2><ul>" in
let content =
SM.bindings !parse_errors |>
List.sort (fun (a, _) (b, _) -> String.compare a b) |>
List.map (fun (filename, reason) ->
"<li>" ^ filename ^ ": " ^ reason ^ "</li>")
in
header ^ String.concat "" content ^ "</ul>"
in in
"<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status</h1><div>" "<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status</h1><div>"
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ] ^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ; parse_errors ]
^ "</div></body></html>" ^ "</div></body></html>"
let not_modified request (modified, etag) = let not_modified request (modified, etag) =
@ -794,7 +758,6 @@ stamp: %S
else *) else *)
let dispatch t store hook_url update _flow _conn reqd = let dispatch t store hook_url update _flow _conn reqd =
let request = Httpaf.Reqd.request reqd in let request = Httpaf.Reqd.request reqd in
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
match String.split_on_char '/' request.Httpaf.Request.target with match String.split_on_char '/' request.Httpaf.Request.target with
| [ ""; x ] when String.equal x hook_url -> | [ ""; x ] when String.equal x hook_url ->
Lwt.async update; Lwt.async update;
@ -810,7 +773,7 @@ stamp: %S
let resp = Httpaf.Response.create ~headers `OK in let resp = Httpaf.Response.create ~headers `OK in
Httpaf.Reqd.respond_with_string reqd resp data Httpaf.Reqd.respond_with_string reqd resp data
| [ ""; x ] when String.equal x "status" -> | [ ""; x ] when String.equal x "status" ->
let data = status store in let data = status t store in
let mime_type = "text/html" in let mime_type = "text/html" in
let headers = [ let headers = [
"content-type", mime_type ; "content-type", mime_type ;
@ -857,15 +820,12 @@ stamp: %S
begin begin
match hash_of_string hash_algo with match hash_of_string hash_algo with
| Error `Msg msg -> | Error `Msg msg ->
Logs.warn (fun m -> m "error decoding hash algo: %s" msg);
not_found reqd request.Httpaf.Request.target not_found reqd request.Httpaf.Request.target
| Ok h -> | Ok h ->
let hash = Mirage_kv.Key.v hash in let hash = Mirage_kv.Key.v hash in
Lwt.async (fun () -> Lwt.async (fun () ->
(Disk.last_modified store h hash >|= function (Disk.last_modified store h hash >|= function
| Error _ -> | Error _ -> t.modified
Logs.warn (fun m -> m "error retrieving last modified");
t.modified
| Ok v -> ptime_to_http_date v) >>= fun last_modified -> | Ok v -> ptime_to_http_date v) >>= fun last_modified ->
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
let resp = Httpaf.Response.create `Not_modified in let resp = Httpaf.Response.create `Not_modified in
@ -874,7 +834,6 @@ stamp: %S
else else
Disk.size store h hash >>= function Disk.size store h hash >>= function
| Error _ -> | Error _ ->
Logs.warn (fun m -> m "error retrieving size");
not_found reqd request.Httpaf.Request.target; not_found reqd request.Httpaf.Request.target;
Lwt.return_unit Lwt.return_unit
| Ok size -> | Ok size ->
@ -904,54 +863,39 @@ stamp: %S
end end
let bad_archives = SSet.of_list Bad.archives let download_archives parallel_downloads disk http_client urls =
let download_archives parallel_downloads disk http_client store =
(* FIXME: handle resuming partial downloads *) (* FIXME: handle resuming partial downloads *)
Git.find_urls store >>= fun urls -> reset_failed_downloads ();
let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) -> Lwt_list.iter_p (fun (url, csums) ->
Lwt_pool.use pool @@ fun () -> Lwt_pool.use pool @@ fun () ->
(* FIXME: check pending and to-delete *)
HM.fold (fun h v r -> HM.fold (fun h v r ->
r >>= function r >>= function
| true -> Disk.exists disk h (hex_to_key v) | true -> Disk.exists disk h (hex_to_key v)
| false -> Lwt.return false) | false -> Lwt.return false)
csums (Lwt.return true) >>= function csums (Lwt.return true) >>= function
| true -> | true -> Lwt.return_unit
Logs.debug (fun m -> m "ignoring %s (already present)" url);
Lwt.return_unit
| false -> | false ->
incr idx; let quux, body_init = Disk.init_write disk csums in
if !idx mod 10 = 0 then Gc.full_major () ;
Logs.info (fun m -> m "downloading %s" url);
let quux, body_init = Archive_checksum.init_write csums in
add_to_active url (Ptime.v (Pclock.now_d_ps ())); add_to_active url (Ptime.v (Pclock.now_d_ps ()));
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
| Ok (resp, r) -> | Ok (resp, r) ->
begin match r with begin match r with
| Error `Bad_response -> | Error `Bad_response ->
Logs.warn (fun m -> m "%s: %a (reason %s)"
url H2.Status.pp_hum resp.status resp.reason);
add_failed url (Ptime.v (Pclock.now_d_ps ())) add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason); (`Bad_response (resp.status, resp.reason));
Lwt.return_unit Lwt.return_unit
| Error `Write_error e -> | Error `Write_error e ->
Logs.err (fun m -> m "%s: write error %a %a" add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
url Lwt.return_unit
Mirage_kv.Key.pp (Disk.pending_key quux) | Error `Swap e ->
KV.pp_write_error e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "write error: %a" KV.pp_write_error e);
Lwt.return_unit Lwt.return_unit
| Ok (digests, body) -> | Ok (digests, body) ->
Disk.finalize_write disk quux ~url body csums digests Disk.finalize_write disk quux ~url body csums digests
end end
| Error me -> | Error me ->
add_failed url (Ptime.v (Pclock.now_d_ps ())) add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
(Fmt.str "mimic error: %a" Mimic.pp_error me);
Lwt.return_unit) Lwt.return_unit)
(SM.bindings urls) >>= fun () -> (SM.bindings urls) >>= fun () ->
Disk.update_caches disk >|= fun () -> Disk.update_caches disk >|= fun () ->
@ -980,13 +924,14 @@ stamp: %S
module Paf = Paf_mirage.Make(Stack.TCP) module Paf = Paf_mirage.Make(Stack.TCP)
let start_mirror { Part.tar; git_dump; md5s; sha512s } stack git_ctx http_ctx = let start_mirror { Part.tar; swap; git_dump; md5s; sha512s } stack git_ctx http_ctx =
KV.connect tar >>= fun kv -> KV.connect tar >>= fun kv ->
Cache.connect git_dump >>= fun git_dump -> Cache.connect git_dump >>= fun git_dump ->
Cache.connect md5s >>= fun md5s -> Cache.connect md5s >>= fun md5s ->
Cache.connect sha512s >>= fun sha512s -> Cache.connect sha512s >>= fun sha512s ->
Swap.connect swap >>= fun swap ->
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv)); Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s >>= fun disk -> Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk ->
let remote = K.remote () in let remote = K.remote () in
if K.check () then if K.check () then
Lwt.return_unit Lwt.return_unit
@ -1006,14 +951,14 @@ stamp: %S
Logs.info (fun m -> m "Done initializing git state!"); Logs.info (fun m -> m "Done initializing git state!");
Serve.commit_id git_kv >>= fun commit_id -> Serve.commit_id git_kv >>= fun commit_id ->
Logs.info (fun m -> m "git: %s" commit_id); Logs.info (fun m -> m "git: %s" commit_id);
Serve.create remote git_kv >>= fun serve -> Serve.create remote git_kv >>= fun (serve, urls) ->
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t -> Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
let update () = let update () =
Serve.update_git ~remote serve git_kv >>= function Serve.update_git ~remote serve git_kv >>= function
| None | Some [] -> Lwt.return_unit | None | Some ([], _) -> Lwt.return_unit
| Some _changes -> | Some (_changes, urls) ->
dump_git git_dump git_kv >>= fun () -> dump_git git_dump git_kv >>= fun () ->
download_archives (K.parallel_downloads ()) disk http_ctx git_kv download_archives (K.parallel_downloads ()) disk http_ctx urls
in in
let service = let service =
Paf.http_service Paf.http_service
@ -1029,15 +974,16 @@ stamp: %S
go () go ()
in in
go ()); go ());
download_archives (K.parallel_downloads ()) disk http_ctx git_kv >>= fun () -> download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
(th >|= fun _v -> ()) (th >|= fun _v -> ())
let start block _time _pclock stack git_ctx http_ctx = let start block _time _pclock stack git_ctx http_ctx =
let initialize_disk = K.initialize_disk () let initialize_disk = K.initialize_disk ()
and sectors_cache = K.sectors_cache () and sectors_cache = K.sectors_cache ()
and sectors_git = K.sectors_git () in and sectors_git = K.sectors_git ()
and sectors_swap = K.sectors_swap () in
if initialize_disk then if initialize_disk then
Part.format block ~sectors_cache ~sectors_git >>= function Part.format block ~sectors_cache ~sectors_git ~sectors_swap >>= function
| Ok () -> | Ok () ->
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk."); Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
Lwt.return_unit Lwt.return_unit