1005 lines
38 KiB
OCaml
1005 lines
38 KiB
OCaml
open Lwt.Infix
|
|
|
|
let argument_error = 64
|
|
|
|
module K = struct
|
|
open Cmdliner
|
|
|
|
let check =
|
|
let doc = Arg.info ~doc:"Only check the cache" ["check"] in
|
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
|
|
|
let verify_sha256 =
|
|
let doc = Arg.info
|
|
~doc:"Verify the SHA256 checksums of the cache contents, and \
|
|
re-build the other checksum caches."
|
|
["verify-sha256"]
|
|
in
|
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
|
|
|
let remote =
|
|
let doc = Arg.info
|
|
~doc:"Remote repository url, use suffix #foo to specify a branch 'foo': \
|
|
https://github.com/ocaml/opam-repository.git"
|
|
["remote"]
|
|
in
|
|
Mirage_runtime.register_arg
|
|
Arg.(value & opt string "https://github.com/ocaml/opam-repository.git#master" doc)
|
|
|
|
let parallel_downloads =
|
|
let doc = Arg.info
|
|
~doc:"Amount of parallel HTTP downloads"
|
|
["parallel-downloads"]
|
|
in
|
|
Mirage_runtime.register_arg Arg.(value & opt int 20 doc)
|
|
|
|
let hook_url =
|
|
let doc = Arg.info
|
|
~doc:"URL to conduct an update of the git repository" ["hook-url"]
|
|
in
|
|
Mirage_runtime.register_arg Arg.(value & opt string "update" doc)
|
|
|
|
let port =
|
|
let doc = Arg.info ~doc:"HTTP listen port." ["port"] in
|
|
Mirage_runtime.register_arg Arg.(value & opt int 80 doc)
|
|
|
|
let sectors_cache =
|
|
let doc = "Number of sectors reserved for each checksum cache (md5, sha512). Only used with --initialize-disk." in
|
|
let doc = Arg.info ~doc ["sectors-cache"] in
|
|
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 4L 2048L) doc)
|
|
|
|
let sectors_git =
|
|
let doc = "Number of sectors reserved for git dump. Only used with --initialize-disk" in
|
|
let doc = Arg.info ~doc ["sectors-git"] in
|
|
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 40L (mul 2L 1024L)) doc)
|
|
|
|
let sectors_swap =
|
|
let doc = "Number of sectors reserved for swap. Only used with --initialize-disk" in
|
|
let doc = Arg.info ~doc ["sectors-swap"] in
|
|
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 1024L 2048L) doc)
|
|
|
|
let initialize_disk =
|
|
let doc = "Initialize the disk with a partition table. THIS IS DESTRUCTIVE!" in
|
|
let doc = Arg.info ~doc ["initialize-disk"] in
|
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
|
|
|
let ignore_local_git =
|
|
let doc = "Ignore restoring locally saved git repository." in
|
|
let doc = Arg.info ~doc ["ignore-local-git"] in
|
|
Mirage_runtime.register_arg Arg.(value & flag doc)
|
|
end
|
|
|
|
module Make
|
|
(BLOCK : Mirage_block.S)
|
|
(Time : Mirage_time.S)
|
|
(Pclock : Mirage_clock.PCLOCK)
|
|
(Stack : Tcpip.Stack.V4V6)
|
|
(_ : sig end)
|
|
(HTTP : Http_mirage_client.S) = struct
|
|
|
|
module Part = Partitions.Make(BLOCK)
|
|
module KV = Tar_mirage.Make_KV_RW(Pclock)(Part)
|
|
module Cache = OneFFS.Make(Part)
|
|
module Swap = Swapfs.Make(Part)
|
|
module Store = Git_kv.Make(Pclock)
|
|
|
|
module SM = Map.Make(String)
|
|
module SSet = Set.Make(String)
|
|
|
|
let compare_hash = Archive_checksum.Hash.compare
|
|
|
|
module HM = Archive_checksum.HM
|
|
|
|
let hash_to_string = Archive_checksum.Hash.to_string
|
|
|
|
let hash_of_string = Archive_checksum.Hash.of_string
|
|
|
|
let hex_to_key h = Mirage_kv.Key.v (Ohex.encode h)
|
|
|
|
let hm_to_s hm =
|
|
HM.fold (fun h v acc ->
|
|
hash_to_string h ^ "=" ^ Ohex.encode v ^ "\n" ^ acc)
|
|
hm ""
|
|
|
|
let parse_errors = ref SM.empty
|
|
|
|
let reset_parse_errors () = parse_errors := SM.empty
|
|
|
|
let add_parse_error filename error =
|
|
parse_errors := SM.add filename error !parse_errors
|
|
|
|
module Git = struct
|
|
let contents store =
|
|
let explore = ref [ Mirage_kv.Key.empty ] in
|
|
let more () =
|
|
let rec go () =
|
|
match !explore with
|
|
| [] -> Lwt.return None
|
|
| step :: tl ->
|
|
explore := tl;
|
|
Store.exists store step >>= function
|
|
| Error e -> go ()
|
|
| Ok None -> go ()
|
|
| Ok Some `Value -> Lwt.return (Some step)
|
|
| Ok Some `Dictionary ->
|
|
Store.list store step >>= function
|
|
| Error e -> go ()
|
|
| Ok steps ->
|
|
explore := List.map fst steps @ !explore;
|
|
go ()
|
|
in
|
|
go ()
|
|
in
|
|
Lwt_stream.from more
|
|
|
|
let find_urls acc path data =
|
|
if Mirage_kv.Key.basename path = "opam" then
|
|
let path = Mirage_kv.Key.to_string path in
|
|
let url_csums, errs = Opam_file.extract_urls path data in
|
|
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
|
|
List.fold_left (fun acc (url, csums) ->
|
|
if HM.cardinal csums = 0 then
|
|
(add_parse_error path ("no checksums for " ^ url);
|
|
acc)
|
|
else
|
|
SM.update url (function
|
|
| None -> Some csums
|
|
| Some csums' ->
|
|
if HM.for_all (fun h v ->
|
|
match HM.find_opt h csums with
|
|
| None -> true | Some v' -> String.equal v v')
|
|
csums'
|
|
then
|
|
Some (HM.union (fun _h v _v' -> Some v) csums csums')
|
|
else begin
|
|
add_parse_error path (Fmt.str
|
|
"mismatching hashes for %s: %s vs %s"
|
|
url (hm_to_s csums') (hm_to_s csums));
|
|
None
|
|
end) acc) acc url_csums
|
|
else
|
|
acc
|
|
|
|
end
|
|
|
|
let active_downloads = ref SM.empty
|
|
|
|
let add_to_active url ts =
|
|
active_downloads := SM.add url (ts, 0) !active_downloads
|
|
|
|
let remove_active url =
|
|
active_downloads := SM.remove url !active_downloads
|
|
|
|
let active_add_bytes url written =
|
|
match SM.find_opt url !active_downloads with
|
|
| None -> ()
|
|
| Some (ts, written') ->
|
|
active_downloads := SM.add url (ts, written + written')
|
|
!active_downloads
|
|
|
|
let failed_downloads = ref SM.empty
|
|
|
|
let reset_failed_downloads () = failed_downloads := SM.empty
|
|
|
|
let add_failed url ts reason =
|
|
remove_active url;
|
|
failed_downloads := SM.add url (ts, reason) !failed_downloads
|
|
|
|
let pp_failed ppf = function
|
|
| `Write_error e ->
|
|
KV.pp_write_error ppf e
|
|
| `Swap e ->
|
|
Swap.pp_error ppf e
|
|
| `Bad_checksum (hash, computed, expected) ->
|
|
Fmt.pf ppf "%s checksum: computed %s expected %s"
|
|
(hash_to_string hash)
|
|
(Ohex.encode computed)
|
|
(Ohex.encode expected)
|
|
| `Bad_response (status, reason) ->
|
|
Fmt.pf ppf "%a %s" H2.Status.pp_hum status reason
|
|
| `Mimic me ->
|
|
Mimic.pp_error ppf me
|
|
|
|
let key_of_failed = function
|
|
| `Write_error _ -> `Write_error
|
|
| `Swap _ -> `Swap
|
|
| `Bad_checksum _ -> `Bad_checksum
|
|
| `Bad_response _ -> `Bad_response
|
|
| `Mimic _ -> `Mimic
|
|
|
|
let compare_failed_key a b = match a, b with
|
|
| `Write_error, `Write_error -> 0
|
|
| `Write_error, _ -> -1
|
|
| _, `Write_error -> 1
|
|
| `Swap, `Swap -> 0
|
|
| `Swap, _ -> -1
|
|
| _, `Swap -> 1
|
|
| `Bad_checksum, `Bad_checksum -> 0
|
|
| `Bad_checksum, _ -> -1
|
|
| _, `Bad_checksum -> 1
|
|
| `Bad_response, `Bad_response -> 0
|
|
| `Bad_response, _ -> -1
|
|
| _, `Bad_response -> 1
|
|
| `Mimic, `Mimic -> 0
|
|
|
|
let pp_key ppf = function
|
|
| `Write_error -> Fmt.pf ppf "Write error"
|
|
| `Swap -> Fmt.pf ppf "Swap error"
|
|
| `Bad_checksum -> Fmt.pf ppf "Bad checksum"
|
|
| `Bad_response -> Fmt.pf ppf "Bad response"
|
|
| `Mimic -> Fmt.pf ppf "Mimic"
|
|
|
|
module Disk = struct
|
|
type t = {
|
|
mutable md5s : string SM.t ;
|
|
mutable sha512s : string SM.t ;
|
|
dev : KV.t ;
|
|
dev_md5s : Cache.t ;
|
|
dev_sha512s : Cache.t ;
|
|
dev_swap : Swap.t ;
|
|
}
|
|
|
|
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
|
|
|
let marshal_sm (sm : string SM.t) =
|
|
let version = char_of_int 1 in
|
|
String.make 1 version ^ Marshal.to_string sm []
|
|
|
|
let unmarshal_sm s =
|
|
let version = int_of_char s.[0] in
|
|
match version with
|
|
| 1 -> Ok (Marshal.from_string s 1 : string SM.t)
|
|
| _ -> Error ("Unsupported version " ^ string_of_int version)
|
|
|
|
let update_caches t =
|
|
Cache.write t.dev_md5s (marshal_sm t.md5s) >>= fun r ->
|
|
(match r with
|
|
| Ok () -> ()
|
|
| Error e -> Logs.warn (fun m -> m "Failed to write 'md5s': %a" Cache.pp_write_error e));
|
|
Cache.write t.dev_sha512s (marshal_sm t.sha512s) >>= fun r ->
|
|
match r with
|
|
| Ok () -> Lwt.return_unit
|
|
| Error e ->
|
|
Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e);
|
|
Lwt.return_unit
|
|
|
|
let find_key t h key =
|
|
assert (List.length (Mirage_kv.Key.segments key) = 1);
|
|
match
|
|
match h with
|
|
| `MD5 ->
|
|
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
|
|
| `SHA512 ->
|
|
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
|
|
| `SHA256 -> Some key
|
|
| _ -> None
|
|
with
|
|
| None -> Error `Not_found
|
|
| Some x -> Ok x
|
|
|
|
let read_chunked t h v f a =
|
|
match find_key t h v with
|
|
| Error `Not_found ->
|
|
Lwt.return (Error (`Not_found v))
|
|
| Ok key ->
|
|
KV.size t.dev key >>= function
|
|
| Error e ->
|
|
Lwt.return (Error (`Not_found key))
|
|
| Ok len ->
|
|
let chunk_size = 4096 in
|
|
let rec read_more a offset =
|
|
if offset < len then
|
|
KV.get_partial t.dev key ~offset ~length:chunk_size >>= function
|
|
| Ok data ->
|
|
f a data >>= fun a ->
|
|
read_more a Optint.Int63.(add offset (of_int chunk_size))
|
|
| Error e ->
|
|
Lwt.return (Error e)
|
|
else
|
|
Lwt.return (Ok a)
|
|
in
|
|
read_more a Optint.Int63.zero
|
|
|
|
let init_write t csums =
|
|
let quux, csums = Archive_checksum.init_write csums in
|
|
let swap = Swap.empty t.dev_swap in
|
|
quux, Ok (csums, swap)
|
|
|
|
let write_partial t (hash, csum) url =
|
|
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
|
|
let ( >>>= ) = Lwt_result.bind in
|
|
fun response r data ->
|
|
if Http_mirage_client.Status.is_successful response.Http_mirage_client.status then
|
|
Lwt.return r >>>= fun (digests, swap) ->
|
|
let digests = Archive_checksum.update_digests digests data in
|
|
active_add_bytes url (String.length data);
|
|
Swap.append swap data >|= function
|
|
| Ok () -> Ok (digests, swap)
|
|
| Error swap_err -> Error (`Swap swap_err)
|
|
else
|
|
Lwt.return (Error `Bad_response)
|
|
|
|
let check_csums_digests csums digests =
|
|
let csums' = Archive_checksum.digests_to_hm digests in
|
|
let common_bindings = List.filter (fun (h, _) -> HM.mem h csums) (HM.bindings csums') in
|
|
List.length common_bindings > 0 &&
|
|
List.for_all
|
|
(fun (h, csum) -> String.equal csum (HM.find h csums))
|
|
common_bindings
|
|
|
|
let set_from_handle dev dest h =
|
|
(* TODO: we need a function in tar which
|
|
(a) takes a path
|
|
(b) takes a function that reads (from the swap) and writes (to the tar)
|
|
(c) once the function is finished, it writes the tar header
|
|
-> this would allow us to avoid the rename stuff below
|
|
*)
|
|
let size = Optint.Int63.of_int64 (Swap.size h) in
|
|
KV.allocate dev dest size >>= fun r ->
|
|
let rec loop offset =
|
|
if offset = Swap.size h then
|
|
Lwt.return_ok ()
|
|
else
|
|
let length = Int64.(to_int (min 4096L (sub (Swap.size h) offset))) in
|
|
Swap.get_partial h ~offset ~length >>= fun r ->
|
|
match r with
|
|
| Error e -> Lwt.return (Error (`Swap e))
|
|
| Ok data ->
|
|
KV.set_partial dev dest ~offset:(Optint.Int63.of_int64 offset) data
|
|
>>= fun r ->
|
|
match r with
|
|
| Error e -> Lwt.return (Error (`Write_error e))
|
|
| Ok () ->
|
|
loop Int64.(add offset (of_int length))
|
|
in
|
|
match r with
|
|
| Ok () ->
|
|
loop 0L
|
|
| Error e ->
|
|
Lwt.return (Error (`Write_error e))
|
|
|
|
let finalize_write t (hash, csum) ~url swap csums digests =
|
|
if check_csums_digests csums digests then
|
|
let sha256 = Ohex.encode Digestif.SHA256.(to_raw_string (get digests.sha256))
|
|
and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5))
|
|
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
|
|
let dest = Mirage_kv.Key.v sha256 in
|
|
let temp = Mirage_kv.Key.(v "pending" // dest) in
|
|
Lwt_result.bind
|
|
(Lwt.finalize (fun () -> set_from_handle t.dev temp swap)
|
|
(fun () -> Swap.free swap))
|
|
(fun () -> KV.rename t.dev ~source:temp ~dest
|
|
|> Lwt_result.map_error (fun e -> `Write_error e))
|
|
>|= function
|
|
| Ok () ->
|
|
remove_active url;
|
|
t.md5s <- SM.add md5 sha256 t.md5s;
|
|
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
|
| Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
|
|
| Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
|
|
else begin
|
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
(`Bad_checksum (hash, Archive_checksum.get digests hash, csum));
|
|
Lwt.return_unit
|
|
end
|
|
|
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
|
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
|
|
KV.list dev Mirage_kv.Key.empty >>= function
|
|
| Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e)
|
|
| Ok entries ->
|
|
let t = empty dev dev_md5s dev_sha512s dev_swap in
|
|
Cache.read t.dev_md5s >>= fun r ->
|
|
(match r with
|
|
| Ok Some s ->
|
|
if not verify_sha256 then
|
|
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
|
|
| Ok None -> ()
|
|
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
|
|
Cache.read t.dev_sha512s >>= fun r ->
|
|
(match r with
|
|
| Ok Some s ->
|
|
if not verify_sha256 then
|
|
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
|
|
| Ok None -> ()
|
|
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
|
|
let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
|
|
and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
|
|
Lwt_list.iter_s (fun (path, typ) ->
|
|
match typ with
|
|
| `Dictionary -> Lwt.return_unit
|
|
| `Value ->
|
|
let open Digestif in
|
|
let md5_final =
|
|
if not (SSet.mem (Mirage_kv.Key.basename path) md5s) then
|
|
let f s =
|
|
let digest = MD5.(to_raw_string (get s)) in
|
|
t.md5s <- SM.add (Ohex.encode digest) (Mirage_kv.Key.basename path) t.md5s
|
|
in
|
|
Some f
|
|
else
|
|
None
|
|
and sha512_final =
|
|
if not (SSet.mem (Mirage_kv.Key.basename path) sha512s) then
|
|
let f s =
|
|
let digest = SHA512.(to_raw_string (get s)) in
|
|
t.sha512s <- SM.add (Ohex.encode digest) (Mirage_kv.Key.basename path) t.sha512s
|
|
in
|
|
Some f
|
|
else
|
|
None
|
|
in
|
|
let sha256_final =
|
|
let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in
|
|
if need_to_compute then
|
|
let f s =
|
|
let digest = SHA256.(to_raw_string (get s)) in
|
|
if not (String.equal (Mirage_kv.Key.basename path) (Ohex.encode digest)) then
|
|
begin
|
|
Logs.err (fun m -> m "corrupt SHA256 data for %a, \
|
|
computed %s (will rename)"
|
|
Mirage_kv.Key.pp path (Ohex.encode digest));
|
|
false
|
|
end else true
|
|
in
|
|
Some f
|
|
else
|
|
None
|
|
in
|
|
match sha256_final with
|
|
| None -> Lwt.return_unit
|
|
| Some f ->
|
|
read_chunked t `SHA256 path
|
|
(fun (sha256, md5, sha512) data ->
|
|
Lwt.return
|
|
(SHA256.feed_string sha256 data,
|
|
Option.map (fun t -> MD5.feed_string t data) md5,
|
|
Option.map (fun t -> SHA512.feed_string t data) sha512))
|
|
(SHA256.empty,
|
|
Option.map (fun _ -> MD5.empty) md5_final,
|
|
Option.map (fun _ -> SHA512.empty) sha512_final) >>= function
|
|
| Error e ->
|
|
Logs.err (fun m -> m "error %a of %a while computing digests"
|
|
KV.pp_error e Mirage_kv.Key.pp path);
|
|
Lwt.return_unit
|
|
| Ok (sha256, md5, sha512) ->
|
|
if not (f sha256) then
|
|
(* bad sha256! *)
|
|
KV.rename t.dev ~source:path ~dest:(Mirage_kv.Key.(v "delete" // path)) >|= function
|
|
| Ok () -> ()
|
|
| Error we ->
|
|
Logs.err (fun m -> m "error %a while renaming %a" KV.pp_write_error we
|
|
Mirage_kv.Key.pp path)
|
|
else begin
|
|
Option.iter (fun f -> f (Option.get md5)) md5_final;
|
|
Option.iter (fun f -> f (Option.get sha512)) sha512_final;
|
|
Lwt.return_unit
|
|
end)
|
|
entries >>= fun () ->
|
|
update_caches t >|= fun () ->
|
|
t
|
|
|
|
let exists t h v =
|
|
match find_key t h v with
|
|
| Error _ -> Lwt.return false
|
|
| Ok x ->
|
|
KV.exists t.dev x >|= function
|
|
| Ok Some `Value -> true
|
|
| Ok Some `Dictionary -> false
|
|
| Ok None -> false
|
|
| Error _ -> false
|
|
|
|
let last_modified t h v =
|
|
match find_key t h v with
|
|
| Error _ as e -> Lwt.return e
|
|
| Ok x ->
|
|
KV.last_modified t.dev x >|= function
|
|
| Ok data -> Ok data
|
|
| Error _ -> Error `Not_found
|
|
|
|
let size t h v =
|
|
match find_key t h v with
|
|
| Error _ as e -> Lwt.return e
|
|
| Ok x ->
|
|
KV.size t.dev x >|= function
|
|
| Ok s -> Ok s
|
|
| Error _ -> Error `Not_found
|
|
end
|
|
|
|
module Tarball = struct
|
|
module High : sig
|
|
type t
|
|
type 'a s = 'a Lwt.t
|
|
|
|
external inj : 'a s -> ('a, t) Tar.io = "%identity"
|
|
external prj : ('a, t) Tar.io -> 'a s = "%identity"
|
|
end = struct
|
|
type t
|
|
type 'a s = 'a Lwt.t
|
|
|
|
external inj : 'a -> 'b = "%identity"
|
|
external prj : 'a -> 'b = "%identity"
|
|
end
|
|
|
|
let to_buffer buf t =
|
|
let rec run : type a. (a, [> `Msg of string ] as 'err, High.t) Tar.t -> (a, 'err) result Lwt.t
|
|
= function
|
|
| Tar.Write str ->
|
|
Buffer.add_string buf str;
|
|
Lwt.return_ok ()
|
|
| Tar.Read _ -> assert false
|
|
| Tar.Really_read _ -> assert false
|
|
| Tar.Seek _ -> assert false
|
|
| Tar.Return value -> Lwt.return value
|
|
| Tar.High value -> High.prj value
|
|
| Tar.Bind (x, f) ->
|
|
let open Lwt_result.Infix in
|
|
run x >>= fun value -> run (f value) in
|
|
run t
|
|
|
|
let once data =
|
|
let closed = ref false in
|
|
fun () -> if !closed
|
|
then Tar.High (High.inj (Lwt.return_ok None))
|
|
else begin closed := true; Tar.High (High.inj (Lwt.return_ok (Some data))) end
|
|
|
|
let entries_of_git ~mtime store repo urls =
|
|
let entries = Git.contents store in
|
|
let to_entry path =
|
|
Store.get store path >|= function
|
|
| Ok data ->
|
|
let data =
|
|
if Mirage_kv.Key.(equal path (v "repo"))
|
|
then repo else data
|
|
in
|
|
let file_mode = 0o644
|
|
and mod_time = Int64.of_int mtime
|
|
and user_id = 0
|
|
and group_id = 0
|
|
and size = String.length data in
|
|
let hdr = Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
|
|
(Mirage_kv.Key.to_string path) (Int64.of_int size) in
|
|
urls := Git.find_urls !urls path data;
|
|
Some (Some Tar.Header.Ustar, hdr, once data)
|
|
| Error _ -> None in
|
|
let entries = Lwt_stream.filter_map_s to_entry entries in
|
|
Lwt.return begin fun () -> Tar.High (High.inj (Lwt_stream.get entries >|= Result.ok)) end
|
|
|
|
let of_git repo store =
|
|
let now = Ptime.v (Pclock.now_d_ps ()) in
|
|
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
|
|
let urls = ref SM.empty in
|
|
entries_of_git ~mtime store repo urls >>= fun entries ->
|
|
let t = Tar.out ~level:Ustar entries in
|
|
let t = Tar_gz.out_gzipped ~level:4 ~mtime:(Int32.of_int mtime) Gz.Unix t in
|
|
let buf = Buffer.create 1024 in
|
|
to_buffer buf t >|= function
|
|
| Ok () -> Buffer.contents buf, !urls
|
|
| Error (`Msg msg) -> failwith msg
|
|
end
|
|
|
|
module Serve = struct
|
|
let ptime_to_http_date ptime =
|
|
let (y, m, d), ((hh, mm, ss), _) = Ptime.to_date_time ptime
|
|
and weekday = match Ptime.weekday ptime with
|
|
| `Mon -> "Mon" | `Tue -> "Tue" | `Wed -> "Wed" | `Thu -> "Thu"
|
|
| `Fri -> "Fri" | `Sat -> "Sat" | `Sun -> "Sun"
|
|
and month =
|
|
[| "Jan" ; "Feb" ; "Mar" ; "Apr" ; "May" ; "Jun" ;
|
|
"Jul" ; "Aug" ; "Sep" ; "Oct" ; "Nov" ; "Dec" |]
|
|
in
|
|
let m' = Array.get month (pred m) in
|
|
Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss
|
|
|
|
let commit_id git_kv =
|
|
Store.digest git_kv Mirage_kv.Key.empty >|= fun r ->
|
|
Result.fold r ~ok:Ohex.encode
|
|
~error:(fun e ->
|
|
Logs.err (fun m -> m "%a" Store.pp_error e);
|
|
exit 2)
|
|
|
|
let repo remote commit =
|
|
let upstream = List.hd (String.split_on_char '#' remote) in
|
|
Fmt.str
|
|
{|opam-version: "2.0"
|
|
upstream: "%s#%s"
|
|
archive-mirrors: "cache"
|
|
stamp: %S
|
|
|} upstream commit commit
|
|
|
|
let modified git_kv =
|
|
Store.last_modified git_kv Mirage_kv.Key.empty >|= fun r ->
|
|
let v =
|
|
Result.fold r
|
|
~ok:Fun.id
|
|
~error:(fun _ -> Ptime.v (Pclock.now_d_ps ()))
|
|
in
|
|
ptime_to_http_date v
|
|
|
|
type t = {
|
|
mutable commit_id : string ;
|
|
mutable modified : string ;
|
|
mutable repo : string ;
|
|
mutable index : string ;
|
|
}
|
|
|
|
let create remote git_kv =
|
|
commit_id git_kv >>= fun commit_id ->
|
|
modified git_kv >>= fun modified ->
|
|
let repo = repo remote commit_id in
|
|
Tarball.of_git repo git_kv >|= fun (index, urls) ->
|
|
{ commit_id ; modified ; repo ; index }, urls
|
|
|
|
let update_lock = Lwt_mutex.create ()
|
|
|
|
let update_git ~remote t git_kv =
|
|
Lwt_mutex.with_lock update_lock (fun () ->
|
|
Logs.info (fun m -> m "pulling the git repository");
|
|
Git_kv.pull git_kv >>= function
|
|
| Error `Msg msg ->
|
|
Logs.err (fun m -> m "error %s while updating git" msg);
|
|
Lwt.return None
|
|
| Ok [] ->
|
|
Logs.info (fun m -> m "git changes are empty");
|
|
Lwt.return (Some ([], SM.empty))
|
|
| Ok changes ->
|
|
commit_id git_kv >>= fun commit_id ->
|
|
modified git_kv >>= fun modified ->
|
|
Logs.info (fun m -> m "git: %s" commit_id);
|
|
let repo = repo remote commit_id in
|
|
reset_parse_errors ();
|
|
Tarball.of_git repo git_kv >|= fun (index, urls) ->
|
|
t.commit_id <- commit_id ;
|
|
t.modified <- modified ;
|
|
t.repo <- repo ;
|
|
t.index <- index;
|
|
Some (changes, urls))
|
|
|
|
let status t disk =
|
|
(* report status:
|
|
- archive size (can we easily measure?) and number of "good" elements
|
|
- list of current downloads
|
|
- list of failed downloads
|
|
*)
|
|
let archive_stats =
|
|
Fmt.str "<ul><li>commit %s</li><li>last modified %s</li><li>repo %s</li><li>%u validated archives on disk</li><li>%Lu bytes free</li></ul>"
|
|
t.commit_id t.modified (K.remote ())
|
|
(SM.cardinal disk.Disk.md5s)
|
|
(KV.free disk.Disk.dev)
|
|
in
|
|
let sort_by_ts a b = Ptime.compare b a in
|
|
let active_downloads =
|
|
let header = "<h2>Active downloads</h2><ul>" in
|
|
let content =
|
|
SM.bindings !active_downloads |>
|
|
List.sort (fun (_, (a, _)) (_, (b, _)) -> sort_by_ts a b) |>
|
|
List.map (fun (url, (ts, bytes_written)) ->
|
|
"<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to swap</li>")
|
|
in
|
|
header ^ String.concat "" content ^ "</ul>"
|
|
and failed_downloads =
|
|
let header = "<h2>Failed downloads</h2>" in
|
|
let group_by xs =
|
|
let t = Hashtbl.create 7 in
|
|
List.iter (fun ((_, (_, reason)) as e) ->
|
|
let k = key_of_failed reason in
|
|
let els = Option.value ~default:[] (Hashtbl.find_opt t k) in
|
|
Hashtbl.replace t k (e :: els))
|
|
xs;
|
|
Hashtbl.fold (fun k els acc ->
|
|
let sorted =
|
|
List.sort (fun (_, (tsa, _)) (_, (tsb, _)) ->
|
|
sort_by_ts tsa tsb)
|
|
els
|
|
in
|
|
(k, sorted) :: acc)
|
|
t []
|
|
in
|
|
let content =
|
|
SM.bindings !failed_downloads |>
|
|
group_by |>
|
|
List.sort (fun (a, _) (b, _) -> compare_failed_key a b) |>
|
|
List.map (fun (key, els) ->
|
|
let header = Fmt.str "<h3>%a</h3><ul>" pp_key key in
|
|
let content =
|
|
List.map (fun (url, (ts, reason)) ->
|
|
Fmt.str "<li>%s: %s error %a"
|
|
(Ptime.to_rfc3339 ?tz_offset_s:None ts) url pp_failed reason)
|
|
els
|
|
in
|
|
header ^ String.concat "" content ^ "</ul>")
|
|
in
|
|
header ^ String.concat "" content
|
|
and parse_errors =
|
|
let header = "<h2>Parse errors</h2><ul>" in
|
|
let content =
|
|
SM.bindings !parse_errors |>
|
|
List.sort (fun (a, _) (b, _) -> String.compare a b) |>
|
|
List.map (fun (filename, reason) ->
|
|
"<li>" ^ filename ^ ": " ^ reason ^ "</li>")
|
|
in
|
|
header ^ String.concat "" content ^ "</ul>"
|
|
in
|
|
"<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status</h1><div>"
|
|
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ; parse_errors ]
|
|
^ "</div></body></html>"
|
|
|
|
let not_modified request (modified, etag) =
|
|
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
|
| Some ts -> String.equal ts modified
|
|
| None -> match Httpaf.Headers.get request.Httpaf.Request.headers "if-none-match" with
|
|
| Some etags -> List.mem etag (String.split_on_char ',' etags)
|
|
| None -> false
|
|
|
|
let not_found reqd path =
|
|
let data = "Resource not found " ^ path in
|
|
let headers = Httpaf.Headers.of_list
|
|
[ "content-length", string_of_int (String.length data) ] in
|
|
let resp = Httpaf.Response.create ~headers `Not_found in
|
|
Httpaf.Reqd.respond_with_string reqd resp data
|
|
|
|
let respond_with_empty reqd resp =
|
|
let hdr =
|
|
Httpaf.Headers.add_unless_exists resp.Httpaf.Response.headers
|
|
"connection" "close"
|
|
in
|
|
let resp = { resp with Httpaf.Response.headers = hdr } in
|
|
Httpaf.Reqd.respond_with_string reqd resp ""
|
|
|
|
(* From the OPAM manual, all we need:
|
|
/repo -- repository configuration file
|
|
/cache -- cached archives
|
|
/index.tar.gz -- archive containing the whole repository contents
|
|
*)
|
|
(* may include "announce: [ string { filter } ... ]" *)
|
|
(* use Key_gen.remote for browse & upstream *)
|
|
|
|
(* for repo and index.tar.gz:
|
|
if Last_modified.not_modified request then
|
|
let resp = Httpaf.Response.create `Not_modified in
|
|
respond_with_empty reqd resp
|
|
else *)
|
|
let dispatch t store hook_url update _flow _conn reqd =
|
|
let request = Httpaf.Reqd.request reqd in
|
|
match String.split_on_char '/' request.Httpaf.Request.target with
|
|
| [ ""; x ] when String.equal x hook_url ->
|
|
Lwt.async update;
|
|
let data = "Update in progress" in
|
|
let mime_type = "text/plain" in
|
|
let headers = [
|
|
"content-type", mime_type ;
|
|
"etag", t.commit_id ;
|
|
"last-modified", t.modified ;
|
|
"content-length", string_of_int (String.length data) ;
|
|
] in
|
|
let headers = Httpaf.Headers.of_list headers in
|
|
let resp = Httpaf.Response.create ~headers `OK in
|
|
Httpaf.Reqd.respond_with_string reqd resp data
|
|
| [ ""; x ] when String.equal x "status" ->
|
|
let data = status t store in
|
|
let mime_type = "text/html" in
|
|
let headers = [
|
|
"content-type", mime_type ;
|
|
"content-length", string_of_int (String.length data) ;
|
|
] in
|
|
let headers = Httpaf.Headers.of_list headers in
|
|
let resp = Httpaf.Response.create ~headers `OK in
|
|
Httpaf.Reqd.respond_with_string reqd resp data
|
|
| [ ""; "repo" ] ->
|
|
if not_modified request (t.modified, t.commit_id) then
|
|
let resp = Httpaf.Response.create `Not_modified in
|
|
respond_with_empty reqd resp
|
|
else
|
|
let data = t.repo in
|
|
let mime_type = "text/plain" in
|
|
let headers = [
|
|
"content-type", mime_type ;
|
|
"etag", t.commit_id ;
|
|
"last-modified", t.modified ;
|
|
"content-length", string_of_int (String.length data) ;
|
|
] in
|
|
let headers = Httpaf.Headers.of_list headers in
|
|
let resp = Httpaf.Response.create ~headers `OK in
|
|
Httpaf.Reqd.respond_with_string reqd resp data
|
|
| [ ""; "index.tar.gz" ] ->
|
|
(* deliver prepared tarball *)
|
|
if not_modified request (t.modified, t.commit_id) then
|
|
let resp = Httpaf.Response.create `Not_modified in
|
|
respond_with_empty reqd resp
|
|
else
|
|
let data = t.index in
|
|
let mime_type = "application/octet-stream" in
|
|
let headers = [
|
|
"content-type", mime_type ;
|
|
"etag", t.commit_id ;
|
|
"last-modified", t.modified ;
|
|
"content-length", string_of_int (String.length data) ;
|
|
] in
|
|
let headers = Httpaf.Headers.of_list headers in
|
|
let resp = Httpaf.Response.create ~headers `OK in
|
|
Httpaf.Reqd.respond_with_string reqd resp data
|
|
| "" :: "cache" :: hash_algo :: _ :: hash :: [] ->
|
|
(* `<hash-algo>/<first-2-hash-characters>/<hash>` *)
|
|
begin
|
|
match hash_of_string hash_algo with
|
|
| Error `Msg msg ->
|
|
not_found reqd request.Httpaf.Request.target
|
|
| Ok h ->
|
|
let hash = Mirage_kv.Key.v hash in
|
|
Lwt.async (fun () ->
|
|
(Disk.last_modified store h hash >|= function
|
|
| Error _ -> t.modified
|
|
| Ok v -> ptime_to_http_date v) >>= fun last_modified ->
|
|
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
|
|
let resp = Httpaf.Response.create `Not_modified in
|
|
respond_with_empty reqd resp;
|
|
Lwt.return_unit
|
|
else
|
|
Disk.size store h hash >>= function
|
|
| Error _ ->
|
|
not_found reqd request.Httpaf.Request.target;
|
|
Lwt.return_unit
|
|
| Ok size ->
|
|
let size = Optint.Int63.to_string size in
|
|
let mime_type = "application/octet-stream" in
|
|
let headers = [
|
|
"content-type", mime_type ;
|
|
"etag", Mirage_kv.Key.basename hash ;
|
|
"last-modified", last_modified ;
|
|
"content-length", size ;
|
|
]
|
|
in
|
|
let headers = Httpaf.Headers.of_list headers in
|
|
let resp = Httpaf.Response.create ~headers `OK in
|
|
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
|
Disk.read_chunked store h hash (fun () chunk ->
|
|
let wait, wakeup = Lwt.task () in
|
|
(* FIXME: catch exception when body is closed *)
|
|
Httpaf.Body.write_string body chunk;
|
|
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
|
wait) () >|= fun _ ->
|
|
Httpaf.Body.close_writer body)
|
|
end
|
|
| _ ->
|
|
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
|
not_found reqd request.Httpaf.Request.target
|
|
|
|
end
|
|
|
|
let download_archives parallel_downloads disk http_client urls =
|
|
(* FIXME: handle resuming partial downloads *)
|
|
reset_failed_downloads ();
|
|
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
|
|
Lwt_list.iter_p (fun (url, csums) ->
|
|
Lwt_pool.use pool @@ fun () ->
|
|
HM.fold (fun h v r ->
|
|
r >>= function
|
|
| true -> Disk.exists disk h (hex_to_key v)
|
|
| false -> Lwt.return false)
|
|
csums (Lwt.return true) >>= function
|
|
| true -> Lwt.return_unit
|
|
| false ->
|
|
let quux, body_init = Disk.init_write disk csums in
|
|
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
|
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
|
| Ok (resp, r) ->
|
|
begin match r with
|
|
| Error `Bad_response ->
|
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
(`Bad_response (resp.status, resp.reason));
|
|
Lwt.return_unit
|
|
| Error `Write_error e ->
|
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
|
|
Lwt.return_unit
|
|
| Error `Swap e ->
|
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
|
|
Lwt.return_unit
|
|
| Ok (digests, body) ->
|
|
Disk.finalize_write disk quux ~url body csums digests
|
|
end
|
|
| Error me ->
|
|
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Mimic me);
|
|
Lwt.return_unit)
|
|
(SM.bindings urls) >>= fun () ->
|
|
Disk.update_caches disk >|= fun () ->
|
|
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
|
|
|
let dump_git git_dump git_kv =
|
|
let stream = Git_kv.to_octets git_kv in
|
|
Lwt_stream.to_list stream >>= fun datas ->
|
|
let data = String.concat "" datas in
|
|
Cache.write git_dump data >|= function
|
|
| Ok () ->
|
|
Logs.info (fun m -> m "dumped git %d bytes" (String.length data))
|
|
| Error e ->
|
|
Logs.warn (fun m -> m "failed to dump git: %a" Cache.pp_write_error e)
|
|
|
|
let restore_git ~remote git_dump git_ctx =
|
|
Cache.read git_dump >>= function
|
|
| Ok None -> Lwt.return (Error ())
|
|
| Error e ->
|
|
Logs.warn (fun m -> m "failed to read git state: %a" Cache.pp_error e);
|
|
Lwt.return (Error ())
|
|
| Ok Some data ->
|
|
let stream = Lwt_stream.return data in
|
|
Git_kv.of_octets git_ctx ~remote stream >|= function
|
|
| Ok git_kv -> Ok git_kv
|
|
| Error `Msg msg ->
|
|
Logs.err (fun m -> m "error restoring git state: %s" msg);
|
|
Error ()
|
|
|
|
module Paf = Paf_mirage.Make(Stack.TCP)
|
|
|
|
let start_mirror { Part.tar; swap; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
|
KV.connect tar >>= fun kv ->
|
|
Cache.connect git_dump >>= fun git_dump ->
|
|
Cache.connect md5s >>= fun md5s ->
|
|
Cache.connect sha512s >>= fun sha512s ->
|
|
Swap.connect swap >>= fun swap ->
|
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
|
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk ->
|
|
let remote = K.remote () in
|
|
if K.check () then
|
|
Lwt.return_unit
|
|
else
|
|
begin
|
|
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
|
(if K.ignore_local_git () then
|
|
Lwt.return (Error ())
|
|
else
|
|
restore_git ~remote git_dump git_ctx) >>= function
|
|
| Ok git_kv -> Lwt.return git_kv
|
|
| Error () ->
|
|
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
|
dump_git git_dump git_kv >|= fun () ->
|
|
git_kv
|
|
end >>= fun git_kv ->
|
|
Logs.info (fun m -> m "Done initializing git state!");
|
|
Serve.commit_id git_kv >>= fun commit_id ->
|
|
Logs.info (fun m -> m "git: %s" commit_id);
|
|
Serve.create remote git_kv >>= fun (serve, urls) ->
|
|
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
|
let update () =
|
|
Serve.update_git ~remote serve git_kv >>= function
|
|
| None | Some ([], _) -> Lwt.return_unit
|
|
| Some (_changes, urls) ->
|
|
dump_git git_dump git_kv >>= fun () ->
|
|
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
|
in
|
|
let service =
|
|
Paf.http_service
|
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
|
(Serve.dispatch serve disk (K.hook_url ()) update)
|
|
in
|
|
let `Initialized th = Paf.serve service t in
|
|
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
|
Lwt.async (fun () ->
|
|
let rec go () =
|
|
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
|
update () >>= fun () ->
|
|
go ()
|
|
in
|
|
go ());
|
|
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
|
|
(th >|= fun _v -> ())
|
|
|
|
let start block _time _pclock stack git_ctx http_ctx =
|
|
let initialize_disk = K.initialize_disk ()
|
|
and sectors_cache = K.sectors_cache ()
|
|
and sectors_git = K.sectors_git ()
|
|
and sectors_swap = K.sectors_swap () in
|
|
if initialize_disk then
|
|
Part.format block ~sectors_cache ~sectors_git ~sectors_swap >>= function
|
|
| Ok () ->
|
|
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
|
Lwt.return_unit
|
|
| Error `Msg e ->
|
|
Logs.err (fun m -> m "Error formatting disk: %s" e);
|
|
exit Mirage_runtime.argument_error
|
|
| Error `Block e ->
|
|
Logs.err (fun m -> m "Error formatting disk: %a" BLOCK.pp_write_error e);
|
|
exit 2
|
|
else
|
|
Part.connect block >>= fun parts ->
|
|
start_mirror parts stack git_ctx http_ctx
|
|
end
|