revise startup (as proposed in #18):
- recover git (from disk or download) - make index.tar.gz - start web service - check disk (unless skip-verify-sha256) - dump git state - start downloads - enable update job, and hook
This commit is contained in:
parent
7e09f08767
commit
4481923ade
1 changed files with 91 additions and 68 deletions
|
@ -9,11 +9,11 @@ module K = struct
|
|||
let doc = Arg.info ~doc:"Only check the cache" ["check"] in
|
||||
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||
|
||||
let verify_sha256 =
|
||||
let skip_verify_sha256 =
|
||||
let doc = Arg.info
|
||||
~doc:"Verify the SHA256 checksums of the cache contents, and \
|
||||
re-build the other checksum caches."
|
||||
["verify-sha256"]
|
||||
~doc:"Skip verification of the SHA256 checksums of the cache contents, \
|
||||
and do not re-build the other checksum caches."
|
||||
["skip-verify-sha256"]
|
||||
in
|
||||
Mirage_runtime.register_arg Arg.(value & flag doc)
|
||||
|
||||
|
@ -241,13 +241,17 @@ module Make
|
|||
type t = {
|
||||
mutable md5s : string SM.t ;
|
||||
mutable sha512s : string SM.t ;
|
||||
mutable checked : bool ;
|
||||
dev : KV.t ;
|
||||
dev_md5s : Cache.t ;
|
||||
dev_sha512s : Cache.t ;
|
||||
dev_swap : Swap.t ;
|
||||
}
|
||||
|
||||
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
||||
let empty dev dev_md5s dev_sha512s dev_swap =
|
||||
{ md5s = SM.empty ; sha512s = SM.empty ; checked = false ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
||||
|
||||
let ready t = t.checked
|
||||
|
||||
let marshal_sm (sm : string SM.t) =
|
||||
let version = char_of_int 1 in
|
||||
|
@ -272,18 +276,20 @@ module Make
|
|||
Lwt.return_unit
|
||||
|
||||
let find_key t h key =
|
||||
assert (List.length (Mirage_kv.Key.segments key) = 1);
|
||||
match
|
||||
match h with
|
||||
| `MD5 ->
|
||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
|
||||
| `SHA512 ->
|
||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
|
||||
| `SHA256 -> Some key
|
||||
| _ -> None
|
||||
with
|
||||
| None -> Error `Not_found
|
||||
| Some x -> Ok x
|
||||
if List.length (Mirage_kv.Key.segments key) <> 1 then begin
|
||||
Logs.warn (fun m -> m "find_key with multiple segments: %a" Mirage_kv.Key.pp key);
|
||||
Error `Not_found
|
||||
end else
|
||||
match
|
||||
match h with
|
||||
| `MD5 ->
|
||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.md5s)
|
||||
| `SHA512 ->
|
||||
Option.map Mirage_kv.Key.v (SM.find_opt (Mirage_kv.Key.basename key) t.sha512s)
|
||||
| `SHA256 -> Some key
|
||||
with
|
||||
| None -> Error `Not_found
|
||||
| Some x -> Ok x
|
||||
|
||||
let read_chunked t h v f a =
|
||||
match find_key t h v with
|
||||
|
@ -392,22 +398,23 @@ module Make
|
|||
end
|
||||
|
||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
|
||||
KV.list dev Mirage_kv.Key.empty >>= function
|
||||
| Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e)
|
||||
let check ~skip_verify_sha256 t =
|
||||
KV.list t.dev Mirage_kv.Key.empty >>= function
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a listing kv" KV.pp_error e);
|
||||
Lwt.return_unit
|
||||
| Ok entries ->
|
||||
let t = empty dev dev_md5s dev_sha512s dev_swap in
|
||||
Cache.read t.dev_md5s >>= fun r ->
|
||||
(match r with
|
||||
| Ok Some s ->
|
||||
if not verify_sha256 then
|
||||
if skip_verify_sha256 then
|
||||
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
|
||||
| Ok None -> ()
|
||||
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
|
||||
Cache.read t.dev_sha512s >>= fun r ->
|
||||
(match r with
|
||||
| Ok Some s ->
|
||||
if not verify_sha256 then
|
||||
if skip_verify_sha256 then
|
||||
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
|
||||
| Ok None -> ()
|
||||
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
|
||||
|
@ -438,7 +445,7 @@ module Make
|
|||
None
|
||||
in
|
||||
let sha256_final =
|
||||
let need_to_compute = md5_final <> None || sha512_final <> None || verify_sha256 in
|
||||
let need_to_compute = md5_final <> None || sha512_final <> None || not skip_verify_sha256 in
|
||||
if need_to_compute then
|
||||
let f s =
|
||||
let digest = SHA256.(to_raw_string (get s)) in
|
||||
|
@ -485,7 +492,7 @@ module Make
|
|||
end)
|
||||
entries >>= fun () ->
|
||||
update_caches t >|= fun () ->
|
||||
t
|
||||
t.checked <- true
|
||||
|
||||
let exists t h v =
|
||||
match find_key t h v with
|
||||
|
@ -841,38 +848,43 @@ stamp: %S
|
|||
| Ok h ->
|
||||
let hash = Mirage_kv.Key.v hash in
|
||||
Lwt.async (fun () ->
|
||||
(Disk.last_modified store h hash >|= function
|
||||
| Error _ -> t.modified
|
||||
| Ok v -> ptime_to_http_date v) >>= fun last_modified ->
|
||||
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
|
||||
let resp = Httpaf.Response.create `Not_modified in
|
||||
respond_with_empty reqd resp;
|
||||
Lwt.return_unit
|
||||
else
|
||||
Disk.size store h hash >>= function
|
||||
| Error _ ->
|
||||
not_found reqd request.Httpaf.Request.target;
|
||||
if Disk.ready store then
|
||||
(Disk.last_modified store h hash >|= function
|
||||
| Error _ -> t.modified
|
||||
| Ok v -> ptime_to_http_date v) >>= fun last_modified ->
|
||||
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
|
||||
let resp = Httpaf.Response.create `Not_modified in
|
||||
respond_with_empty reqd resp;
|
||||
Lwt.return_unit
|
||||
| Ok size ->
|
||||
let size = Optint.Int63.to_string size in
|
||||
let mime_type = "application/octet-stream" in
|
||||
let headers = [
|
||||
"content-type", mime_type ;
|
||||
"etag", Mirage_kv.Key.basename hash ;
|
||||
"last-modified", last_modified ;
|
||||
"content-length", size ;
|
||||
]
|
||||
in
|
||||
let headers = Httpaf.Headers.of_list headers in
|
||||
let resp = Httpaf.Response.create ~headers `OK in
|
||||
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
||||
Disk.read_chunked store h hash (fun () chunk ->
|
||||
let wait, wakeup = Lwt.task () in
|
||||
(* FIXME: catch exception when body is closed *)
|
||||
Httpaf.Body.write_string body chunk;
|
||||
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
||||
wait) () >|= fun _ ->
|
||||
Httpaf.Body.close_writer body)
|
||||
else
|
||||
Disk.size store h hash >>= function
|
||||
| Error _ ->
|
||||
not_found reqd request.Httpaf.Request.target;
|
||||
Lwt.return_unit
|
||||
| Ok size ->
|
||||
let size = Optint.Int63.to_string size in
|
||||
let mime_type = "application/octet-stream" in
|
||||
let headers = [
|
||||
"content-type", mime_type ;
|
||||
"etag", Mirage_kv.Key.basename hash ;
|
||||
"last-modified", last_modified ;
|
||||
"content-length", size ;
|
||||
]
|
||||
in
|
||||
let headers = Httpaf.Headers.of_list headers in
|
||||
let resp = Httpaf.Response.create ~headers `OK in
|
||||
let body = Httpaf.Reqd.respond_with_streaming reqd resp in
|
||||
Disk.read_chunked store h hash (fun () chunk ->
|
||||
let wait, wakeup = Lwt.task () in
|
||||
(* FIXME: catch exception when body is closed *)
|
||||
Httpaf.Body.write_string body chunk;
|
||||
Httpaf.Body.flush body (Lwt.wakeup wakeup);
|
||||
wait) () >|= fun _ ->
|
||||
Httpaf.Body.close_writer body
|
||||
else begin
|
||||
not_found reqd request.Httpaf.Request.target;
|
||||
Lwt.return_unit
|
||||
end)
|
||||
end
|
||||
| _ ->
|
||||
Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target);
|
||||
|
@ -881,7 +893,6 @@ stamp: %S
|
|||
end
|
||||
|
||||
let download_archives parallel_downloads disk http_client urls =
|
||||
(* FIXME: handle resuming partial downloads *)
|
||||
reset_failed_downloads ();
|
||||
remaining_downloads := SM.cardinal urls;
|
||||
archives := SM.cardinal urls;
|
||||
|
@ -957,10 +968,10 @@ stamp: %S
|
|||
Cache.connect sha512s >>= fun sha512s ->
|
||||
Swap.connect swap >>= fun swap ->
|
||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk ->
|
||||
let disk = Disk.empty kv md5s sha512s swap in
|
||||
let remote = K.remote () in
|
||||
if K.check () then
|
||||
Lwt.return_unit
|
||||
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
||||
else
|
||||
begin
|
||||
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
||||
|
@ -968,23 +979,27 @@ stamp: %S
|
|||
Lwt.return (Error ())
|
||||
else
|
||||
restore_git ~remote git_dump git_ctx) >>= function
|
||||
| Ok git_kv -> Lwt.return git_kv
|
||||
| Ok git_kv -> Lwt.return (false, git_kv)
|
||||
| Error () ->
|
||||
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
||||
dump_git git_dump git_kv >|= fun () ->
|
||||
git_kv
|
||||
end >>= fun git_kv ->
|
||||
Lwt.return (true, git_kv)
|
||||
end >>= fun (need_dump, git_kv) ->
|
||||
Logs.info (fun m -> m "Done initializing git state!");
|
||||
Serve.commit_id git_kv >>= fun commit_id ->
|
||||
Logs.info (fun m -> m "git: %s" commit_id);
|
||||
Serve.create remote git_kv >>= fun (serve, urls) ->
|
||||
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
||||
let update () =
|
||||
Serve.update_git ~remote serve git_kv >>= function
|
||||
| None | Some ([], _) -> Lwt.return_unit
|
||||
| Some (_changes, urls) ->
|
||||
dump_git git_dump git_kv >>= fun () ->
|
||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||
if Disk.ready disk then
|
||||
Serve.update_git ~remote serve git_kv >>= function
|
||||
| None | Some ([], _) -> Lwt.return_unit
|
||||
| Some (_changes, urls) ->
|
||||
dump_git git_dump git_kv >>= fun () ->
|
||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||
else begin
|
||||
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
|
||||
Lwt.return_unit
|
||||
end
|
||||
in
|
||||
let service =
|
||||
Paf.http_service
|
||||
|
@ -993,6 +1008,14 @@ stamp: %S
|
|||
in
|
||||
let `Initialized th = Paf.serve service t in
|
||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||
Lwt.join [
|
||||
(if need_dump then begin
|
||||
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||
dump_git git_dump git_kv
|
||||
end else
|
||||
Lwt.return_unit) ;
|
||||
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
||||
] >>= fun () ->
|
||||
Lwt.async (fun () ->
|
||||
let rec go () =
|
||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||
|
|
Loading…
Reference in a new issue