From a61f944d4f1bf4884058338a3ca4e25aca758191 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Fri, 26 Aug 2022 15:18:02 +0200 Subject: [PATCH] read and dump to a kv, taking csums into account --- mirage/config.ml | 4 +- mirage/unikernel.ml | 188 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 179 insertions(+), 13 deletions(-) diff --git a/mirage/config.ml b/mirage/config.ml index 39c5960..097394e 100644 --- a/mirage/config.ml +++ b/mirage/config.ml @@ -32,7 +32,7 @@ let mirror = package ~min:"3.7.0" "git-paf" ; package "opam-file-format" ; ] - (time @-> pclock @-> stackv4v6 @-> dns_client @-> paf @-> git_client @-> job) + (kv_rw @-> time @-> pclock @-> stackv4v6 @-> dns_client @-> paf @-> git_client @-> job) let paf time stackv4v6 = paf_conf () $ time $ tcpv4v6_of_stackv4v6 stackv4v6 @@ -48,4 +48,4 @@ let git_client = (git_http ~authenticator:tls_authenticator tcp git) let () = register "mirror" - [ mirror $ default_time $ default_posix_clock $ stack $ dns $ paf default_time stack $ git_client ] + [ mirror $ kv_rw_mem () $ default_time $ default_posix_clock $ stack $ dns $ paf default_time stack $ git_client ] diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index 3aefbb1..9f5d419 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -10,6 +10,7 @@ open Lwt.Infix let argument_error = 64 module Make + (KV : Mirage_kv.RW) (Time : Mirage_time.S) (Pclock : Mirage_clock.PCLOCK) (Stack : Tcpip.Stack.V4V6) @@ -86,6 +87,11 @@ module Make let `Hex h = Hex.of_string h in h + let hex_of_string s = + match Hex.to_string (`Hex s) with + | d -> Ok d + | exception Invalid_argument err -> Error (`Msg err) + let hm_to_s hm = HM.fold (fun h v acc -> hash_to_string h ^ "=" ^ hex_to_string v ^ "\n" ^ acc) @@ -139,9 +145,10 @@ module Make let decode_digest filename str = let hex h s = - match Hex.to_string (`Hex s) with - | d -> Some (h, d) - | exception Invalid_argument _ -> Logs.warn (fun m -> m "%s invalid hex %s" filename s); None + match hex_of_string s with + | Ok d -> Some (h, d) + | Error `Msg msg -> + Logs.warn (fun m -> m "%s invalid hex (%s) %s" filename msg s); None in match String.split_on_char '=' str with | [ data ] -> hex `MD5 data @@ -260,18 +267,142 @@ module Make acc) | None -> acc) SM.empty opam_paths >|= fun urls -> - Logs.info (fun m -> m "map contains %d urls" (SM.cardinal urls)) - (* SM.iter (fun url csums -> Logs.info (fun m -> m "%s: %s" url (hm_to_s csums))) urls *) + Logs.info (fun m -> m "map contains %d urls" (SM.cardinal urls)); + urls end - let start _time _pclock stack dns _paf_cohttp git_ctx = + module Disk = struct + type t = { + mutable md5s : string SM.t ; + mutable sha512s : string SM.t ; + dev : KV.t ; + } + + let empty dev = { md5s = SM.empty ; sha512s = SM.empty ; dev } + + (* on disk, we use a flat file system where the filename is the sha256 of the data *) + (* on startup, we read + validate all data, and also store in the overlays (md5/sha512) the pointers *) + (* the read can be md5/sha256/sha512 sum, and will output the data requested *) + (* a write will compute the hashes and save the data (also validating potential other hashes) *) + let init dev = + KV.list dev Mirage_kv.Key.empty >>= function + | Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false + | Ok entries -> + let t = empty dev in + Lwt_list.iter_s (fun (name, typ) -> + match typ with + | `Dictionary -> + Logs.warn (fun m -> m "unexpected dictionary at %s" name); + Lwt.return_unit + | `Value -> + KV.get dev (Mirage_kv.Key.v name) >>= function + | Ok data -> + let cs = Cstruct.of_string data in + let digest = Mirage_crypto.Hash.digest `SHA256 cs in + if Cstruct.equal digest (Cstruct.of_string name) then + let md5 = Mirage_crypto.Hash.digest `MD5 cs + and sha512 = Mirage_crypto.Hash.digest `SHA512 cs + in + let md5s = SM.add (Cstruct.to_string md5) name t.md5s + and sha512s = SM.add (Cstruct.to_string sha512) name t.sha512s + in + t.md5s <- md5s ; t.sha512s <- sha512s; + Lwt.return_unit + else begin + Logs.err (fun m -> m "corrupt data, expected %s, read %s" + (hex_to_string name) + (hex_to_string (Cstruct.to_string digest))); + KV.remove dev (Mirage_kv.Key.v name) >|= function + | Ok () -> () + | Error e -> + Logs.err (fun m -> m "error %a while removing %s" + KV.pp_write_error e (hex_to_string name)) + end + | Error e -> + Logs.err (fun m -> m "error %a reading %s" + KV.pp_error e (hex_to_string name)); + Lwt.return_unit) + entries >|= fun () -> + t + + let write t data hm = + let cs = Cstruct.of_string data in + let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> Cstruct.to_string + and md5 = Mirage_crypto.Hash.digest `MD5 cs |> Cstruct.to_string + and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> Cstruct.to_string + in + if + HM.for_all (fun h v -> + let v' = + match h with `MD5 -> md5 | `SHA256 -> sha256 | `SHA512 -> sha512 | _ -> assert false + in + if String.equal v v' then + true + else begin + Logs.err (fun m -> m "hash mismatch %s: expected %s, got %s" + (hash_to_string h) (hex_to_string v) (hex_to_string v')); + false + end) hm + then + KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function + | Ok () -> + t.md5s <- SM.add md5 sha256 t.md5s; + t.sha512s <- SM.add sha512 sha256 t.sha512s; + Logs.info (fun m -> m "wrote %s (%d bytes)" (hex_to_string sha256) + (String.length data)) + | Error e -> + Logs.err (fun m -> m "error %a while writing %s" + KV.pp_write_error e (hex_to_string sha256)) + else + Lwt.return_unit + + let read t h v = + match hex_of_string v with + | Error `Msg msg -> + Logs.err (fun m -> m "error %s while decoding hex %s" msg v); + Lwt.return (Error `Bad_request) + | Ok bin -> + match + match h with + | `MD5 -> SM.find_opt bin t.md5s + | `SHA512 -> SM.find_opt bin t.sha512s + | `SHA256 -> Some bin + with + | None -> + Logs.err (fun m -> m "couldn't find %s" v); + Lwt.return (Error `Not_found) + | Some x -> + KV.get t.dev (Mirage_kv.Key.v x) >|= function + | Ok data -> Ok data + | Error e -> + Logs.err (fun m -> m "error %a while reading %s %s" + KV.pp_error e (hash_to_string h) v); + Error `Not_found + end + + let resolve_location ~uri ~location = + match String.split_on_char '/' location with + | "http:" :: "" :: _ -> Ok location + | "https:" :: "" :: _ -> Ok location + | "" :: "" :: _ -> + let schema = String.sub uri 0 (String.index uri '/') in + Ok (schema ^ location) + | "" :: _ -> + (match String.split_on_char '/' uri with + | schema :: "" :: user_pass_host_port :: _ -> + Ok (String.concat "/" [schema ; "" ; user_pass_host_port ^ location]) + | _ -> Error (`Msg ("expected an absolute uri, got: " ^ uri))) + | _ -> Error (`Msg ("unknown location (relative path): " ^ location)) + + let start kv _time _pclock stack dns _paf_cohttp git_ctx = Git.connect git_ctx >>= fun (store, upstream) -> Git.pull store upstream >>= function | Error `Msg msg -> Lwt.fail_with msg | Ok msg -> - Logs.info (fun m -> m "store: %s" msg); - Git.find_urls store >|= fun () -> - let _ctx = + Logs.info (fun m -> m "git: %s" msg); + Git.find_urls store >>= fun urls -> + Disk.init kv >>= fun disk -> + let ctx = Mimic.empty |> with_sleep |> with_tcp (* stack -> ipaddr -> port => (stack * ipaddr * port) *) @@ -279,7 +410,42 @@ module Make |> with_resolv (* domain_name => ipaddr *) |> with_stack stack (* stack *) |> with_dns dns (* dns *) in -(* Client.get ~ctx uri >>= fun (_resp, body) -> - Cohttp_lwt.Body.to_string body >|= fun str -> *) + let rec follow count uri = + if count = 0 then begin + Logs.err (fun m -> m "redirection limit exceeded"); + Lwt.return None + end else begin + Logs.info (fun m -> m "retrieving %s" uri); + Client.get ~ctx (Uri.of_string uri) >>= fun (resp, body) -> + match resp.Cohttp.Response.status with + | `OK -> + Cohttp_lwt.Body.to_string body >|= fun str -> + Some str + | #Cohttp.Code.redirection_status -> + begin match Cohttp.Header.get resp.Cohttp.Response.headers "location" with + | Some location -> + begin match resolve_location ~uri ~location with + | Error `Msg msg -> + Logs.err (fun m -> m "error %s resolving redirect location %s" + msg location); + Lwt.return None + | Ok new_uri -> follow (pred count) new_uri + end + | None -> + Logs.err (fun m -> m "redirect without location"); + Lwt.return None + end + | s -> + Logs.err (fun m -> m "error %s while fetching %s" + (Cohttp.Code.string_of_status resp.Cohttp.Response.status) + uri); + Lwt.return None + end + in + Lwt_list.iter_p (fun (url, csums) -> + follow 20 url >>= function + | Some str -> Disk.write disk str csums + | None -> Lwt.return_unit) + (SM.bindings urls) >|= fun () -> Logs.info (fun m -> m "done") end