diff --git a/README.md b/README.md index 3c0e7bc..5a7dd53 100644 --- a/README.md +++ b/README.md @@ -3,5 +3,4 @@ This unikernel periodically (at startup, on request, every hour) updates the provided opam-repository and downloads all referenced archives. It acts as an opam-repository including archive mirror. Only archives with appropriate -checksums are stored. On startup, all data present on the block device is -validated. +checksums are stored. diff --git a/mirage/config.ml b/mirage/config.ml index 0cd340a..3f5180f 100644 --- a/mirage/config.ml +++ b/mirage/config.ml @@ -5,12 +5,16 @@ let http_client = typ HTTP_client let check = let doc = - Key.Arg.info - ~doc:"Only check the cache" - ["check"] + Key.Arg.info ~doc:"Only check the cache" ["check"] in Key.(create "check" Arg.(flag doc)) +let verify = + let doc = + Key.Arg.info ~doc:"Verify the cache contents" ["verify"] + in + Key.(create "verify" Arg.(flag doc)) + let remote = let doc = Key.Arg.info @@ -46,9 +50,14 @@ let tls_authenticator = let doc = Key.Arg.info ~doc ["tls-authenticator"] in Key.(create "tls-authenticator" Arg.(opt (some string) None doc)) +let sectors_cache = + let doc = "Number of sectors reserved for each checksum cache (md5, sha512)." in + let doc = Key.Arg.info ~doc ["sectors-cache"] in + Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc)) + let mirror = foreign "Unikernel.Make" - ~keys:[ Key.v check ; Key.v remote ; Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ; Key.v port ] + ~keys:[ Key.v check ; Key.v verify ; Key.v remote ; Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ; Key.v port ; Key.v sectors_cache ] ~packages:[ package ~min:"0.1.0" ~sublibs:[ "mirage" ] "paf" ; package "h2" ; @@ -58,6 +67,8 @@ let mirror = package "opam-file-format" ; package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ; package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw" "tar-mirage" ; + package ~pin:"git+https://github.com/reynir/mirage-block-partition.git" "mirage-block-partition" ; + package ~pin:"git+https://git.robur.io/reynir/oneffs.git" "oneffs" ; ] (block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job) diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index bb9d194..9c49367 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -10,9 +10,12 @@ module Make (_ : sig end) (HTTP : Http_mirage_client.S) = struct - module KV = Tar_mirage.Make_KV_RW(BLOCK) + module Part = Mirage_block_partition.Make(BLOCK) + module KV = Tar_mirage.Make_KV_RW(Part) + module Cache = OneFFS.Make(Part) module SM = Map.Make(String) + module SSet = Set.Make(String) module HM = Map.Make(struct type t = Mirage_crypto.Hash.hash @@ -200,66 +203,120 @@ module Make mutable md5s : string SM.t ; mutable sha512s : string SM.t ; dev : KV.t ; + dev_md5s : Cache.t ; + dev_sha512s : Cache.t ; } - let empty dev = { md5s = SM.empty ; sha512s = SM.empty ; dev } + let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s } - let key _t d = + let to_hex d = let d = Cstruct.to_string d in hex_to_string d + let marshal_sm (sm : string SM.t) = + let version = char_of_int 1 in + String.make 1 version ^ Marshal.to_string sm [] + + let unmarshal_sm s = + let version = int_of_char s.[0] in + match version with + | 1 -> Ok (Marshal.from_string s 1 : string SM.t) + | _ -> Error ("Unsupported version " ^ string_of_int version) + + let update_caches t = + Cache.write t.dev_md5s (marshal_sm t.md5s) >>= fun r -> + (match r with + | Ok () -> Logs.info (fun m -> m "Set 'md5s'") + | Error e -> Logs.warn (fun m -> m "Failed to write 'md5s': %a" Cache.pp_write_error e)); + Cache.write t.dev_sha512s (marshal_sm t.sha512s) >>= fun r -> + match r with + | Ok () -> Logs.info (fun m -> m "Set 'sha512s'"); Lwt.return_unit + | Error e -> + Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e); + Lwt.return_unit + (* on disk, we use a flat file system where the filename is the sha256 of the data *) - (* on startup, we read + validate all data, and also store in the overlays (md5/sha512) the pointers *) - (* the read can be md5/sha256/sha512 sum, and will output the data requested *) - (* a write will compute the hashes and save the data (also validating potential other hashes) *) - let init dev = + let init ~verify dev dev_md5s dev_sha512s = + Logs.info (fun m -> m "init with verify %B" verify); KV.list dev Mirage_kv.Key.empty >>= function | Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false | Ok entries -> - let t = empty dev in - Lwt_list.iteri_s (fun idx (name, typ) -> - if idx mod 10 = 0 then Gc.full_major () ; + let t = empty dev dev_md5s dev_sha512s in + Cache.read t.dev_md5s >>= fun r -> + (match r with + | Ok Some s -> Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s) + | Ok None -> Logs.debug (fun m -> m "No md5s cached") + | Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e)); + Cache.read t.dev_sha512s >>= fun r -> + (match r with + | Ok Some s -> Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s) + | Ok None -> Logs.debug (fun m -> m "No sha512s cached") + | Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e)); + let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s)) + and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in + let idx = ref 1 in + Lwt_list.iter_s (fun (name, typ) -> + if !idx mod 10 = 0 then Gc.full_major () ; match typ with | `Dictionary -> Logs.warn (fun m -> m "unexpected dictionary at %s" name); Lwt.return_unit | `Value -> - KV.get dev (Mirage_kv.Key.v name) >>= function - | Ok data -> - let cs = Cstruct.of_string data in - let digest = Mirage_crypto.Hash.digest `SHA256 cs in - if String.equal name (key t digest) then begin - let md5 = Mirage_crypto.Hash.digest `MD5 cs |> key t - and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> key t - in - let md5s = SM.add md5 name t.md5s - and sha512s = SM.add sha512 name t.sha512s - in - t.md5s <- md5s ; t.sha512s <- sha512s; - Logs.info (fun m -> m "added %s" name); - Lwt.return_unit - end else begin - Logs.err (fun m -> m "corrupt data, expected %s, read %s (should remove)" - name (hex_to_string (Cstruct.to_string digest))); - (*KV.remove dev (Mirage_kv.Key.v name) >|= function - | Ok () -> () + let ( >|?= ) x f = Lwt_result.iter (fun v -> Lwt.return (f v)) x in + let _data = ref None in + let read_data () = + match !_data with + | Some cs -> Lwt.return (Ok cs) + | None -> + incr idx; + KV.get dev (Mirage_kv.Key.v name) >|= function | Error e -> - Logs.err (fun m -> m "error %a while removing %s" - KV.pp_write_error e (key_to_string t name)) *) + Logs.err (fun m -> m "error %a reading %s" + KV.pp_error e name); + Error () + | Ok data -> + let cs = Cstruct.of_string data in + _data := Some cs; + Ok cs + in + begin + if verify then begin + read_data () >|?= fun cs -> + let digest = Mirage_crypto.Hash.digest `SHA256 cs in + if not (String.equal name (to_hex digest)) then + Logs.err (fun m -> m "corrupt data, expected %s, read %s (should remove)" + name (hex_to_string (Cstruct.to_string digest))); + end else Lwt.return_unit - end - | Error e -> - Logs.err (fun m -> m "error %a reading %s" - KV.pp_error e name); - Lwt.return_unit) - entries >|= fun () -> + end >>= fun () -> + begin + if not (SSet.mem name md5s) then begin + read_data () >|?= fun cs -> + let md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex in + let md5s = SM.add md5 name t.md5s in + t.md5s <- md5s + end else + Lwt.return_unit + end >>= fun () -> + begin + if not (SSet.mem name sha512s) then begin + read_data () >|?= fun cs -> + let sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex in + let sha512s = SM.add sha512 name t.sha512s in + t.sha512s <- sha512s + end else + Lwt.return_unit + end >|= fun () -> + Logs.info (fun m -> m "added %s" name)) + entries >>= fun () -> + update_caches t >|= fun () -> t let write t ~url data hm = let cs = Cstruct.of_string data in - let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> key t - and md5 = Mirage_crypto.Hash.digest `MD5 cs |> key t - and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> key t + let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex + and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex + and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex in if HM.for_all (fun h v -> @@ -500,7 +557,7 @@ stamp: %S let resp = Httpaf.Response.create `Not_modified in respond_with_empty reqd resp else *) - let dispatch t store hook_url git_kv update _flow _conn reqd = + let dispatch t store hook_url update _flow _conn reqd = let request = Httpaf.Reqd.request reqd in Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target); match String.split_on_char '/' request.Httpaf.Request.target with @@ -629,16 +686,24 @@ stamp: %S Lwt.return_unit end | _ -> Lwt.return_unit) - (SM.bindings urls) >|= fun () -> + (SM.bindings urls) >>= fun () -> + Disk.update_caches disk >|= fun () -> Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls)) module Paf = Paf_mirage.Make(Time)(Stack.TCP) let start block _time _pclock stack git_ctx http_ctx = - KV.connect block >>= fun kv -> + BLOCK.get_info block >>= fun info -> + let sectors_cache = Key_gen.sectors_cache () in + Part.connect Int64.(sub info.size_sectors (mul 2L sectors_cache)) block >>= fun (b1, rest) -> + let b2, b3 = Part.subpartition sectors_cache rest in + KV.connect b1 >>= fun kv -> + Cache.connect b2 >>= fun md5s -> + Cache.connect b3 >>= fun sha512s -> Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv)); - Disk.init kv >>= fun disk -> - if Key_gen.check () then Lwt.return_unit + Disk.init ~verify:(Key_gen.verify ()) kv md5s sha512s >>= fun disk -> + if Key_gen.check () then + Lwt.return_unit else Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv -> Serve.commit_id git_kv >>= fun commit_id -> @@ -653,7 +718,7 @@ stamp: %S let service = Paf.http_service ~error_handler:(fun _ ?request:_ _ _ -> ()) - (Serve.dispatch serve disk (Key_gen.hook_url ()) git_kv update) + (Serve.dispatch serve disk (Key_gen.hook_url ()) update) in let `Initialized th = Paf.serve service t in Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));