Upgrade opam-mirror #1

Merged
reynir merged 4 commits from with-new-tar into main 2024-10-03 13:42:13 +00:00
3 changed files with 361 additions and 300 deletions

127
mirage/archive_checksum.ml Normal file
View file

@ -0,0 +1,127 @@
module Hash = struct
type t = [ `MD5 | `SHA1 | `SHA224 | `SHA256 | `SHA384 | `SHA512 ]
(* Make the compiler check that (t :> Digestif.hash') *)
let _ = fun (h :t) -> (h :> Digestif.hash')
let compare h h' =
match h, h' with
| `SHA512, `SHA512 -> 0
| `SHA512, _ -> 1
| _, `SHA512 -> -1
| `SHA384, `SHA384 -> 0
| `SHA384, _ -> 1
| _, `SHA384 -> -1
| `SHA256, `SHA256 -> 0
| `SHA256, _ -> 1
| _, `SHA256 -> -1
| `SHA224, `SHA224 -> 0
| `SHA224, _ -> 1
| _, `SHA224 -> -1
| `SHA1, `SHA1 -> 0
| `SHA1, `MD5 -> 1
| `MD5, `MD5 -> 0
| `MD5, _ -> -1
let to_string = function
| `MD5 -> "md5"
| `SHA1 -> "sha1"
| `SHA224 -> "sha224"
| `SHA256 -> "sha256"
| `SHA384 -> "sha384"
| `SHA512 -> "sha512"
let of_string = function
| "md5" -> Ok `MD5
| "sha256" -> Ok `SHA256
| "sha512" -> Ok `SHA512
| h -> Error (`Msg ("unknown hash algorithm: " ^ h))
end
module HM = Map.Make(Hash)
module Running_hash = struct
type _ t =
| MD5 : Digestif.MD5.ctx -> [> `MD5 ] t
| SHA1 : Digestif.SHA1.ctx -> [> `SHA1 ] t
| SHA224 : Digestif.SHA224.ctx -> [> `SHA224 ] t
| SHA256 : Digestif.SHA256.ctx -> [> `SHA256 ] t
| SHA384 : Digestif.SHA384.ctx -> [> `SHA384 ] t
| SHA512 : Digestif.SHA512.ctx -> [> `SHA512 ] t
let empty : _ -> _ t = function
| `MD5 -> MD5 Digestif.MD5.empty
| `SHA1 -> SHA1 Digestif.SHA1.empty
| `SHA224 -> SHA224 Digestif.SHA224.empty
| `SHA256 -> SHA256 Digestif.SHA256.empty
| `SHA384 -> SHA384 Digestif.SHA384.empty
| `SHA512 -> SHA512 Digestif.SHA512.empty
let feed_string t data =
match t with
| MD5 t -> MD5 (Digestif.MD5.feed_string t data)
| SHA1 t -> SHA1 (Digestif.SHA1.feed_string t data)
| SHA224 t -> SHA224 (Digestif.SHA224.feed_string t data)
| SHA256 t -> SHA256 (Digestif.SHA256.feed_string t data)
| SHA384 t -> SHA384 (Digestif.SHA384.feed_string t data)
| SHA512 t -> SHA512 (Digestif.SHA512.feed_string t data)
let get t =
match t with
| MD5 t -> Digestif.MD5.(to_raw_string (get t))
| SHA1 t -> Digestif.SHA1.(to_raw_string (get t))
| SHA224 t -> Digestif.SHA224.(to_raw_string (get t))
| SHA256 t -> Digestif.SHA256.(to_raw_string (get t))
| SHA384 t -> Digestif.SHA384.(to_raw_string (get t))
| SHA512 t -> Digestif.SHA512.(to_raw_string (get t))
let hash_alg t =
match t with
| MD5 _ -> `MD5
| SHA1 _ -> `SHA1
| SHA224 _ -> `SHA224
| SHA256 _ -> `SHA256
| SHA384 _ -> `SHA384
| SHA512 _ -> `SHA512
end
type 'a digests = {
md5 : Digestif.MD5.ctx;
sha256 : Digestif.SHA256.ctx;
sha512 : Digestif.SHA512.ctx;
csum : 'a Running_hash.t;
}
let empty_digests h =
let csum = Running_hash.empty h in
{
md5 = Digestif.MD5.empty;
sha256 = Digestif.SHA256.empty;
sha512 = Digestif.SHA512.empty;
csum;
}
let update_digests { md5; sha256; sha512; csum } data =
{
md5 = Digestif.MD5.feed_string md5 data;
sha256 = Digestif.SHA256.feed_string sha256 data;
sha512 = Digestif.SHA512.feed_string sha512 data;
csum = Running_hash.feed_string csum data;
}
let init_write csums =
let hash, csum = HM.max_binding csums in
(hash, csum), Ok (empty_digests hash, `Init)
let digests_to_hm digests =
HM.empty
|> HM.add `MD5
Digestif.MD5.(to_raw_string (get digests.md5))
|> HM.add `SHA256
Digestif.SHA256.(to_raw_string (get digests.sha256))
|> HM.add `SHA512
Digestif.SHA512.(to_raw_string (get digests.sha512))
|> HM.add (Running_hash.hash_alg digests.csum)
(Running_hash.get digests.csum)

View file

@ -1,109 +1,63 @@
open Mirage
let check =
let doc =
Key.Arg.info ~doc:"Only check the cache" ["check"]
in
Key.(create "check" Arg.(flag doc))
let setup = runtime_arg ~pos:__POS__ "Unikernel.K.setup"
let verify_sha256 =
let doc =
Key.Arg.info ~doc:"Verify the SHA256 checksums of the cache contents, and \
re-build the other checksum caches."
["verify-sha256"]
in
Key.(create "verify-sha256" Arg.(flag doc))
let ssh_key =
Runtime_arg.create ~pos:__POS__
{|let open Cmdliner in
let doc = Arg.info ~doc:"The private SSH key (rsa:<seed> or ed25519:<b64-key>)." ["ssh-key"] in
Arg.(value & opt (some string) None doc)|}
let remote =
let doc =
Key.Arg.info
~doc:"Remote repository url, use suffix #foo to specify a branch 'foo': \
https://github.com/ocaml/opam-repository.git"
["remote"]
in
Key.(create "remote" Arg.(opt string "https://github.com/ocaml/opam-repository.git#master" doc))
let ssh_authenticator =
Runtime_arg.create ~pos:__POS__
{|let open Cmdliner in
let doc = Arg.info ~doc:"SSH authenticator." ["ssh-auth"] in
Arg.(value & opt (some string) None doc)|}
let parallel_downloads =
let doc =
Key.Arg.info
~doc:"Amount of parallel HTTP downloads"
["parallel-downloads"]
in
Key.(create "parallel-downloads" Arg.(opt int 20 doc))
let hook_url =
let doc =
Key.Arg.info
~doc:"URL to conduct an update of the git repository" ["hook-url"]
in
Key.(create "hook-url" Arg.(opt string "update" doc))
let port =
let doc = Key.Arg.info ~doc:"HTTP listen port." ["port"] in
Key.(create "port" Arg.(opt int 80 doc))
let ssh_password =
Runtime_arg.create ~pos:__POS__
{|let open Cmdliner in
let doc = Arg.info ~doc:"The private SSH password." [ "ssh-password" ] in
Arg.(value & opt (some string) None doc)|}
let tls_authenticator =
(* this will not look the same in the help printout *)
let doc = "TLS host authenticator. See git_http in lib/mirage/mirage.mli for a description of the format."
in
let doc = Key.Arg.info ~doc ["tls-authenticator"] in
Key.(create "tls-authenticator" Arg.(opt (some string) None doc))
let sectors_cache =
let doc = "Number of sectors reserved for each checksum cache (md5, sha512)." in
let doc = Key.Arg.info ~doc ["sectors-cache"] in
Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc))
let sectors_git =
let doc = "Number of sectors reserved for git dump." in
let doc = Key.Arg.info ~doc ["sectors-git"] in
Key.(create "sectors-git" Arg.(opt int64 Int64.(mul 40L (mul 2L 1024L)) doc))
let ignore_local_git =
let doc = "Ignore restoring locally saved git repository." in
let doc = Key.Arg.info ~doc ["ignore-local-git"] in
Key.(create "ignore-local-git" Arg.(flag doc))
Runtime_arg.create ~pos:__POS__
{|let open Cmdliner in
let doc = "TLS host authenticator. See git_http in lib/mirage/mirage.mli for a description of the format." in
let doc = Arg.info ~doc ["tls-authenticator"] in
Arg.(value & opt (some string) None doc)|}
let mirror =
foreign "Unikernel.Make"
~keys:[ Key.v check ; Key.v verify_sha256 ; Key.v remote ;
Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ;
Key.v port ; Key.v sectors_cache ; Key.v sectors_git ;
Key.v ignore_local_git ;
]
main "Unikernel.Make"
~runtime_args:[ setup ]
~packages:[
package ~min:"0.3.0" ~sublibs:[ "mirage" ] "paf" ;
package "h2" ;
package "hex" ;
package "ohex" ;
package "httpaf" ;
package "git-kv" ;
package ~max:"0.0.5" "git-kv" ;
package ~min:"3.10.0" "git-paf" ;
package "opam-file-format" ;
package ~min:"2.2.0" ~sublibs:[ "gz" ] "tar" ;
package ~min:"2.2.0" "tar-mirage" ;
package ~min:"3.0.0" ~sublibs:[ "gz" ] "tar" ;
package ~min:"3.0.0" "tar-mirage" ;
package ~max:"0.2.0" "mirage-block-partition" ;
package "oneffs" ;
package "digestif" ;
]
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> alpn_client @-> job)
let stack = generic_stackv4v6 default_network
let dns = generic_dns_client stack
let he = generic_happy_eyeballs stack
let dns = generic_dns_client stack he
let tcp = tcpv4v6_of_stackv4v6 stack
let block = block_of_file "tar"
let git_client, alpn_client =
let happy_eyeballs = generic_happy_eyeballs stack dns in
let git = mimic_happy_eyeballs stack dns happy_eyeballs in
merge_git_clients (git_tcp tcp git)
(git_http ~authenticator:tls_authenticator tcp git),
paf_client ~pclock:default_posix_clock tcp (mimic_happy_eyeballs stack dns happy_eyeballs)
let program_block_size =
let doc = Key.Arg.info [ "program-block-size" ] in
Key.(create "program_block_size" Arg.(opt int 16 doc))
let block = block_of_file "tar"
let git = mimic_happy_eyeballs stack he dns in
merge_git_clients (git_ssh ~key:ssh_key ~authenticator:ssh_authenticator ~password:ssh_password tcp git)
(merge_git_clients (git_tcp tcp git)
(git_http ~authenticator:tls_authenticator tcp git)),
paf_client tcp (mimic_happy_eyeballs stack he dns)
let () = register "mirror"
[ mirror $ block $ default_time $ default_posix_clock $ stack $ git_client $ alpn_client ]

View file

@ -2,6 +2,90 @@ open Lwt.Infix
let argument_error = 64
module K = struct
open Cmdliner
let check =
let doc = Arg.info ~doc:"Only check the cache" ["check"] in
Arg.(value & flag doc)
let verify_sha256 =
let doc = Arg.info
~doc:"Verify the SHA256 checksums of the cache contents, and \
re-build the other checksum caches."
["verify-sha256"]
in
Arg.(value & flag doc)
let remote =
let doc = Arg.info
~doc:"Remote repository url, use suffix #foo to specify a branch 'foo': \
https://github.com/ocaml/opam-repository.git"
["remote"]
in
Arg.(value & opt string "https://github.com/ocaml/opam-repository.git#master" doc)
let parallel_downloads =
let doc = Arg.info
~doc:"Amount of parallel HTTP downloads"
["parallel-downloads"]
in
Arg.(value & opt int 20 doc)
let hook_url =
let doc = Arg.info
~doc:"URL to conduct an update of the git repository" ["hook-url"]
in
Arg.(value & opt string "update" doc)
let port =
let doc = Arg.info ~doc:"HTTP listen port." ["port"] in
Arg.(value & opt int 80 doc)
let sectors_cache =
let doc = "Number of sectors reserved for each checksum cache (md5, sha512)." in
let doc = Arg.info ~doc ["sectors-cache"] in
Arg.(value & opt int64 Int64.(mul 4L 2048L) doc)
let sectors_git =
let doc = "Number of sectors reserved for git dump." in
let doc = Arg.info ~doc ["sectors-git"] in
Arg.(value & opt int64 Int64.(mul 40L (mul 2L 1024L)) doc)
let ignore_local_git =
let doc = "Ignore restoring locally saved git repository." in
let doc = Arg.info ~doc ["ignore-local-git"] in
Arg.(value & flag doc)
type t =
{ check : bool
; verify_sha256 : bool
; remote : string
; parallel_downloads : int
; hook_url : string
; port : int
; sectors_cache : int64
; sectors_git : int64
; ignore_local_git : bool }
let v check verify_sha256 remote parallel_downloads hook_url port
sectors_cache sectors_git ignore_local_git =
{ check; verify_sha256; remote; parallel_downloads; hook_url; port
; sectors_cache; sectors_git; ignore_local_git }
let setup =
Term.(const v
$ check
$ verify_sha256
$ remote
$ parallel_downloads
$ hook_url
$ port
$ sectors_cache
$ sectors_git
$ ignore_local_git)
end
module Make
(BLOCK : Mirage_block.S)
(Time : Mirage_time.S)
@ -18,58 +102,24 @@ module Make
module SM = Map.Make(String)
module SSet = Set.Make(String)
let compare_hash h h' =
match h, h' with
| `SHA512, `SHA512 -> 0
| `SHA512, _ -> 1
| _, `SHA512 -> -1
| `SHA384, `SHA384 -> 0
| `SHA384, _ -> 1
| _, `SHA384 -> -1
| `SHA256, `SHA256 -> 0
| `SHA256, _ -> 1
| _, `SHA256 -> -1
| `SHA224, `SHA224 -> 0
| `SHA224, _ -> 1
| _, `SHA224 -> -1
| `SHA1, `SHA1 -> 0
| `SHA1, `MD5 -> 1
| `MD5, `MD5 -> 0
| `MD5, _ -> -1
let compare_hash = Archive_checksum.Hash.compare
module HM = Map.Make(struct
type t = Mirage_crypto.Hash.hash
let compare = compare_hash
end)
module HM = Archive_checksum.HM
let hash_to_string = function
| `MD5 -> "md5"
| `SHA1 -> "sha1"
| `SHA224 -> "sha224"
| `SHA256 -> "sha256"
| `SHA384 -> "sha384"
| `SHA512 -> "sha512"
let hash_to_string = Archive_checksum.Hash.to_string
let hash_of_string = function
| "md5" -> Ok `MD5
| "sha256" -> Ok `SHA256
| "sha512" -> Ok `SHA512
| h -> Error (`Msg ("unknown hash algorithm: " ^ h))
let hash_of_string = Archive_checksum.Hash.of_string
let hex_to_string h =
let `Hex h = Hex.of_string h in
h
let hex_to_key h = Mirage_kv.Key.v (hex_to_string h)
let hex_to_key h = Mirage_kv.Key.v (Ohex.encode h)
let hex_of_string s =
match Hex.to_string (`Hex s) with
match Ohex.decode s with
| d -> Ok d
| exception Invalid_argument err -> Error (`Msg err)
let hm_to_s hm =
HM.fold (fun h v acc ->
hash_to_string h ^ "=" ^ hex_to_string v ^ "\n" ^ acc)
hash_to_string h ^ "=" ^ Ohex.encode v ^ "\n" ^ acc)
hm ""
module Git = struct
@ -174,7 +224,7 @@ module Make
| Some v' when String.equal v v' -> None
| Some v' ->
Logs.warn (fun m -> m "for %s, hash %s, multiple keys are present: %s %s"
(Option.value ~default:"NONE" url) (hash_to_string h) (hex_to_string v) (hex_to_string v'));
(Option.value ~default:"NONE" url) (hash_to_string h) (Ohex.encode v) (Ohex.encode v'));
None)
acc
end
@ -247,9 +297,6 @@ module Make
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
let to_hex d =
let d = Cstruct.to_string d in
hex_to_string d
let marshal_sm (sm : string SM.t) =
let version = char_of_int 1 in
@ -343,74 +390,6 @@ module Make
end
*)
module Running_hash = struct
type _ t =
| MD5 : Mirage_crypto.Hash.MD5.t -> [> `MD5 ] t
| SHA1 : Mirage_crypto.Hash.SHA1.t -> [> `SHA1 ] t
| SHA224 : Mirage_crypto.Hash.SHA224.t -> [> `SHA224 ] t
| SHA256 : Mirage_crypto.Hash.SHA256.t -> [> `SHA256 ] t
| SHA384 : Mirage_crypto.Hash.SHA384.t -> [> `SHA384 ] t
| SHA512 : Mirage_crypto.Hash.SHA512.t -> [> `SHA512 ] t
let empty : _ -> _ t = function
| `MD5 -> MD5 Mirage_crypto.Hash.MD5.empty
| `SHA1 -> SHA1 Mirage_crypto.Hash.SHA1.empty
| `SHA224 -> SHA224 Mirage_crypto.Hash.SHA224.empty
| `SHA256 -> SHA256 Mirage_crypto.Hash.SHA256.empty
| `SHA384 -> SHA384 Mirage_crypto.Hash.SHA384.empty
| `SHA512 -> SHA512 Mirage_crypto.Hash.SHA512.empty
let feed t data =
let open Mirage_crypto.Hash in
match t with
| MD5 t -> MD5 (MD5.feed t data)
| SHA1 t -> SHA1 (SHA1.feed t data)
| SHA224 t -> SHA224 (SHA224.feed t data)
| SHA256 t -> SHA256 (SHA256.feed t data)
| SHA384 t -> SHA384 (SHA384.feed t data)
| SHA512 t -> SHA512 (SHA512.feed t data)
let get t =
let open Mirage_crypto.Hash in
match t with
| MD5 t -> MD5.get t
| SHA1 t -> SHA1.get t
| SHA224 t -> SHA224.get t
| SHA256 t -> SHA256.get t
| SHA384 t -> SHA384.get t
| SHA512 t -> SHA512.get t
end
type 'a digests = {
md5 : Mirage_crypto.Hash.MD5.t;
sha256 : Mirage_crypto.Hash.SHA256.t;
sha512 : Mirage_crypto.Hash.SHA512.t;
csum : 'a Running_hash.t;
}
let empty_digests h =
let open Mirage_crypto.Hash in
{
md5 = MD5.empty;
sha256 = SHA256.empty;
sha512 = SHA512.empty;
csum = Running_hash.empty h;
}
let update_digests { md5; sha256; sha512; csum } data =
let open Mirage_crypto.Hash in
let data = Cstruct.of_string data in
{
md5 = MD5.feed md5 data;
sha256 = SHA256.feed sha256 data;
sha512 = SHA512.feed sha512 data;
csum = Running_hash.feed csum data;
}
let init_write csums =
let hash, csum = HM.max_binding csums in
(hash, csum), Ok (empty_digests hash, `Init)
let content_length_of_string s =
match Int64.of_string s with
| len when len >= 0L -> `Fixed len
@ -440,7 +419,7 @@ module Make
(* We can't use hex because the filename would become too long for tar *)
Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
| _ ->
Mirage_kv.Key.(pending / hash_to_string hash / hex_to_string csum)
Mirage_kv.Key.(pending / hash_to_string hash / Ohex.encode csum)
let to_delete_key (hash, csum) =
let rand = "random" in (* FIXME: generate random string *)
@ -450,7 +429,7 @@ module Make
(* We can't use hex because the filename would become too long for tar *)
Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum
| _ ->
hex_to_string csum
Ohex.encode csum
in
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
@ -460,7 +439,7 @@ module Make
let ( >>>= ) = Lwt_result.bind in
fun response r data ->
Lwt.return r >>>= fun (digests, acc) ->
let digests = update_digests digests data in
let digests = Archive_checksum.update_digests digests data in
match acc with
| `Init ->
begin match body_length response with
@ -486,17 +465,8 @@ module Make
| `Unknown body ->
Lwt.return_ok (digests, `Unknown (body ^ data))
let digests_to_hm digests =
HM.empty
|> HM.add `MD5
(Cstruct.to_string (Mirage_crypto.Hash.MD5.get digests.md5))
|> HM.add `SHA256
(Cstruct.to_string (Mirage_crypto.Hash.SHA256.get digests.sha256))
|> HM.add `SHA512
(Cstruct.to_string (Mirage_crypto.Hash.SHA512.get digests.sha512))
let check_csums_digests csums digests =
let csums' = digests_to_hm digests in
let csums' = Archive_checksum.digests_to_hm digests in
let common_bindings = List.filter (fun (h, _) -> HM.mem h csums) (HM.bindings csums') in
List.length common_bindings > 0 &&
List.for_all
@ -512,9 +482,9 @@ module Make
in
let source = pending_key (hash, csum) in
if check_csums_digests csums digests && sizes_match then
let sha256 = to_hex (Mirage_crypto.Hash.SHA256.get digests.sha256)
and md5 = to_hex (Mirage_crypto.Hash.MD5.get digests.md5)
and sha512 = to_hex (Mirage_crypto.Hash.SHA512.get digests.sha512) in
let sha256 = Ohex.encode Digestif.SHA256.(to_raw_string (get digests.sha256))
and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5))
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
let dest = Mirage_kv.Key.v sha256 in
begin match body with
| `Unknown body ->
@ -533,7 +503,7 @@ module Make
else begin
(if sizes_match then
Logs.err (fun m -> m "Bad checksum %s: computed %s expected %s" url
(hash_to_string hash) (hex_to_string csum))
(hash_to_string hash) (Ohex.encode csum))
else match body with
| `Fixed_body (reported, actual) ->
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
@ -597,15 +567,15 @@ module Make
Logs.warn (fun m -> m "unexpected dictionary at %a" Mirage_kv.Key.pp path);
Lwt.return_unit
| `Value ->
let open Mirage_crypto.Hash in
let open Digestif in
let sha256_final =
if verify_sha256 then
let f s =
let digest = SHA256.get s in
if not (String.equal (Mirage_kv.Key.basename path) (to_hex digest)) then
let digest = SHA256.(to_raw_string (get s)) in
if not (String.equal (Mirage_kv.Key.basename path) (Ohex.encode digest)) then
Logs.err (fun m -> m "corrupt SHA256 data for %a, \
computed %s (should remove)"
Mirage_kv.Key.pp path (to_hex digest))
Mirage_kv.Key.pp path (Ohex.encode digest))
in
Some f
else
@ -613,8 +583,8 @@ module Make
and md5_final =
if not (SSet.mem (Mirage_kv.Key.basename path) md5s) then
let f s =
let digest = MD5.get s in
t.md5s <- SM.add (to_hex digest) (Mirage_kv.Key.basename path) t.md5s
let digest = MD5.(to_raw_string (get s)) in
t.md5s <- SM.add (Ohex.encode digest) (Mirage_kv.Key.basename path) t.md5s
in
Some f
else
@ -622,8 +592,8 @@ module Make
and sha512_final =
if not (SSet.mem (Mirage_kv.Key.basename path) sha512s) then
let f s =
let digest = SHA512.get s in
t.sha512s <- SM.add (to_hex digest) (Mirage_kv.Key.basename path) t.sha512s
let digest = SHA512.(to_raw_string (get s)) in
t.sha512s <- SM.add (Ohex.encode digest) (Mirage_kv.Key.basename path) t.sha512s
in
Some f
else
@ -634,11 +604,10 @@ module Make
| _ ->
read_chunked t `SHA256 path
(fun (sha256, md5, sha512) data ->
let cs = Cstruct.of_string data in
Lwt.return
(Option.map (fun t -> SHA256.feed t cs) sha256,
Option.map (fun t -> MD5.feed t cs) md5,
Option.map (fun t -> SHA512.feed t cs) sha512))
(Option.map (fun t -> SHA256.feed_string t data) sha256,
Option.map (fun t -> MD5.feed_string t data) md5,
Option.map (fun t -> SHA512.feed_string t data) sha512))
(Option.map (fun _ -> SHA256.empty) sha256_final,
Option.map (fun _ -> MD5.empty) md5_final,
Option.map (fun _ -> SHA512.empty) sha512_final) >|= function
@ -694,62 +663,73 @@ module Make
end
module Tarball = struct
module Async = struct
type 'a t = 'a
let ( >>= ) x f = f x
let return x = x
module High : sig
type t
type 'a s = 'a Lwt.t
external inj : 'a s -> ('a, t) Tar.io = "%identity"
external prj : ('a, t) Tar.io -> 'a s = "%identity"
end = struct
type t
type 'a s = 'a Lwt.t
external inj : 'a -> 'b = "%identity"
external prj : 'a -> 'b = "%identity"
end
module Writer = struct
type out_channel = Buffer.t
type 'a t = 'a
let really_write buf data =
Buffer.add_string buf (Cstruct.to_string data)
end
let to_buffer buf t =
let rec run : type a. (a, [> `Msg of string ] as 'err, High.t) Tar.t -> (a, 'err) result Lwt.t
= function
| Tar.Write str ->
Buffer.add_string buf str;
Lwt.return_ok ()
| Tar.Read _ -> assert false
| Tar.Really_read _ -> assert false
| Tar.Seek _ -> assert false
| Tar.Return value -> Lwt.return value
| Tar.High value -> High.prj value
| Tar.Bind (x, f) ->
let open Lwt_result.Infix in
run x >>= fun value -> run (f value) in
run t
(* That's not very interesting here, we just ignore everything*)
module Reader = struct
type in_channel = unit
type 'a t = 'a
let really_read _in _data = ()
let skip _in _len = ()
let read _in _data = 0
end
let once data =
let closed = ref false in
fun () -> if !closed
then Tar.High (High.inj (Lwt.return_ok None))
else begin closed := true; Tar.High (High.inj (Lwt.return_ok (Some data))) end
module Tar_Gz = Tar_gz.Make (Async)(Writer)(Reader)
let of_git repo store =
let out_channel = Buffer.create 1024 in
let now = Ptime.v (Pclock.now_d_ps ()) in
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
let gz_out =
Tar_Gz.of_out_channel ~level:4 ~mtime:(Int32.of_int mtime)
Gz.Unix out_channel
in
let entries_of_git ~mtime store repo =
Git.find_contents store >>= fun paths ->
Lwt_list.iter_s (fun path ->
let entries = Lwt_stream.of_list paths in
let to_entry path =
Store.get store path >|= function
| Ok data ->
let data =
if Mirage_kv.Key.(equal path (v "repo")) then repo else data
in
let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! *)
if Mirage_kv.Key.(equal path (v "repo"))
then repo else data in
let file_mode = 0o644
and mod_time = Int64.of_int mtime
and user_id = 0
and group_id = 0
and size = String.length data
in
let hdr =
Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
(Mirage_kv.Key.to_string path) (Int64.of_int size)
in
let o = ref false in
let stream () = if !o then None else (o := true; Some data) in
Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream
| Error e -> Logs.warn (fun m -> m "Store error: %a" Store.pp_error e))
paths >|= fun () ->
Tar_Gz.write_end gz_out;
Buffer.contents out_channel
and size = String.length data in
let hdr = Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
(Mirage_kv.Key.to_string path) (Int64.of_int size) in
Some (None, hdr, once data)
| Error _ -> None in
let entries = Lwt_stream.filter_map_s to_entry entries in
Lwt.return begin fun () -> Tar.High (High.inj (Lwt_stream.get entries >|= Result.ok)) end
let of_git repo store =
let now = Ptime.v (Pclock.now_d_ps ()) in
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
entries_of_git ~mtime store repo >>= fun entries ->
let t = Tar.out entries in
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
let buf = Buffer.create 1024 in
to_buffer buf t >|= function
| Ok () -> Buffer.contents buf
| Error (`Msg msg) -> failwith msg
end
module Serve = struct
@ -772,8 +752,8 @@ module Make
Logs.err (fun m -> m "%a" Store.pp_error e);
exit 2)
let repo commit =
let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in
let repo remote commit =
let upstream = List.hd (String.split_on_char '#' remote) in
Fmt.str
{|opam-version: "2.0"
upstream: "%s#%s"
@ -797,16 +777,16 @@ stamp: %S
mutable index : string ;
}
let create git_kv =
let create remote git_kv =
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
let repo = repo commit_id in
let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index ->
{ commit_id ; modified ; repo ; index }
let update_lock = Lwt_mutex.create ()
let update_git t git_kv =
let update_git ~remote t git_kv =
Lwt_mutex.with_lock update_lock (fun () ->
Logs.info (fun m -> m "pulling the git repository");
Git_kv.pull git_kv >>= function
@ -820,7 +800,7 @@ stamp: %S
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
Logs.info (fun m -> m "git: %s" commit_id);
let repo = repo commit_id in
let repo = repo remote commit_id in
Tarball.of_git repo git_kv >|= fun index ->
t.commit_id <- commit_id ;
t.modified <- modified ;
@ -967,11 +947,11 @@ stamp: %S
let bad_archives = SSet.of_list Bad.archives
let download_archives disk http_client store =
let download_archives parallel_downloads disk http_client store =
(* FIXME: handle resuming partial downloads *)
Git.find_urls store >>= fun urls ->
let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in
let pool = Lwt_pool.create (Key_gen.parallel_downloads ()) (Fun.const Lwt.return_unit) in
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) ->
Lwt_pool.use pool @@ fun () ->
@ -988,7 +968,7 @@ stamp: %S
incr idx;
if !idx mod 10 = 0 then Gc.full_major () ;
Logs.info (fun m -> m "downloading %s" url);
let quux, body_init = Disk.init_write csums in
let quux, body_init = Archive_checksum.init_write csums in
Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
| Ok (resp, r) ->
begin match r with
@ -1016,14 +996,14 @@ stamp: %S
| Error e ->
Logs.warn (fun m -> m "failed to dump git: %a" Cache.pp_write_error e)
let restore_git git_dump git_ctx =
let restore_git ~remote git_dump git_ctx =
Cache.read git_dump >>= function
| Ok None -> Lwt.return (Error ())
| Error e ->
Logs.warn (fun m -> m "failed to read git state: %a" Cache.pp_error e);
Lwt.return (Error ())
| Ok Some data ->
Git_kv.of_octets git_ctx ~remote:(Key_gen.remote ()) data >|= function
Git_kv.of_octets git_ctx ~remote data >|= function
| Ok git_kv -> Ok git_kv
| Error `Msg msg ->
Logs.err (fun m -> m "error restoring git state: %s" msg);
@ -1031,10 +1011,10 @@ stamp: %S
module Paf = Paf_mirage.Make(Stack.TCP)
let start block _time _pclock stack git_ctx http_ctx =
let start block _time _pclock stack git_ctx http_ctx
{ K.check; verify_sha256; remote; parallel_downloads; hook_url
; port; sectors_cache; sectors_git; ignore_local_git } =
BLOCK.get_info block >>= fun info ->
let sectors_cache = Key_gen.sectors_cache () in
let sectors_git = Key_gen.sectors_git () in
let git_start =
let cache_size = Int64.(mul 2L sectors_cache) in
Int64.(sub info.size_sectors (add cache_size sectors_git))
@ -1047,41 +1027,41 @@ stamp: %S
Cache.connect sha512s >>= fun sha512s ->
Cache.connect git_dump >>= fun git_dump ->
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
Disk.init ~verify_sha256:(Key_gen.verify_sha256 ()) kv md5s sha512s >>= fun disk ->
if Key_gen.check () then
Disk.init ~verify_sha256 kv md5s sha512s >>= fun disk ->
if check then
Lwt.return_unit
else
begin
Logs.info (fun m -> m "Initializing git state. This may take a while...");
(if Key_gen.ignore_local_git () then
(if ignore_local_git then
Lwt.return (Error ())
else
restore_git git_dump git_ctx) >>= function
restore_git ~remote git_dump git_ctx) >>= function
| Ok git_kv -> Lwt.return git_kv
| Error () ->
Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv ->
Git_kv.connect git_ctx remote >>= fun git_kv ->
dump_git git_dump git_kv >|= fun () ->
git_kv
end >>= fun git_kv ->
Logs.info (fun m -> m "Done initializing git state!");
Serve.commit_id git_kv >>= fun commit_id ->
Logs.info (fun m -> m "git: %s" commit_id);
Serve.create git_kv >>= fun serve ->
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
Serve.create remote git_kv >>= fun serve ->
Paf.init ~port (Stack.tcp stack) >>= fun t ->
let update () =
Serve.update_git serve git_kv >>= function
Serve.update_git ~remote serve git_kv >>= function
| None | Some [] -> Lwt.return_unit
| Some _changes ->
dump_git git_dump git_kv >>= fun () ->
download_archives disk http_ctx git_kv
download_archives parallel_downloads disk http_ctx git_kv
in
let service =
Paf.http_service
~error_handler:(fun _ ?request:_ _ _ -> ())
(Serve.dispatch serve disk (Key_gen.hook_url ()) update)
(Serve.dispatch serve disk hook_url update)
in
let `Initialized th = Paf.serve service t in
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
Logs.info (fun f -> f "listening on %d/HTTP" port);
Lwt.async (fun () ->
let rec go () =
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
@ -1089,6 +1069,6 @@ stamp: %S
go ()
in
go ());
download_archives disk http_ctx git_kv >>= fun () ->
download_archives parallel_downloads disk http_ctx git_kv >>= fun () ->
(th >|= fun _v -> ())
end