diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index b100ede..269c523 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -9,11 +9,11 @@ module K = struct let doc = Arg.info ~doc:"Only check the cache" ["check"] in Mirage_runtime.register_arg Arg.(value & flag doc) - let verify_sha256 = + let skip_verify_sha256 = let doc = Arg.info - ~doc:"Verify the SHA256 checksums of the cache contents, and \ - re-build the other checksum caches." - ["verify-sha256"] + ~doc:"Skip verification of the SHA256 checksums of the cache contents, \ + and do not re-build the other checksum caches." + ["skip-verify-sha256"] in Mirage_runtime.register_arg Arg.(value & flag doc) @@ -241,13 +241,17 @@ module Make type t = { mutable md5s : string SM.t ; mutable sha512s : string SM.t ; + mutable checked : bool ; dev : KV.t ; dev_md5s : Cache.t ; dev_sha512s : Cache.t ; dev_swap : Swap.t ; } - let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap } + let empty dev dev_md5s dev_sha512s dev_swap = + { md5s = SM.empty ; sha512s = SM.empty ; checked = false ; dev; dev_md5s; dev_sha512s ; dev_swap } + + let ready t = t.checked let marshal_sm (sm : string SM.t) = let version = char_of_int 1 in @@ -272,18 +276,20 @@ module Make Lwt.return_unit let find_key t h key = - assert (List.length (Mirage_kv.Key.segments key) = 1); - match - match h with - | `MD5 -> - Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s) - | `SHA512 -> - Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s) - | `SHA256 -> Some key - | _ -> None - with - | None -> Error `Not_found - | Some x -> Ok x + if List.length (Mirage_kv.Key.segments key) <> 1 then begin + Logs.warn (fun m -> m "find_key with multiple segments: %a" Mirage_kv.Key.pp key); + Error `Not_found + end else + match + match h with + | `MD5 -> + Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s) + | `SHA512 -> + Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s) + | `SHA256 -> Some key + with + | None -> Error `Not_found + | Some x -> Ok x let read_chunked t h v f a = match find_key t h v with @@ -392,22 +398,23 @@ module Make end (* on disk, we use a flat file system where the filename is the sha256 of the data *) - let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap = - KV.list dev Mirage_kv.Key.empty >>= function - | Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e) + let check ~skip_verify_sha256 t = + KV.list t.dev Mirage_kv.Key.empty >>= function + | Error e -> + Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); + Lwt.return_unit | Ok entries -> - let t = empty dev dev_md5s dev_sha512s dev_swap in Cache.read t.dev_md5s >>= fun r -> (match r with | Ok Some s -> - if not verify_sha256 then + if skip_verify_sha256 then Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s) | Ok None -> () | Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e)); Cache.read t.dev_sha512s >>= fun r -> (match r with | Ok Some s -> - if not verify_sha256 then + if skip_verify_sha256 then Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s) | Ok None -> () | Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e)); @@ -438,7 +445,7 @@ module Make None in let sha256_final = - let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in + let need_to_compute = md5_final <> None || sha512_final <> None || not skip_verify_sha256 in if need_to_compute then let f s = let digest = SHA256.(to_raw_string (get s)) in @@ -485,7 +492,7 @@ module Make end) entries >>= fun () -> update_caches t >|= fun () -> - t + t.checked <- true let exists t h v = match find_key t h v with @@ -841,38 +848,43 @@ stamp: %S | Ok h -> let hash = Mirage_kv.Key.v hash in Lwt.async (fun () -> - (Disk.last_modified store h hash >|= function - | Error _ -> t.modified - | Ok v -> ptime_to_http_date v) >>= fun last_modified -> - if not_modified request (last_modified, Mirage_kv.Key.basename hash) then - let resp = Httpaf.Response.create `Not_modified in - respond_with_empty reqd resp; - Lwt.return_unit - else - Disk.size store h hash >>= function - | Error _ -> - not_found reqd request.Httpaf.Request.target; + if Disk.ready store then + (Disk.last_modified store h hash >|= function + | Error _ -> t.modified + | Ok v -> ptime_to_http_date v) >>= fun last_modified -> + if not_modified request (last_modified, Mirage_kv.Key.basename hash) then + let resp = Httpaf.Response.create `Not_modified in + respond_with_empty reqd resp; Lwt.return_unit - | Ok size -> - let size = Optint.Int63.to_string size in - let mime_type = "application/octet-stream" in - let headers = [ - "content-type", mime_type ; - "etag", Mirage_kv.Key.basename hash ; - "last-modified", last_modified ; - "content-length", size ; - ] - in - let headers = Httpaf.Headers.of_list headers in - let resp = Httpaf.Response.create ~headers `OK in - let body = Httpaf.Reqd.respond_with_streaming reqd resp in - Disk.read_chunked store h hash (fun () chunk -> - let wait, wakeup = Lwt.task () in - (* FIXME: catch exception when body is closed *) - Httpaf.Body.write_string body chunk; - Httpaf.Body.flush body (Lwt.wakeup wakeup); - wait) () >|= fun _ -> - Httpaf.Body.close_writer body) + else + Disk.size store h hash >>= function + | Error _ -> + not_found reqd request.Httpaf.Request.target; + Lwt.return_unit + | Ok size -> + let size = Optint.Int63.to_string size in + let mime_type = "application/octet-stream" in + let headers = [ + "content-type", mime_type ; + "etag", Mirage_kv.Key.basename hash ; + "last-modified", last_modified ; + "content-length", size ; + ] + in + let headers = Httpaf.Headers.of_list headers in + let resp = Httpaf.Response.create ~headers `OK in + let body = Httpaf.Reqd.respond_with_streaming reqd resp in + Disk.read_chunked store h hash (fun () chunk -> + let wait, wakeup = Lwt.task () in + (* FIXME: catch exception when body is closed *) + Httpaf.Body.write_string body chunk; + Httpaf.Body.flush body (Lwt.wakeup wakeup); + wait) () >|= fun _ -> + Httpaf.Body.close_writer body + else begin + not_found reqd request.Httpaf.Request.target; + Lwt.return_unit + end) end | _ -> Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target); @@ -881,7 +893,6 @@ stamp: %S end let download_archives parallel_downloads disk http_client urls = - (* FIXME: handle resuming partial downloads *) reset_failed_downloads (); remaining_downloads := SM.cardinal urls; archives := SM.cardinal urls; @@ -957,10 +968,10 @@ stamp: %S Cache.connect sha512s >>= fun sha512s -> Swap.connect swap >>= fun swap -> Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv)); - Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk -> + let disk = Disk.empty kv md5s sha512s swap in let remote = K.remote () in if K.check () then - Lwt.return_unit + Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk else begin Logs.info (fun m -> m "Initializing git state. This may take a while..."); @@ -968,23 +979,27 @@ stamp: %S Lwt.return (Error ()) else restore_git ~remote git_dump git_ctx) >>= function - | Ok git_kv -> Lwt.return git_kv + | Ok git_kv -> Lwt.return (false, git_kv) | Error () -> Git_kv.connect git_ctx remote >>= fun git_kv -> - dump_git git_dump git_kv >|= fun () -> - git_kv - end >>= fun git_kv -> + Lwt.return (true, git_kv) + end >>= fun (need_dump, git_kv) -> Logs.info (fun m -> m "Done initializing git state!"); Serve.commit_id git_kv >>= fun commit_id -> Logs.info (fun m -> m "git: %s" commit_id); Serve.create remote git_kv >>= fun (serve, urls) -> Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t -> let update () = - Serve.update_git ~remote serve git_kv >>= function - | None | Some ([], _) -> Lwt.return_unit - | Some (_changes, urls) -> - dump_git git_dump git_kv >>= fun () -> - download_archives (K.parallel_downloads ()) disk http_ctx urls + if Disk.ready disk then + Serve.update_git ~remote serve git_kv >>= function + | None | Some ([], _) -> Lwt.return_unit + | Some (_changes, urls) -> + dump_git git_dump git_kv >>= fun () -> + download_archives (K.parallel_downloads ()) disk http_ctx urls + else begin + Logs.warn (fun m -> m "disk is not ready yet, thus not updating"); + Lwt.return_unit + end in let service = Paf.http_service @@ -993,6 +1008,14 @@ stamp: %S in let `Initialized th = Paf.serve service t in Logs.info (fun f -> f "listening on %d/HTTP" (K.port ())); + Lwt.join [ + (if need_dump then begin + Logs.info (fun m -> m "dumping git state %s" commit_id); + dump_git git_dump git_kv + end else + Lwt.return_unit) ; + (Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk) + ] >>= fun () -> Lwt.async (fun () -> let rec go () = Time.sleep_ns (Duration.of_hour 1) >>= fun () ->