diff --git a/mirage/bad.ml b/mirage/bad.ml index 2a5a491..8b40ea2 100644 --- a/mirage/bad.ml +++ b/mirage/bad.ml @@ -39,7 +39,9 @@ let archives = "https://github.com/chetmurthy/pa_ppx/archive/0.01.tar.gz" ; "https://github.com/chambart/ocaml-1/archive/lto.tar.gz" ; "https://github.com/Kappa-Dev/KaSim/archive/v3.5-250915.tar.gz" ; - "https://github.com/bsansouci/bsb-native/archive/1.9.4.tar.gz" + "https://github.com/bsansouci/bsb-native/archive/1.9.4.tar.gz"; + "https://github.com/sanette/oplot/archive/0.7.tar.gz"; + "https://github.com/ulrikstrid/ocaml-cookie/releases/download/0.1.8/session-cookie-lwt-0.1.8.tbz"; ] and bad_request = [ diff --git a/mirage/config.ml b/mirage/config.ml index df11684..b83ed76 100644 --- a/mirage/config.ml +++ b/mirage/config.ml @@ -94,9 +94,9 @@ let tcp = tcpv4v6_of_stackv4v6 stack let git_client, alpn_client = let happy_eyeballs = generic_happy_eyeballs stack dns in - let git_happy_eyeballs = git_happy_eyeballs stack dns happy_eyeballs in - merge_git_clients (git_tcp tcp git_happy_eyeballs) - (git_http ~authenticator:tls_authenticator tcp git_happy_eyeballs), + let git = mimic_happy_eyeballs stack dns happy_eyeballs in + merge_git_clients (git_tcp tcp git) + (git_http ~authenticator:tls_authenticator tcp git), paf_client ~pclock:default_posix_clock tcp (mimic_happy_eyeballs stack dns happy_eyeballs) let program_block_size = diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index febb628..5bbb9ba 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -18,9 +18,28 @@ module Make module SM = Map.Make(String) module SSet = Set.Make(String) + let compare_hash h h' = + match h, h' with + | `SHA512, `SHA512 -> 0 + | `SHA512, _ -> 1 + | _, `SHA512 -> -1 + | `SHA384, `SHA384 -> 0 + | `SHA384, _ -> 1 + | _, `SHA384 -> -1 + | `SHA256, `SHA256 -> 0 + | `SHA256, _ -> 1 + | _, `SHA256 -> -1 + | `SHA224, `SHA224 -> 0 + | `SHA224, _ -> 1 + | _, `SHA224 -> -1 + | `SHA1, `SHA1 -> 0 + | `SHA1, `MD5 -> 1 + | `MD5, `MD5 -> 0 + | `MD5, _ -> -1 
+ module HM = Map.Make(struct type t = Mirage_crypto.Hash.hash - let compare = compare (* TODO remove polymorphic compare *) + let compare = compare_hash end) let hash_to_string = function @@ -41,6 +60,8 @@ module Make let `Hex h = Hex.of_string h in h + let hex_to_key h = Mirage_kv.Key.v (hex_to_string h) + let hex_of_string s = match Hex.to_string (`Hex s) with | d -> Ok d @@ -61,17 +82,16 @@ module Make Lwt.return acc | Ok steps -> Lwt_list.fold_left_s (fun acc (step, _) -> - let full_path = Mirage_kv.Key.add path step in - Store.exists store full_path >>= function + Store.exists store step >>= function | Error e -> Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e - Mirage_kv.Key.pp full_path); + Mirage_kv.Key.pp step); Lwt.return acc | Ok None -> - Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp full_path); + Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step); Lwt.return acc - | Ok Some `Value -> Lwt.return (full_path :: acc) - | Ok Some `Dictionary -> go store full_path acc) acc steps + | Ok Some `Value -> Lwt.return (step :: acc) + | Ok Some `Dictionary -> go store step acc) acc steps in go store Mirage_kv.Key.empty [] @@ -98,6 +118,19 @@ module Make *) let open OpamParserTypes.FullPos in let opamfile = OpamParser.FullPos.string str filename in + let unavailable = + List.exists + (function + | { pelem = Variable ({ pelem = "available" ; _ }, + { pelem = (Bool false | List { pelem = [{ pelem = Bool false; _ }] ; _ }); _ }) + ; _ } -> true + | _ -> false) + opamfile.file_contents + in + if unavailable then + (Logs.info (fun m -> m "%s is marked unavailable, skipping" filename); + None) + else let url_section = List.find_opt (function | { pelem = Section ({ section_kind = { pelem = "url" ; _ } ; _ }) ; _} -> true | _ -> false) @@ -208,6 +241,10 @@ module Make dev_sha512s : Cache.t ; } + let pending = Mirage_kv.Key.v "pending" + + let to_delete = Mirage_kv.Key.v "to-delete" + let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; 
sha512s = SM.empty ; dev; dev_md5s; dev_sha512s } let to_hex d = @@ -237,10 +274,13 @@ module Make Lwt.return_unit let find_key t h key = + assert (List.length (Mirage_kv.Key.segments key) = 1); match match h with - | `MD5 -> SM.find_opt key t.md5s - | `SHA512 -> SM.find_opt key t.sha512s + | `MD5 -> + Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s) + | `SHA512 -> + Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s) | `SHA256 -> Some key | _ -> None with @@ -250,13 +290,12 @@ module Make let read_chunked t h v f a = match find_key t h v with | Error `Not_found -> - Lwt.return (Error (`Not_found (Mirage_kv.Key.v v))) - | Ok x -> - let key = Mirage_kv.Key.v x in + Lwt.return (Error (`Not_found v)) + | Ok key -> KV.size t.dev key >>= function | Error e -> - Logs.err (fun m -> m "error %a while reading %s %s" - KV.pp_error e (hash_to_string h) v); + Logs.err (fun m -> m "error %a while reading %s %a" + KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v); Lwt.return (Error (`Not_found key)) | Ok len -> let chunk_size = 4096 in @@ -265,15 +304,262 @@ module Make KV.get_partial t.dev key ~offset ~length:chunk_size >>= function | Ok data -> f a data >>= fun a -> - read_more a (offset + chunk_size) + read_more a Optint.Int63.(add offset (of_int chunk_size)) | Error e -> - Logs.err (fun m -> m "error %a while reading %s %s" - KV.pp_error e (hash_to_string h) v); + Logs.err (fun m -> m "error %a while reading %s %a" + KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v); Lwt.return (Error e) else Lwt.return (Ok a) in - read_more a 0 + read_more a Optint.Int63.zero + + (* + module HM_running = struct + + let empty h = + let module H = (val Mirage_crypto.Hash.module_of h) in + (* We need MD5, SHA256 and SHA512. 
[h] is likely one of the + aforementioned and in that case we don't compute the same hash twice + *) + HM.empty + |> HM.add `MD5 Mirage_crypto.Hash.MD5.empty + |> HM.add `SHA256 Mirage_crypto.Hash.SHA256.empty + |> HM.add `SHA512 Mirage_crypto.Hash.SHA512.empty + |> HM.add h H.empty + + let feed t data = + HM.map (fun h v -> + let module H = (val Mirage_crypto.Hash.module_of h) in + H.feed v data) + t + + let get = + HM.map (fun h v -> + let module H = (val Mirage_crypto.Hash.module_of h) in + H.get v) + + + end + *) + + module Running_hash = struct + type _ t = + | MD5 : Mirage_crypto.Hash.MD5.t -> [> `MD5 ] t + | SHA1 : Mirage_crypto.Hash.SHA1.t -> [> `SHA1 ] t + | SHA224 : Mirage_crypto.Hash.SHA224.t -> [> `SHA224 ] t + | SHA256 : Mirage_crypto.Hash.SHA256.t -> [> `SHA256 ] t + | SHA384 : Mirage_crypto.Hash.SHA384.t -> [> `SHA384 ] t + | SHA512 : Mirage_crypto.Hash.SHA512.t -> [> `SHA512 ] t + + let empty : _ -> _ t = function + | `MD5 -> MD5 Mirage_crypto.Hash.MD5.empty + | `SHA1 -> SHA1 Mirage_crypto.Hash.SHA1.empty + | `SHA224 -> SHA224 Mirage_crypto.Hash.SHA224.empty + | `SHA256 -> SHA256 Mirage_crypto.Hash.SHA256.empty + | `SHA384 -> SHA384 Mirage_crypto.Hash.SHA384.empty + | `SHA512 -> SHA512 Mirage_crypto.Hash.SHA512.empty + + let feed t data = + let open Mirage_crypto.Hash in + match t with + | MD5 t -> MD5 (MD5.feed t data) + | SHA1 t -> SHA1 (SHA1.feed t data) + | SHA224 t -> SHA224 (SHA224.feed t data) + | SHA256 t -> SHA256 (SHA256.feed t data) + | SHA384 t -> SHA384 (SHA384.feed t data) + | SHA512 t -> SHA512 (SHA512.feed t data) + + let get t = + let open Mirage_crypto.Hash in + match t with + | MD5 t -> MD5.get t + | SHA1 t -> SHA1.get t + | SHA224 t -> SHA224.get t + | SHA256 t -> SHA256.get t + | SHA384 t -> SHA384.get t + | SHA512 t -> SHA512.get t + end + + type 'a digests = { + md5 : Mirage_crypto.Hash.MD5.t; + sha256 : Mirage_crypto.Hash.SHA256.t; + sha512 : Mirage_crypto.Hash.SHA512.t; + csum : 'a Running_hash.t; + } + + let empty_digests h 
= + let open Mirage_crypto.Hash in + { + md5 = MD5.empty; + sha256 = SHA256.empty; + sha512 = SHA512.empty; + csum = Running_hash.empty h; + } + + let update_digests { md5; sha256; sha512; csum } data = + let open Mirage_crypto.Hash in + let data = Cstruct.of_string data in + { + md5 = MD5.feed md5 data; + sha256 = SHA256.feed sha256 data; + sha512 = SHA512.feed sha512 data; + csum = Running_hash.feed csum data; + } + + let init_write csums = + let hash, csum = HM.max_binding csums in + (hash, csum), Ok (empty_digests hash, `Init) + + let content_length_of_string s = + match Int64.of_string s with + | len when len >= 0L -> `Fixed len + | _ | exception _ -> `Bad_response + + let body_length headers = + match H2.Headers.get_multi headers "content-length" with + | [] -> `Unknown + | [ x ] -> content_length_of_string x + | hd :: tl -> + (* if there are multiple content-length headers we require them all to be + * exactly equal. *) + if List.for_all (String.equal hd) tl then + content_length_of_string hd + else + `Bad_response + + let body_length (response : Http_mirage_client.response) = + if response.status <> `OK then + `Bad_response + else + body_length response.headers + + let pending_key (hash, csum) = + match hash with + | `SHA512 -> + (* We can't use hex because the filename would become too long for tar *) + Mirage_kv.Key.(pending / hash_to_string hash / Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum) + | _ -> + Mirage_kv.Key.(pending / hash_to_string hash / hex_to_string csum) + + let to_delete_key (hash, csum) = + let rand = "random" in (* FIXME: generate random string *) + let encoded_csum = + match hash with + | `SHA512 -> + (* We can't use hex because the filename would become too long for tar *) + Base64.encode_string ~alphabet:Base64.uri_safe_alphabet ~pad:false csum + | _ -> + hex_to_string csum + in + Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." 
^ rand)) + + let write_partial t (hash, csum) = + (* XXX: we may be in trouble if different hash functions are used for the same archive *) + let key = pending_key (hash, csum) in + let ( >>>= ) = Lwt_result.bind in + fun response r data -> + Lwt.return r >>>= fun (digests, acc) -> + let digests = update_digests digests data in + match acc with + | `Init -> + begin match body_length response with + | `Bad_response -> Lwt.return (Error `Bad_response) + | `Fixed size -> + KV.allocate t.dev key (Optint.Int63.of_int64 size) + |> Lwt_result.map_error (fun e -> `Write_error e) + >>>= fun () -> + KV.set_partial t.dev key ~offset:Optint.Int63.zero data + |> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () -> + let len = String.length data in + let offset = Optint.Int63.of_int len in + Lwt.return_ok (digests, `Fixed_body (size, offset)) + | `Unknown -> + Lwt.return_ok (digests, `Unknown data) + end + | `Fixed_body (size, offset) -> + KV.set_partial t.dev key ~offset data + |> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () -> + let len = String.length data in + let offset = Optint.Int63.(add offset (of_int len)) in + Lwt.return_ok (digests, `Fixed_body (size, offset)) + | `Unknown body -> + Lwt.return_ok (digests, `Unknown (body ^ data)) + + let digests_to_hm digests = + HM.empty + |> HM.add `MD5 + (Cstruct.to_string (Mirage_crypto.Hash.MD5.get digests.md5)) + |> HM.add `SHA256 + (Cstruct.to_string (Mirage_crypto.Hash.SHA256.get digests.sha256)) + |> HM.add `SHA512 + (Cstruct.to_string (Mirage_crypto.Hash.SHA512.get digests.sha512)) + + let check_csums_digests csums digests = + let csums' = digests_to_hm digests in + let common_bindings = List.filter (fun (h, _) -> HM.mem h csums) (HM.bindings csums') in + List.length common_bindings > 0 && + List.for_all + (fun (h, csum) -> String.equal csum (HM.find h csums)) + common_bindings + + let finalize_write t (hash, csum) ~url (body : [ `Unknown of string | `Fixed_body of int64 * Optint.Int63.t | `Init ]) 
csums digests = + let sizes_match, body_size_in_header = + match body with + | `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true + | `Unknown _ -> true, false + | `Init -> assert false + in + let source = pending_key (hash, csum) in + if check_csums_digests csums digests && sizes_match then + let sha256 = to_hex (Mirage_crypto.Hash.SHA256.get digests.sha256) + and md5 = to_hex (Mirage_crypto.Hash.MD5.get digests.md5) + and sha512 = to_hex (Mirage_crypto.Hash.SHA512.get digests.sha512) in + let dest = Mirage_kv.Key.v sha256 in + begin match body with + | `Unknown body -> + Logs.info (fun m -> m "downloaded %s, now writing" url); + KV.set t.dev dest body + | `Fixed_body (_reported_size, _actual_size) -> + Logs.info (fun m -> m "downloaded %s" url); + KV.rename t.dev ~source ~dest + | `Init -> assert false + end >|= function + | Ok () -> + t.md5s <- SM.add md5 sha256 t.md5s; + t.sha512s <- SM.add sha512 sha256 t.sha512s + | Error e -> + Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e) + else begin + (if sizes_match then + Logs.err (fun m -> m "Bad checksum %s: computed %s expected %s" url + (hash_to_string hash) (hex_to_string csum)) + else match body with + | `Fixed_body (reported, actual) -> + Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes" + url Optint.Int63.pp actual reported) + | `Unknown _ -> assert false + | `Init -> assert false); + if body_size_in_header then + (* if the checksums mismatch we want to delete the file. We are only + able to do so if it was the latest created file, so we expect an + error. Ideally, we want to match for `Append_only or other errors *) + KV.remove t.dev source >>= function + | Ok () -> Lwt.return_unit + | Error e -> + (* we failed to delete the file so we mark it for deletion *) + let dest = to_delete_key (hash, csum) in + Logs.warn (fun m -> m "Failed to remove %a: %a. 
Moving it to %a" + Mirage_kv.Key.pp source KV.pp_write_error e Mirage_kv.Key.pp dest); + KV.rename t.dev ~source ~dest >|= function + | Ok () -> () + | Error e -> + Logs.warn (fun m -> m "Error renaming file %a -> %a: %a" + Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e) + else + Lwt.return_unit + end + (* on disk, we use a flat file system where the filename is the sha256 of the data *) let init ~verify_sha256 dev dev_md5s dev_sha512s = @@ -298,11 +584,17 @@ module Make let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s)) and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in let idx = ref 1 in - Lwt_list.iter_s (fun (name, typ) -> + (* XXX: should we do something about pending downloads?? *) + let entries = + List.filter (fun (p, _) -> + not (Mirage_kv.Key.equal p pending || Mirage_kv.Key.equal p to_delete)) + entries + in + Lwt_list.iter_s (fun (path, typ) -> if !idx mod 10 = 0 then Gc.full_major () ; match typ with | `Dictionary -> - Logs.warn (fun m -> m "unexpected dictionary at %s" name); + Logs.warn (fun m -> m "unexpected dictionary at %a" Mirage_kv.Key.pp path); Lwt.return_unit | `Value -> let open Mirage_crypto.Hash in @@ -310,28 +602,28 @@ module Make if verify_sha256 then let f s = let digest = SHA256.get s in - if not (String.equal name (to_hex digest)) then - Logs.err (fun m -> m "corrupt SHA256 data for %s, \ + if not (String.equal (Mirage_kv.Key.basename path) (to_hex digest)) then + Logs.err (fun m -> m "corrupt SHA256 data for %a, \ computed %s (should remove)" - name (to_hex digest)) + Mirage_kv.Key.pp path (to_hex digest)) in Some f else None and md5_final = - if not (SSet.mem name md5s) then + if not (SSet.mem (Mirage_kv.Key.basename path) md5s) then let f s = let digest = MD5.get s in - t.md5s <- SM.add (to_hex digest) name t.md5s + t.md5s <- SM.add (to_hex digest) (Mirage_kv.Key.basename path) t.md5s in Some f else None and sha512_final = - if not (SSet.mem name sha512s) then + if not (SSet.mem 
(Mirage_kv.Key.basename path) sha512s) then let f s = let digest = SHA512.get s in - t.sha512s <- SM.add (to_hex digest) name t.sha512s + t.sha512s <- SM.add (to_hex digest) (Mirage_kv.Key.basename path) t.sha512s in Some f else @@ -340,7 +632,7 @@ module Make match sha256_final, md5_final, sha512_final with | None, None, None -> Lwt.return_unit | _ -> - read_chunked t `SHA256 name + read_chunked t `SHA256 path (fun (sha256, md5, sha512) data -> let cs = Cstruct.of_string data in Lwt.return @@ -351,85 +643,53 @@ module Make Option.map (fun _ -> MD5.empty) md5_final, Option.map (fun _ -> SHA512.empty) sha512_final) >|= function | Error e -> - Logs.err (fun m -> m "error %a of %s while computing digests" - KV.pp_error e name) + Logs.err (fun m -> m "error %a of %a while computing digests" + KV.pp_error e Mirage_kv.Key.pp path) | Ok (sha256, md5, sha512) -> Option.iter (fun f -> f (Option.get sha256)) sha256_final; Option.iter (fun f -> f (Option.get md5)) md5_final; Option.iter (fun f -> f (Option.get sha512)) sha512_final; - Logs.info (fun m -> m "added %s" name)) + Logs.info (fun m -> m "added %a" Mirage_kv.Key.pp path)) entries >>= fun () -> update_caches t >|= fun () -> t - let write t ~url data hm = - let cs = Cstruct.of_string data in - let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex - and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex - and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex - in - if - HM.for_all (fun h v -> - let v' = - match h with `MD5 -> md5 | `SHA256 -> sha256 | `SHA512 -> sha512 | _ -> assert false - in - let v = hex_to_string v in - if String.equal v v' then - true - else begin - Logs.err (fun m -> m "%s hash mismatch %s: expected %s, got %s" url - (hash_to_string h) v v'); - false - end) hm - then begin - KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function - | Ok () -> - t.md5s <- SM.add md5 sha256 t.md5s; - t.sha512s <- SM.add sha512 sha256 t.sha512s; - Logs.debug (fun m -> m "wrote %s (%d bytes)" sha256 - 
(String.length data)) - | Error e -> - Logs.err (fun m -> m "error %a while writing %s (key %s)" - KV.pp_write_error e url sha256) - end else - Lwt.return_unit - let exists t h v = match find_key t h v with | Error _ -> Lwt.return false | Ok x -> - KV.exists t.dev (Mirage_kv.Key.v x) >|= function + KV.exists t.dev x >|= function | Ok Some `Value -> true | Ok Some `Dictionary -> - Logs.err (fun m -> m "unexpected dictionary for %s %s" - (hash_to_string h) v); + Logs.err (fun m -> m "unexpected dictionary for %s %a" + (hash_to_string h) Mirage_kv.Key.pp v); false | Ok None -> false | Error e -> - Logs.err (fun m -> m "exists %s %s returned %a" - (hash_to_string h) v KV.pp_error e); + Logs.err (fun m -> m "exists %s %a returned %a" + (hash_to_string h) Mirage_kv.Key.pp v KV.pp_error e); false let last_modified t h v = match find_key t h v with | Error _ as e -> Lwt.return e | Ok x -> - KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function + KV.last_modified t.dev x >|= function | Ok data -> Ok data | Error e -> - Logs.err (fun m -> m "error %a while last_modified %s %s" - KV.pp_error e (hash_to_string h) v); + Logs.err (fun m -> m "error %a while last_modified %s %a" + KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v); Error `Not_found let size t h v = match find_key t h v with | Error _ as e -> Lwt.return e | Ok x -> - KV.size t.dev (Mirage_kv.Key.v x) >|= function + KV.size t.dev x >|= function | Ok s -> Ok s | Error e -> - Logs.err (fun m -> m "error %a while size %s %s" - KV.pp_error e (hash_to_string h) v); + Logs.err (fun m -> m "error %a while size %s %a" + KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v); Error `Not_found end @@ -507,7 +767,10 @@ module Make let commit_id git_kv = Store.digest git_kv Mirage_kv.Key.empty >|= fun r -> - Result.get_ok r + Result.fold r ~ok:Fun.id + ~error:(fun e -> + Logs.err (fun m -> m "%a" Store.pp_error e); + exit 2) let repo commit = let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in @@ -520,8 
+783,12 @@ stamp: %S let modified git_kv = Store.last_modified git_kv Mirage_kv.Key.empty >|= fun r -> - let v = Result.fold ~ok:Fun.id ~error:(fun _ -> Pclock.now_d_ps ()) r in - ptime_to_http_date (Ptime.v v) + let v = + Result.fold r + ~ok:Fun.id + ~error:(fun _ -> Ptime.v (Pclock.now_d_ps ())) + in + ptime_to_http_date v type t = { mutable commit_id : string ; @@ -654,13 +921,14 @@ stamp: %S Logs.warn (fun m -> m "error decoding hash algo: %s" msg); not_found reqd request.Httpaf.Request.target | Ok h -> + let hash = Mirage_kv.Key.v hash in Lwt.async (fun () -> (Disk.last_modified store h hash >|= function | Error _ -> Logs.warn (fun m -> m "error retrieving last modified"); t.modified - | Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified -> - if not_modified request (last_modified, hash) then + | Ok v -> ptime_to_http_date v) >>= fun last_modified -> + if not_modified request (last_modified, Mirage_kv.Key.basename hash) then let resp = Httpaf.Response.create `Not_modified in respond_with_empty reqd resp; Lwt.return_unit @@ -671,11 +939,11 @@ stamp: %S not_found reqd request.Httpaf.Request.target; Lwt.return_unit | Ok size -> - let size = string_of_int size in + let size = Optint.Int63.to_string size in let mime_type = "application/octet-stream" in let headers = [ "content-type", mime_type ; - "etag", hash ; + "etag", Mirage_kv.Key.basename hash ; "last-modified", last_modified ; "content-length", size ; ] @@ -685,6 +953,7 @@ stamp: %S let body = Httpaf.Reqd.respond_with_streaming reqd resp in Disk.read_chunked store h hash (fun () chunk -> let wait, wakeup = Lwt.task () in + (* FIXME: catch exception when body is closed *) Httpaf.Body.write_string body chunk; Httpaf.Body.flush body (Lwt.wakeup wakeup); wait) () >|= fun _ -> @@ -699,15 +968,17 @@ stamp: %S let bad_archives = SSet.of_list Bad.archives let download_archives disk http_client store = + (* FIXME: handle resuming partial downloads *) Git.find_urls store >>= fun urls -> let urls = SM.filter 
(fun k _ -> not (SSet.mem k bad_archives)) urls in let pool = Lwt_pool.create (Key_gen.parallel_downloads ()) (Fun.const Lwt.return_unit) in let idx = ref 0 in Lwt_list.iter_p (fun (url, csums) -> Lwt_pool.use pool @@ fun () -> + (* FIXME: check pending and to-delete *) HM.fold (fun h v r -> r >>= function - | true -> Disk.exists disk h (hex_to_string v) + | true -> Disk.exists disk h (hex_to_key v) | false -> Lwt.return false) csums (Lwt.return true) >>= function | true -> @@ -717,16 +988,20 @@ stamp: %S incr idx; if !idx mod 10 = 0 then Gc.full_major () ; Logs.info (fun m -> m "downloading %s" url); - let body _response acc data = Lwt.return (acc ^ data) in - Http_mirage_client.request http_client url body "" >>= function - | Ok (resp, body) -> - if resp.status = `OK then begin - Logs.info (fun m -> m "downloaded %s" url); - Disk.write disk ~url body csums - end else begin - Logs.warn (fun m -> m "%s: %a (reason %s)" - url H2.Status.pp_hum resp.status resp.reason); - Lwt.return_unit + let quux, body_init = Disk.init_write csums in + Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function + | Ok (resp, r) -> + begin match r with + | Error `Bad_response -> + Logs.warn (fun m -> m "%s: %a (reason %s)" + url H2.Status.pp_hum resp.status resp.reason); + Lwt.return_unit + | Error `Write_error e -> + Logs.err (fun m -> m "%s: write error %a" + url KV.pp_write_error e); + Lwt.return_unit + | Ok (digests, body) -> + Disk.finalize_write disk quux ~url body csums digests end | _ -> Lwt.return_unit) (SM.bindings urls) >>= fun () ->