Patch summary (free text ahead of the first "diff --git" header; patch tools
skip it).

* mirage/config.ml: the tar-mirage pin moves from branch kv-rw to kv-rw-kv-5.
* mirage/unikernel.ml: find_key moves ahead of init, and the new read_chunked
  folds a caller-supplied function over a stored value in pieces of at most
  4096 bytes obtained with KV.get_partial.
* init feeds SHA256/MD5/SHA512 states chunk by chunk through read_chunked
  instead of loading each value with KV.get.
* The whole-value read helper is removed and a size helper is added; the HTTP
  handler takes content-length from Disk.size and streams the body with
  Httpaf.Reqd.respond_with_streaming.

Review notes on the added lines:
* read_chunked asks for min chunk_size (len - offset) bytes, so the last
  request stays inside the length reported by KV.size.
* The flush callback resolves its promise with Lwt.wakeup_later; the Lwt
  documentation discourages Lwt.wakeup when it is called repeatedly.
* Line counts per hunk are unchanged, so the "@@" headers still hold.
  Indentation was not recoverable from the collapsed input and is a best
  guess; the index hashes were not recomputed.

diff --git a/mirage/config.ml b/mirage/config.ml
index 3f5180f..5eda798 100644
--- a/mirage/config.ml
+++ b/mirage/config.ml
@@ -66,7 +66,7 @@ let mirror =
     package ~min:"3.7.0" "git-paf" ;
     package "opam-file-format" ;
     package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
-    package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw" "tar-mirage" ;
+    package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ;
     package ~pin:"git+https://github.com/reynir/mirage-block-partition.git" "mirage-block-partition" ;
     package ~pin:"git+https://git.robur.io/reynir/oneffs.git" "oneffs" ;
   ]
diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml
index 9c49367..f2e825a 100644
--- a/mirage/unikernel.ml
+++ b/mirage/unikernel.ml
@@ -235,6 +235,45 @@ module Make
         Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e);
         Lwt.return_unit
 
+    let find_key t h key =
+      match
+        match h with
+        | `MD5 -> SM.find_opt key t.md5s
+        | `SHA512 -> SM.find_opt key t.sha512s
+        | `SHA256 -> Some key
+        | _ -> None
+      with
+      | None -> Error `Not_found
+      | Some x -> Ok x
+
+    let read_chunked t h v f a =
+      match find_key t h v with
+      | Error `Not_found ->
+        Lwt.return (Error (`Not_found (Mirage_kv.Key.v v)))
+      | Ok x ->
+        let key = Mirage_kv.Key.v x in
+        KV.size t.dev key >>= function
+        | Error e ->
+          Logs.err (fun m -> m "error %a while reading %s %s"
+                       KV.pp_error e (hash_to_string h) v);
+          Lwt.return (Error (`Not_found key))
+        | Ok len ->
+          let chunk_size = 4096 in
+          let rec read_more a offset =
+            if offset < len then
+              KV.get_partial t.dev key ~offset
+                ~length:(min chunk_size (len - offset)) >>= function
+              | Ok data ->
+                f a data >>= fun a ->
+                read_more a (offset + chunk_size)
+              | Error e ->
+                Logs.err (fun m -> m "error %a while reading %s %s"
+                             KV.pp_error e (hash_to_string h) v);
+                Lwt.return (Error e)
+            else Lwt.return (Ok a)
+          in
+          read_more a 0
+
     (* on disk, we use a flat file system where the filename is the sha256 of the data *)
     let init ~verify dev dev_md5s dev_sha512s =
       Logs.info (fun m -> m "init with verify %B" verify);
@@ -262,52 +301,83 @@ module Make
             Logs.warn (fun m -> m "unexpected dictionary at %s" name);
             Lwt.return_unit
           | `Value ->
-            let ( >|?= ) x f = Lwt_result.iter (fun v -> Lwt.return (f v)) x in
-            let _data = ref None in
-            let read_data () =
-              match !_data with
-              | Some cs -> Lwt.return (Ok cs)
-              | None ->
-                incr idx;
-                KV.get dev (Mirage_kv.Key.v name) >|= function
-                | Error e ->
-                  Logs.err (fun m -> m "error %a reading %s"
-                               KV.pp_error e name);
-                  Error ()
-                | Ok data ->
-                  let cs = Cstruct.of_string data in
-                  _data := Some cs;
-                  Ok cs
+            let open Mirage_crypto.Hash in
+            let sha256_final =
+              if verify then
+                let f s =
+                  let digest = SHA256.get s in
+                  if not (String.equal name (to_hex digest)) then
+                    Logs.err (fun m -> m "corrupt SHA256 data for %s, \
+                                          computed %s (should remove)"
+                                 name (to_hex digest))
+                in
+                Some f
+              else
+                None
+            and md5_final =
+              if not (SSet.mem name md5s) then
+                let f s =
+                  let digest = MD5.get s in
+                  t.md5s <- SM.add (to_hex digest) name t.md5s
+                in
+                Some f
+              else if verify then
+                let f s =
+                  let digest = MD5.get s |> to_hex in
+                  match SM.find_opt digest t.md5s with
+                  | Some x when String.equal name x -> ()
+                  | y ->
+                    Logs.err (fun m -> m "corrupt MD5 data for %s, \
+                                          expected %a, computed %s"
+                                 name Fmt.(option ~none:(any "NONE") string) y
+                                 digest)
+                in
+                Some f
+              else
+                None
+            and sha512_final =
+              if not (SSet.mem name sha512s) then
+                let f s =
+                  let digest = SHA512.get s in
+                  t.sha512s <- SM.add (to_hex digest) name t.sha512s
+                in
+                Some f
+              else if verify then
+                let f s =
+                  let digest = SHA512.get s |> to_hex in
+                  match SM.find_opt digest t.sha512s with
+                  | Some x when String.equal name x -> ()
+                  | y ->
+                    Logs.err (fun m -> m "corrupt SHA512 data for %s, \
+                                          expected %a, computed %s"
+                                 name Fmt.(option ~none:(any "NONE") string) y
+                                 digest)
+                in
+                Some f
+              else
+                None
             in
-            begin
-              if verify then begin
-                read_data () >|?= fun cs ->
-                let digest = Mirage_crypto.Hash.digest `SHA256 cs in
-                if not (String.equal name (to_hex digest)) then
-                  Logs.err (fun m -> m "corrupt data, expected %s, read %s (should remove)"
-                               name (hex_to_string (Cstruct.to_string digest)));
-              end else
-                Lwt.return_unit
-            end >>= fun () ->
-            begin
-              if not (SSet.mem name md5s) then begin
-                read_data () >|?= fun cs ->
-                let md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex in
-                let md5s = SM.add md5 name t.md5s in
-                t.md5s <- md5s
-              end else
-                Lwt.return_unit
-            end >>= fun () ->
-            begin
-              if not (SSet.mem name sha512s) then begin
-                read_data () >|?= fun cs ->
-                let sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex in
-                let sha512s = SM.add sha512 name t.sha512s in
-                t.sha512s <- sha512s
-              end else
-                Lwt.return_unit
-            end >|= fun () ->
-            Logs.info (fun m -> m "added %s" name))
+            match sha256_final, md5_final, sha512_final with
+            | None, None, None -> Lwt.return_unit
+            | _ ->
+              read_chunked t `SHA256 name
+                (fun (sha256, md5, sha512) data ->
+                   let cs = Cstruct.of_string data in
+                   Lwt.return
+                     (Option.map (fun t -> SHA256.feed t cs) sha256,
+                      Option.map (fun t -> MD5.feed t cs) md5,
+                      Option.map (fun t -> SHA512.feed t cs) sha512))
+                (Option.map (fun _ -> SHA256.empty) sha256_final,
+                 Option.map (fun _ -> MD5.empty) md5_final,
+                 Option.map (fun _ -> SHA512.empty) sha512_final) >|= function
+              | Error e ->
+                Logs.err (fun m -> m "error %a of %s while computing digests"
+                             KV.pp_error e name)
+              | Ok (sha256, md5, sha512) ->
+                Option.iter (fun f -> f (Option.get sha256)) sha256_final;
+                Option.iter (fun f -> f (Option.get md5)) md5_final;
+                Option.iter (fun f -> f (Option.get sha512)) sha512_final;
+                Logs.info (fun m -> m "added %s" name))
         entries >>= fun () ->
       update_caches t >|= fun () ->
       t
@@ -344,17 +414,6 @@ module Make
       end else
         Lwt.return_unit
 
-    let find_key t h key =
-      match
-        match h with
-        | `MD5 -> SM.find_opt key t.md5s
-        | `SHA512 -> SM.find_opt key t.sha512s
-        | `SHA256 -> Some key
-        | _ -> None
-      with
-      | None -> Error `Not_found
-      | Some x -> Ok x
-
     let exists t h v =
       match find_key t h v with
       | Error _ -> Lwt.return false
@@ -371,17 +430,6 @@ module Make
                      (hash_to_string h) v KV.pp_error e);
         false
 
-    let read t h v =
-      match find_key t h v with
-      | Error _ as e -> Lwt.return e
-      | Ok x ->
-        KV.get t.dev (Mirage_kv.Key.v x) >|= function
-        | Ok data -> Ok data
-        | Error e ->
-          Logs.err (fun m -> m "error %a while reading %s %s"
-                       KV.pp_error e (hash_to_string h) v);
-          Error `Not_found
-
     let last_modified t h v =
       match find_key t h v with
       | Error _ as e -> Lwt.return e
@@ -389,10 +437,21 @@ module Make
         KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function
         | Ok data -> Ok data
         | Error e ->
-          Logs.err (fun m -> m "error %a while reading %s %s"
+          Logs.err (fun m -> m "error %a while last_modified %s %s"
                        KV.pp_error e (hash_to_string h) v);
           Error `Not_found
-  end
+
+    let size t h v =
+      match find_key t h v with
+      | Error _ as e -> Lwt.return e
+      | Ok x ->
+        KV.size t.dev (Mirage_kv.Key.v x) >|= function
+        | Ok s -> Ok s
+        | Error e ->
+          Logs.err (fun m -> m "error %a while size %s %s"
+                       KV.pp_error e (hash_to_string h) v);
+          Error `Not_found
+  end
 
   module Tarball = struct
     module Async = struct
@@ -626,22 +685,30 @@ stamp: %S
                     respond_with_empty reqd resp;
                     Lwt.return_unit
                   else
-                    Disk.read store h hash >>= function
+                    Disk.size store h hash >>= function
                     | Error _ ->
+                      Logs.warn (fun m -> m "error retrieving size");
                       not_found reqd request.Httpaf.Request.target;
                       Lwt.return_unit
-                    | Ok data ->
+                    | Ok size ->
+                      let size = string_of_int size in
                       let mime_type = "application/octet-stream" in
                       let headers = [
                         "content-type", mime_type ;
                         "etag", hash ;
                         "last-modified", last_modified ;
-                        "content-length", string_of_int (String.length data) ;
-                      ] in
+                        "content-length", size ;
+                      ]
+                      in
                       let headers = Httpaf.Headers.of_list headers in
                       let resp = Httpaf.Response.create ~headers `OK in
-                      Httpaf.Reqd.respond_with_string reqd resp data ;
-                      Lwt.return_unit)
+                      let body = Httpaf.Reqd.respond_with_streaming reqd resp in
+                      Disk.read_chunked store h hash (fun () chunk ->
+                          let wait, wakeup = Lwt.task () in
+                          Httpaf.Body.write_string body chunk;
+                          Httpaf.Body.flush body (Lwt.wakeup_later wakeup);
+                          wait) () >|= fun _ ->
+                      Httpaf.Body.close_writer body)
               end
             | _ ->
               Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);