Merge pull request 'mirage-kv 6.0.1' (#27) from mirage-kv-6 into main

Reviewed-on: https://git.robur.io/robur/opam-mirror/pulls/27
This commit is contained in:
Hannes Mehnert 2023-05-28 14:31:10 +00:00
commit b3a74b0c1d
3 changed files with 373 additions and 96 deletions

View file

@ -39,7 +39,9 @@ let archives =
"https://github.com/chetmurthy/pa_ppx/archive/0.01.tar.gz" ; "https://github.com/chetmurthy/pa_ppx/archive/0.01.tar.gz" ;
"https://github.com/chambart/ocaml-1/archive/lto.tar.gz" ; "https://github.com/chambart/ocaml-1/archive/lto.tar.gz" ;
"https://github.com/Kappa-Dev/KaSim/archive/v3.5-250915.tar.gz" ; "https://github.com/Kappa-Dev/KaSim/archive/v3.5-250915.tar.gz" ;
"https://github.com/bsansouci/bsb-native/archive/1.9.4.tar.gz" "https://github.com/bsansouci/bsb-native/archive/1.9.4.tar.gz";
"https://github.com/sanette/oplot/archive/0.7.tar.gz";
"https://github.com/ulrikstrid/ocaml-cookie/releases/download/0.1.8/session-cookie-lwt-0.1.8.tbz";
] ]
and bad_request = [ and bad_request = [

View file

@ -94,9 +94,9 @@ let tcp = tcpv4v6_of_stackv4v6 stack
let git_client, alpn_client = let git_client, alpn_client =
let happy_eyeballs = generic_happy_eyeballs stack dns in let happy_eyeballs = generic_happy_eyeballs stack dns in
let git_happy_eyeballs = git_happy_eyeballs stack dns happy_eyeballs in let git = mimic_happy_eyeballs stack dns happy_eyeballs in
merge_git_clients (git_tcp tcp git_happy_eyeballs) merge_git_clients (git_tcp tcp git)
(git_http ~authenticator:tls_authenticator tcp git_happy_eyeballs), (git_http ~authenticator:tls_authenticator tcp git),
paf_client ~pclock:default_posix_clock tcp (mimic_happy_eyeballs stack dns happy_eyeballs) paf_client ~pclock:default_posix_clock tcp (mimic_happy_eyeballs stack dns happy_eyeballs)
let program_block_size = let program_block_size =

View file

@ -18,9 +18,28 @@ module Make
module SM = Map.Make(String) module SM = Map.Make(String)
module SSet = Set.Make(String) module SSet = Set.Make(String)
let compare_hash h h' =
match h, h' with
| `SHA512, `SHA512 -> 0
| `SHA512, _ -> 1
| _, `SHA512 -> -1
| `SHA384, `SHA384 -> 0
| `SHA384, _ -> 1
| _, `SHA384 -> -1
| `SHA256, `SHA256 -> 0
| `SHA256, _ -> 1
| _, `SHA256 -> -1
| `SHA224, `SHA224 -> 0
| `SHA224, _ -> 1
| _, `SHA224 -> -1
| `SHA1, `SHA1 -> 0
| `SHA1, `MD5 -> 1
| `MD5, `MD5 -> 0
| `MD5, _ -> -1
module HM = Map.Make(struct module HM = Map.Make(struct
type t = Mirage_crypto.Hash.hash type t = Mirage_crypto.Hash.hash
let compare = compare (* TODO remove polymorphic compare *) let compare = compare_hash
end) end)
let hash_to_string = function let hash_to_string = function
@ -41,6 +60,8 @@ module Make
let `Hex h = Hex.of_string h in let `Hex h = Hex.of_string h in
h h
let hex_to_key h = Mirage_kv.Key.v (hex_to_string h)
let hex_of_string s = let hex_of_string s =
match Hex.to_string (`Hex s) with match Hex.to_string (`Hex s) with
| d -> Ok d | d -> Ok d
@ -61,17 +82,16 @@ module Make
Lwt.return acc Lwt.return acc
| Ok steps -> | Ok steps ->
Lwt_list.fold_left_s (fun acc (step, _) -> Lwt_list.fold_left_s (fun acc (step, _) ->
let full_path = Mirage_kv.Key.add path step in Store.exists store step >>= function
Store.exists store full_path >>= function
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e
Mirage_kv.Key.pp full_path); Mirage_kv.Key.pp step);
Lwt.return acc Lwt.return acc
| Ok None -> | Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp full_path); Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step);
Lwt.return acc Lwt.return acc
| Ok Some `Value -> Lwt.return (full_path :: acc) | Ok Some `Value -> Lwt.return (step :: acc)
| Ok Some `Dictionary -> go store full_path acc) acc steps | Ok Some `Dictionary -> go store step acc) acc steps
in in
go store Mirage_kv.Key.empty [] go store Mirage_kv.Key.empty []
@ -98,6 +118,19 @@ module Make
*) *)
let open OpamParserTypes.FullPos in let open OpamParserTypes.FullPos in
let opamfile = OpamParser.FullPos.string str filename in let opamfile = OpamParser.FullPos.string str filename in
let unavailable =
List.exists
(function
| { pelem = Variable ({ pelem = "available" ; _ },
{ pelem = (Bool false | List { pelem = [{ pelem = Bool false; _ }] ; _ }); _ })
; _ } -> true
| _ -> false)
opamfile.file_contents
in
if unavailable then
(Logs.info (fun m -> m "%s is marked unavailable, skipping" filename);
None)
else
let url_section = let url_section =
List.find_opt (function List.find_opt (function
| { pelem = Section ({ section_kind = { pelem = "url" ; _ } ; _ }) ; _} -> true | _ -> false) | { pelem = Section ({ section_kind = { pelem = "url" ; _ } ; _ }) ; _} -> true | _ -> false)
@ -208,6 +241,10 @@ module Make
dev_sha512s : Cache.t ; dev_sha512s : Cache.t ;
} }
let pending = Mirage_kv.Key.v "pending"
let to_delete = Mirage_kv.Key.v "to-delete"
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s } let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
let to_hex d = let to_hex d =
@ -237,10 +274,13 @@ module Make
Lwt.return_unit Lwt.return_unit
let find_key t h key = let find_key t h key =
assert (List.length (Mirage_kv.Key.segments key) = 1);
match match
match h with match h with
| `MD5 -> SM.find_opt key t.md5s | `MD5 ->
| `SHA512 -> SM.find_opt key t.sha512s Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
| `SHA512 ->
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
| `SHA256 -> Some key | `SHA256 -> Some key
| _ -> None | _ -> None
with with
@ -250,13 +290,12 @@ module Make
let read_chunked t h v f a = let read_chunked t h v f a =
match find_key t h v with match find_key t h v with
| Error `Not_found -> | Error `Not_found ->
Lwt.return (Error (`Not_found (Mirage_kv.Key.v v))) Lwt.return (Error (`Not_found v))
| Ok x -> | Ok key ->
let key = Mirage_kv.Key.v x in
KV.size t.dev key >>= function KV.size t.dev key >>= function
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while reading %s %s" Logs.err (fun m -> m "error %a while reading %s %a"
KV.pp_error e (hash_to_string h) v); KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Lwt.return (Error (`Not_found key)) Lwt.return (Error (`Not_found key))
| Ok len -> | Ok len ->
let chunk_size = 4096 in let chunk_size = 4096 in
@ -265,15 +304,262 @@ module Make
KV.get_partial t.dev key ~offset ~length:chunk_size >>= function KV.get_partial t.dev key ~offset ~length:chunk_size >>= function
| Ok data -> | Ok data ->
f a data >>= fun a -> f a data >>= fun a ->
read_more a (offset + chunk_size) read_more a Optint.Int63.(add offset (of_int chunk_size))
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while reading %s %s" Logs.err (fun m -> m "error %a while reading %s %a"
KV.pp_error e (hash_to_string h) v); KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Lwt.return (Error e) Lwt.return (Error e)
else else
Lwt.return (Ok a) Lwt.return (Ok a)
in in
read_more a 0 read_more a Optint.Int63.zero
(*
module HM_running = struct
let empty h =
let module H = (val Mirage_crypto.Hash.module_of h) in
(* We need MD5, SHA256 and SHA512. [h] is likely one of the
aforementioned and in that case we don't compute the same hash twice
*)
HM.empty
|> HM.add `MD5 Mirage_crypto.Hash.MD5.empty
|> HM.add `SHA256 Mirage_crypto.Hash.SHA256.empty
|> HM.add `SHA512 Mirage_crypto.Hash.SHA512.empty
|> HM.add h H.empty
let feed t data =
HM.map (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.feed v data)
t
let get =
HM.map (fun h v ->
let module H = (val Mirage_crypto.Hash.module_of h) in
H.get v)
end
*)
module Running_hash = struct
type _ t =
| MD5 : Mirage_crypto.Hash.MD5.t -> [> `MD5 ] t
| SHA1 : Mirage_crypto.Hash.SHA1.t -> [> `SHA1 ] t
| SHA224 : Mirage_crypto.Hash.SHA224.t -> [> `SHA224 ] t
| SHA256 : Mirage_crypto.Hash.SHA256.t -> [> `SHA256 ] t
| SHA384 : Mirage_crypto.Hash.SHA384.t -> [> `SHA384 ] t
| SHA512 : Mirage_crypto.Hash.SHA512.t -> [> `SHA512 ] t
let empty : _ -> _ t = function
| `MD5 -> MD5 Mirage_crypto.Hash.MD5.empty
| `SHA1 -> SHA1 Mirage_crypto.Hash.SHA1.empty
| `SHA224 -> SHA224 Mirage_crypto.Hash.SHA224.empty
| `SHA256 -> SHA256 Mirage_crypto.Hash.SHA256.empty
| `SHA384 -> SHA384 Mirage_crypto.Hash.SHA384.empty
| `SHA512 -> SHA512 Mirage_crypto.Hash.SHA512.empty
let feed t data =
let open Mirage_crypto.Hash in
match t with
| MD5 t -> MD5 (MD5.feed t data)
| SHA1 t -> SHA1 (SHA1.feed t data)
| SHA224 t -> SHA224 (SHA224.feed t data)
| SHA256 t -> SHA256 (SHA256.feed t data)
| SHA384 t -> SHA384 (SHA384.feed t data)
| SHA512 t -> SHA512 (SHA512.feed t data)
let get t =
let open Mirage_crypto.Hash in
match t with
| MD5 t -> MD5.get t
| SHA1 t -> SHA1.get t
| SHA224 t -> SHA224.get t
| SHA256 t -> SHA256.get t
| SHA384 t -> SHA384.get t
| SHA512 t -> SHA512.get t
end
type 'a digests = {
md5 : Mirage_crypto.Hash.MD5.t;
sha256 : Mirage_crypto.Hash.SHA256.t;
sha512 : Mirage_crypto.Hash.SHA512.t;
csum : 'a Running_hash.t;
}
let empty_digests h =
let open Mirage_crypto.Hash in
{
md5 = MD5.empty;
sha256 = SHA256.empty;
sha512 = SHA512.empty;
csum = Running_hash.empty h;
}
let update_digests { md5; sha256; sha512; csum } data =
let open Mirage_crypto.Hash in
let data = Cstruct.of_string data in
{
md5 = MD5.feed md5 data;
sha256 = SHA256.feed sha256 data;
sha512 = SHA512.feed sha512 data;
csum = Running_hash.feed csum data;
}
let init_write csums =
let hash, csum = HM.max_binding csums in
(hash, csum), Ok (empty_digests hash, `Init)
let content_length_of_string s =
match Int64.of_string s with
| len when len >= 0L -> `Fixed len
| _ | exception _ -> `Bad_response
let body_length headers =
match H2.Headers.get_multi headers "content-length" with
| [] -> `Unknown
| [ x ] -> content_length_of_string x
| hd :: tl ->
(* if there are multiple content-length headers we require them all to be
* exactly equal. *)
if List.for_all (String.equal hd) tl then
content_length_of_string hd
else
`Bad_response
let body_length (response : Http_mirage_client.response) =
if response.status <> `OK then
`Bad_response
else
body_length response.headers
let pending_key (hash, csum) =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum)
| _ ->
Mirage_kv.Key.(pending / hash_to_string hash / hex_to_string csum)
let to_delete_key (hash, csum) =
let rand = "random" in (* FIXME: generate random string *)
let encoded_csum =
match hash with
| `SHA512 ->
(* We can't use hex because the filename would become too long for tar *)
Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum
| _ ->
hex_to_string csum
in
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
let write_partial t (hash, csum) =
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
let key = pending_key (hash, csum) in
let ( >>>= ) = Lwt_result.bind in
fun response r data ->
Lwt.return r >>>= fun (digests, acc) ->
let digests = update_digests digests data in
match acc with
| `Init ->
begin match body_length response with
| `Bad_response -> Lwt.return (Error `Bad_response)
| `Fixed size ->
KV.allocate t.dev key (Optint.Int63.of_int64 size)
|> Lwt_result.map_error (fun e -> `Write_error e)
>>>= fun () ->
KV.set_partial t.dev key ~offset:Optint.Int63.zero data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.of_int len in
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown ->
Lwt.return_ok (digests, `Unknown data)
end
| `Fixed_body (size, offset) ->
KV.set_partial t.dev key ~offset data
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.(add offset (of_int len)) in
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown body ->
Lwt.return_ok (digests, `Unknown (body ^ data))
let digests_to_hm digests =
HM.empty
|> HM.add `MD5
(Cstruct.to_string (Mirage_crypto.Hash.MD5.get digests.md5))
|> HM.add `SHA256
(Cstruct.to_string (Mirage_crypto.Hash.SHA256.get digests.sha256))
|> HM.add `SHA512
(Cstruct.to_string (Mirage_crypto.Hash.SHA512.get digests.sha512))
let check_csums_digests csums digests =
let csums' = digests_to_hm digests in
let common_bindings = List.filter (fun (h, _) -> HM.mem h csums) (HM.bindings csums') in
List.length common_bindings > 0 &&
List.for_all
(fun (h, csum) -> String.equal csum (HM.find h csums))
common_bindings
let finalize_write t (hash, csum) ~url (body : [ `Unknown of string | `Fixed_body of int64 * Optint.Int63.t | `Init ]) csums digests =
let sizes_match, body_size_in_header =
match body with
| `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true
| `Unknown _ -> true, false
| `Init -> assert false
in
let source = pending_key (hash, csum) in
if check_csums_digests csums digests && sizes_match then
let sha256 = to_hex (Mirage_crypto.Hash.SHA256.get digests.sha256)
and md5 = to_hex (Mirage_crypto.Hash.MD5.get digests.md5)
and sha512 = to_hex (Mirage_crypto.Hash.SHA512.get digests.sha512) in
let dest = Mirage_kv.Key.v sha256 in
begin match body with
| `Unknown body ->
Logs.info (fun m -> m "downloaded %s, now writing" url);
KV.set t.dev dest body
| `Fixed_body (_reported_size, _actual_size) ->
Logs.info (fun m -> m "downloaded %s" url);
KV.rename t.dev ~source ~dest
| `Init -> assert false
end >|= function
| Ok () ->
t.md5s <- SM.add md5 sha256 t.md5s;
t.sha512s <- SM.add sha512 sha256 t.sha512s
| Error e ->
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e)
else begin
(if sizes_match then
Logs.err (fun m -> m "Bad checksum %s: computed %s expected %s" url
(hash_to_string hash) (hex_to_string csum))
else match body with
| `Fixed_body (reported, actual) ->
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported)
| `Unknown _ -> assert false
| `Init -> assert false);
if body_size_in_header then
(* if the checksums mismatch we want to delete the file. We are only
able to do so if it was the latest created file, so we expect and
error. Ideally, we want to match for `Append_only or other errors *)
KV.remove t.dev source >>= function
| Ok () -> Lwt.return_unit
| Error e ->
(* we failed to delete the file so we mark it for deletion *)
let dest = to_delete_key (hash, csum) in
Logs.warn (fun m -> m "Failed to remove %a: %a. Moving it to %a"
Mirage_kv.Key.pp source KV.pp_write_error e Mirage_kv.Key.pp dest);
KV.rename t.dev ~source ~dest >|= function
| Ok () -> ()
| Error e ->
Logs.warn (fun m -> m "Error renaming file %a -> %a: %a"
Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e)
else
Lwt.return_unit
end
(* on disk, we use a flat file system where the filename is the sha256 of the data *) (* on disk, we use a flat file system where the filename is the sha256 of the data *)
let init ~verify_sha256 dev dev_md5s dev_sha512s = let init ~verify_sha256 dev dev_md5s dev_sha512s =
@ -298,11 +584,17 @@ module Make
let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s)) let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
let idx = ref 1 in let idx = ref 1 in
Lwt_list.iter_s (fun (name, typ) -> (* XXX: should we do something about pending downloads?? *)
let entries =
List.filter (fun (p, _) ->
not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete))
entries
in
Lwt_list.iter_s (fun (path, typ) ->
if !idx mod 10 = 0 then Gc.full_major () ; if !idx mod 10 = 0 then Gc.full_major () ;
match typ with match typ with
| `Dictionary -> | `Dictionary ->
Logs.warn (fun m -> m "unexpected dictionary at %s" name); Logs.warn (fun m -> m "unexpected dictionary at %a" Mirage_kv.Key.pp path);
Lwt.return_unit Lwt.return_unit
| `Value -> | `Value ->
let open Mirage_crypto.Hash in let open Mirage_crypto.Hash in
@ -310,28 +602,28 @@ module Make
if verify_sha256 then if verify_sha256 then
let f s = let f s =
let digest = SHA256.get s in let digest = SHA256.get s in
if not (String.equal name (to_hex digest)) then if not (String.equal (Mirage_kv.Key.basename path) (to_hex digest)) then
Logs.err (fun m -> m "corrupt SHA256 data for %s, \ Logs.err (fun m -> m "corrupt SHA256 data for %a, \
computed %s (should remove)" computed %s (should remove)"
name (to_hex digest)) Mirage_kv.Key.pp path (to_hex digest))
in in
Some f Some f
else else
None None
and md5_final = and md5_final =
if not (SSet.mem name md5s) then if not (SSet.mem (Mirage_kv.Key.basename path) md5s) then
let f s = let f s =
let digest = MD5.get s in let digest = MD5.get s in
t.md5s <- SM.add (to_hex digest) name t.md5s t.md5s <- SM.add (to_hex digest) (Mirage_kv.Key.basename path) t.md5s
in in
Some f Some f
else else
None None
and sha512_final = and sha512_final =
if not (SSet.mem name sha512s) then if not (SSet.mem (Mirage_kv.Key.basename path) sha512s) then
let f s = let f s =
let digest = SHA512.get s in let digest = SHA512.get s in
t.sha512s <- SM.add (to_hex digest) name t.sha512s t.sha512s <- SM.add (to_hex digest) (Mirage_kv.Key.basename path) t.sha512s
in in
Some f Some f
else else
@ -340,7 +632,7 @@ module Make
match sha256_final, md5_final, sha512_final with match sha256_final, md5_final, sha512_final with
| None, None, None -> Lwt.return_unit | None, None, None -> Lwt.return_unit
| _ -> | _ ->
read_chunked t `SHA256 name read_chunked t `SHA256 path
(fun (sha256, md5, sha512) data -> (fun (sha256, md5, sha512) data ->
let cs = Cstruct.of_string data in let cs = Cstruct.of_string data in
Lwt.return Lwt.return
@ -351,85 +643,53 @@ module Make
Option.map (fun _ -> MD5.empty) md5_final, Option.map (fun _ -> MD5.empty) md5_final,
Option.map (fun _ -> SHA512.empty) sha512_final) >|= function Option.map (fun _ -> SHA512.empty) sha512_final) >|= function
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a of %s while computing digests" Logs.err (fun m -> m "error %a of %a while computing digests"
KV.pp_error e name) KV.pp_error e Mirage_kv.Key.pp path)
| Ok (sha256, md5, sha512) -> | Ok (sha256, md5, sha512) ->
Option.iter (fun f -> f (Option.get sha256)) sha256_final; Option.iter (fun f -> f (Option.get sha256)) sha256_final;
Option.iter (fun f -> f (Option.get md5)) md5_final; Option.iter (fun f -> f (Option.get md5)) md5_final;
Option.iter (fun f -> f (Option.get sha512)) sha512_final; Option.iter (fun f -> f (Option.get sha512)) sha512_final;
Logs.info (fun m -> m "added %s" name)) Logs.info (fun m -> m "added %a" Mirage_kv.Key.pp path))
entries >>= fun () -> entries >>= fun () ->
update_caches t >|= fun () -> update_caches t >|= fun () ->
t t
let write t ~url data hm =
let cs = Cstruct.of_string data in
let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex
and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex
in
if
HM.for_all (fun h v ->
let v' =
match h with `MD5 -> md5 | `SHA256 -> sha256 | `SHA512 -> sha512 | _ -> assert false
in
let v = hex_to_string v in
if String.equal v v' then
true
else begin
Logs.err (fun m -> m "%s hash mismatch %s: expected %s, got %s" url
(hash_to_string h) v v');
false
end) hm
then begin
KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function
| Ok () ->
t.md5s <- SM.add md5 sha256 t.md5s;
t.sha512s <- SM.add sha512 sha256 t.sha512s;
Logs.debug (fun m -> m "wrote %s (%d bytes)" sha256
(String.length data))
| Error e ->
Logs.err (fun m -> m "error %a while writing %s (key %s)"
KV.pp_write_error e url sha256)
end else
Lwt.return_unit
let exists t h v = let exists t h v =
match find_key t h v with match find_key t h v with
| Error _ -> Lwt.return false | Error _ -> Lwt.return false
| Ok x -> | Ok x ->
KV.exists t.dev (Mirage_kv.Key.v x) >|= function KV.exists t.dev x >|= function
| Ok Some `Value -> true | Ok Some `Value -> true
| Ok Some `Dictionary -> | Ok Some `Dictionary ->
Logs.err (fun m -> m "unexpected dictionary for %s %s" Logs.err (fun m -> m "unexpected dictionary for %s %a"
(hash_to_string h) v); (hash_to_string h) Mirage_kv.Key.pp v);
false false
| Ok None -> false | Ok None -> false
| Error e -> | Error e ->
Logs.err (fun m -> m "exists %s %s returned %a" Logs.err (fun m -> m "exists %s %a returned %a"
(hash_to_string h) v KV.pp_error e); (hash_to_string h) Mirage_kv.Key.pp v KV.pp_error e);
false false
let last_modified t h v = let last_modified t h v =
match find_key t h v with match find_key t h v with
| Error _ as e -> Lwt.return e | Error _ as e -> Lwt.return e
| Ok x -> | Ok x ->
KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function KV.last_modified t.dev x >|= function
| Ok data -> Ok data | Ok data -> Ok data
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while last_modified %s %s" Logs.err (fun m -> m "error %a while last_modified %s %a"
KV.pp_error e (hash_to_string h) v); KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Error `Not_found Error `Not_found
let size t h v = let size t h v =
match find_key t h v with match find_key t h v with
| Error _ as e -> Lwt.return e | Error _ as e -> Lwt.return e
| Ok x -> | Ok x ->
KV.size t.dev (Mirage_kv.Key.v x) >|= function KV.size t.dev x >|= function
| Ok s -> Ok s | Ok s -> Ok s
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while size %s %s" Logs.err (fun m -> m "error %a while size %s %a"
KV.pp_error e (hash_to_string h) v); KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Error `Not_found Error `Not_found
end end
@ -507,7 +767,10 @@ module Make
let commit_id git_kv = let commit_id git_kv =
Store.digest git_kv Mirage_kv.Key.empty >|= fun r -> Store.digest git_kv Mirage_kv.Key.empty >|= fun r ->
Result.get_ok r Result.fold r ~ok:Fun.id
~error:(fun e ->
Logs.err (fun m -> m "%a" Store.pp_error e);
exit 2)
let repo commit = let repo commit =
let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in
@ -520,8 +783,12 @@ stamp: %S
let modified git_kv = let modified git_kv =
Store.last_modified git_kv Mirage_kv.Key.empty >|= fun r -> Store.last_modified git_kv Mirage_kv.Key.empty >|= fun r ->
let v = Result.fold ~ok:Fun.id ~error:(fun _ -> Pclock.now_d_ps ()) r in let v =
ptime_to_http_date (Ptime.v v) Result.fold r
~ok:Fun.id
~error:(fun _ -> Ptime.v (Pclock.now_d_ps ()))
in
ptime_to_http_date v
type t = { type t = {
mutable commit_id : string ; mutable commit_id : string ;
@ -654,13 +921,14 @@ stamp: %S
Logs.warn (fun m -> m "error decoding hash algo: %s" msg); Logs.warn (fun m -> m "error decoding hash algo: %s" msg);
not_found reqd request.Httpaf.Request.target not_found reqd request.Httpaf.Request.target
| Ok h -> | Ok h ->
let hash = Mirage_kv.Key.v hash in
Lwt.async (fun () -> Lwt.async (fun () ->
(Disk.last_modified store h hash >|= function (Disk.last_modified store h hash >|= function
| Error _ -> | Error _ ->
Logs.warn (fun m -> m "error retrieving last modified"); Logs.warn (fun m -> m "error retrieving last modified");
t.modified t.modified
| Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified -> | Ok v -> ptime_to_http_date v) >>= fun last_modified ->
if not_modified request (last_modified, hash) then if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
let resp = Httpaf.Response.create `Not_modified in let resp = Httpaf.Response.create `Not_modified in
respond_with_empty reqd resp; respond_with_empty reqd resp;
Lwt.return_unit Lwt.return_unit
@ -671,11 +939,11 @@ stamp: %S
not_found reqd request.Httpaf.Request.target; not_found reqd request.Httpaf.Request.target;
Lwt.return_unit Lwt.return_unit
| Ok size -> | Ok size ->
let size = string_of_int size in let size = Optint.Int63.to_string size in
let mime_type = "application/octet-stream" in let mime_type = "application/octet-stream" in
let headers = [ let headers = [
"content-type", mime_type ; "content-type", mime_type ;
"etag", hash ; "etag", Mirage_kv.Key.basename hash ;
"last-modified", last_modified ; "last-modified", last_modified ;
"content-length", size ; "content-length", size ;
] ]
@ -685,6 +953,7 @@ stamp: %S
let body = Httpaf.Reqd.respond_with_streaming reqd resp in let body = Httpaf.Reqd.respond_with_streaming reqd resp in
Disk.read_chunked store h hash (fun () chunk -> Disk.read_chunked store h hash (fun () chunk ->
let wait, wakeup = Lwt.task () in let wait, wakeup = Lwt.task () in
(* FIXME: catch exception when body is closed *)
Httpaf.Body.write_string body chunk; Httpaf.Body.write_string body chunk;
Httpaf.Body.flush body (Lwt.wakeup wakeup); Httpaf.Body.flush body (Lwt.wakeup wakeup);
wait) () >|= fun _ -> wait) () >|= fun _ ->
@ -699,15 +968,17 @@ stamp: %S
let bad_archives = SSet.of_list Bad.archives let bad_archives = SSet.of_list Bad.archives
let download_archives disk http_client store = let download_archives disk http_client store =
(* FIXME: handle resuming partial downloads *)
Git.find_urls store >>= fun urls -> Git.find_urls store >>= fun urls ->
let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in let urls = SM.filter (fun k _ -> not (SSet.mem k bad_archives)) urls in
let pool = Lwt_pool.create (Key_gen.parallel_downloads ()) (Fun.const Lwt.return_unit) in let pool = Lwt_pool.create (Key_gen.parallel_downloads ()) (Fun.const Lwt.return_unit) in
let idx = ref 0 in let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) -> Lwt_list.iter_p (fun (url, csums) ->
Lwt_pool.use pool @@ fun () -> Lwt_pool.use pool @@ fun () ->
(* FIXME: check pending and to-delete *)
HM.fold (fun h v r -> HM.fold (fun h v r ->
r >>= function r >>= function
| true -> Disk.exists disk h (hex_to_string v) | true -> Disk.exists disk h (hex_to_key v)
| false -> Lwt.return false) | false -> Lwt.return false)
csums (Lwt.return true) >>= function csums (Lwt.return true) >>= function
| true -> | true ->
@ -717,16 +988,20 @@ stamp: %S
incr idx; incr idx;
if !idx mod 10 = 0 then Gc.full_major () ; if !idx mod 10 = 0 then Gc.full_major () ;
Logs.info (fun m -> m "downloading %s" url); Logs.info (fun m -> m "downloading %s" url);
let body _response acc data = Lwt.return (acc ^ data) in let quux, body_init = Disk.init_write csums in
Http_mirage_client.request http_client url body "" >>= function Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
| Ok (resp, body) -> | Ok (resp, r) ->
if resp.status = `OK then begin begin match r with
Logs.info (fun m -> m "downloaded %s" url); | Error `Bad_response ->
Disk.write disk ~url body csums
end else begin
Logs.warn (fun m -> m "%s: %a (reason %s)" Logs.warn (fun m -> m "%s: %a (reason %s)"
url H2.Status.pp_hum resp.status resp.reason); url H2.Status.pp_hum resp.status resp.reason);
Lwt.return_unit Lwt.return_unit
| Error `Write_error e ->
Logs.err (fun m -> m "%s: write error %a"
url KV.pp_write_error e);
Lwt.return_unit
| Ok (digests, body) ->
Disk.finalize_write disk quux ~url body csums digests
end end
| _ -> Lwt.return_unit) | _ -> Lwt.return_unit)
(SM.bindings urls) >>= fun () -> (SM.bindings urls) >>= fun () ->