Reserve an area for dumping the Serve.t -- basically the tarball and last_modified date #28
2 changed files with 172 additions and 70 deletions
|
@ -8,6 +8,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
|||
type partitions = {
|
||||
tar : Part.t ;
|
||||
swap : Part.t ;
|
||||
index : Part.t ;
|
||||
git_dump : Part.t ;
|
||||
md5s : Part.t ;
|
||||
sha512s : Part.t ;
|
||||
|
@ -18,6 +19,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
|||
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
|
||||
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
|
||||
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
|
||||
let index_guid = Uuidm.of_string "1cf8c2dc-a7fd-11ef-a2a6-68f728e7bbbc" |> Option.get
|
||||
|
||||
(* GPT uses a 72 byte utf16be encoded string for partition names *)
|
||||
let utf16be_of_ascii s =
|
||||
|
@ -56,37 +58,41 @@ module Make(BLOCK : Mirage_block.S) = struct
|
|||
let connect block =
|
||||
let* info = BLOCK.get_info block in
|
||||
let* gpt = read_partition_table info block in
|
||||
let tar, swap, git_dump, md5s, sha512s =
|
||||
let tar, swap, index, git_dump, md5s, sha512s =
|
||||
match
|
||||
List.fold_left
|
||||
(fun (tar, swap, git_dump, md5s, sha512s) p ->
|
||||
(fun (tar, swap, index, git_dump, md5s, sha512s) p ->
|
||||
if String.equal p.Gpt.Partition.name
|
||||
(utf16be_of_ascii "tar")
|
||||
then
|
||||
(Some p, swap, git_dump, md5s, sha512s)
|
||||
(Some p, swap, index, git_dump, md5s, sha512s)
|
||||
else if String.equal p.name
|
||||
(utf16be_of_ascii "git_dump")
|
||||
then
|
||||
(tar, swap, Some p, md5s, sha512s)
|
||||
(tar, swap, index, Some p, md5s, sha512s)
|
||||
else if String.equal p.name
|
||||
(utf16be_of_ascii "md5s")
|
||||
then
|
||||
(tar, swap, git_dump, Some p, sha512s)
|
||||
(tar, swap, index, git_dump, Some p, sha512s)
|
||||
else if String.equal p.name
|
||||
(utf16be_of_ascii "sha512s")
|
||||
then
|
||||
(tar, swap, git_dump, md5s, Some p)
|
||||
(tar, swap, index, git_dump, md5s, Some p)
|
||||
else if String.equal p.name
|
||||
(utf16be_of_ascii "swap")
|
||||
then
|
||||
(tar, Some p, git_dump, md5s, sha512s)
|
||||
(tar, Some p, index, git_dump, md5s, sha512s)
|
||||
else if String.equal p.name
|
||||
(utf16be_of_ascii "index")
|
||||
then
|
||||
(tar, swap, Some p, git_dump, md5s, sha512s)
|
||||
else
|
||||
Format.kasprintf failwith "Unknown partition %S" p.name)
|
||||
(None, None, None, None, None)
|
||||
(None, None, None, None, None, None)
|
||||
gpt.partitions
|
||||
with
|
||||
| (Some tar, Some swap, Some git_dump, Some md5s, Some sha512s) ->
|
||||
(tar, swap, git_dump, md5s, sha512s)
|
||||
| (Some tar, Some swap, Some index, Some git_dump, Some md5s, Some sha512s) ->
|
||||
(tar, swap, index, git_dump, md5s, sha512s)
|
||||
| _ ->
|
||||
failwith "not all partitions found :("
|
||||
in
|
||||
|
@ -97,11 +103,12 @@ module Make(BLOCK : Mirage_block.S) = struct
|
|||
let (part, _after) = Part.subpartition len after in
|
||||
part
|
||||
in
|
||||
let tar = get_part tar and swap = get_part swap and git_dump = get_part git_dump
|
||||
let tar = get_part tar and swap = get_part swap and index =get_part index
|
||||
and git_dump = get_part git_dump
|
||||
and md5s = get_part md5s and sha512s = get_part sha512s in
|
||||
{ tar ; swap; git_dump ; md5s ; sha512s }
|
||||
{ tar ; swap ; index ; git_dump ; md5s ; sha512s }
|
||||
|
||||
let format block ~cache_size ~git_size ~swap_size =
|
||||
let format block ~cache_size ~git_size ~swap_size ~index_size =
|
||||
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
|
||||
let ( let*? ) = Lwt_result.bind in
|
||||
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
|
||||
|
@ -119,13 +126,14 @@ module Make(BLOCK : Mirage_block.S) = struct
|
|||
let sectors_cache = mb_in_sectors cache_size
|
||||
and sectors_git = mb_in_sectors git_size
|
||||
and sectors_swap = mb_in_sectors swap_size
|
||||
and sectors_index = mb_in_sectors index_size
|
||||
in
|
||||
let*? () =
|
||||
if size_sectors <
|
||||
(* protective MBR + GPT header + GPT table *)
|
||||
let ( + ) = Int64.add in
|
||||
empty.first_usable_lba +
|
||||
min 1L (Int64.of_int (2 * Tar.Header.length / sector_size)) + sectors_cache + sectors_cache + sectors_git
|
||||
min 1L (Int64.of_int (2 * Tar.Header.length / sector_size)) + sectors_cache + sectors_cache + sectors_git + sectors_index
|
||||
+ 1L (* backup GPT header *) then
|
||||
Lwt.return_error (`Msg "too small disk")
|
||||
else Lwt_result.return ()
|
||||
|
@ -160,13 +168,22 @@ module Make(BLOCK : Mirage_block.S) = struct
|
|||
(Int64.pred md5s.starting_lba)
|
||||
|> Result.get_ok
|
||||
in
|
||||
let index =
|
||||
Gpt.Partition.make
|
||||
~name:(utf16be_of_ascii "index")
|
||||
~type_guid:index_guid
|
||||
~attributes
|
||||
(Int64.sub git_dump.starting_lba sectors_swap)
|
||||
(Int64.pred git_dump.starting_lba)
|
||||
|> Result.get_ok
|
||||
in
|
||||
let swap =
|
||||
Gpt.Partition.make
|
||||
~name:(utf16be_of_ascii "swap")
|
||||
~type_guid:swap_guid
|
||||
~attributes
|
||||
(Int64.sub git_dump.starting_lba sectors_swap)
|
||||
(Int64.pred git_dump.starting_lba)
|
||||
(Int64.sub index.starting_lba sectors_swap)
|
||||
(Int64.pred index.starting_lba)
|
||||
|> Result.get_ok
|
||||
in
|
||||
let tar =
|
||||
|
@ -180,7 +197,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
|||
in
|
||||
let gpt =
|
||||
let partitions =
|
||||
[ tar; swap; git_dump; md5s; sha512s ]
|
||||
[ tar; swap; index; git_dump; md5s; sha512s ]
|
||||
in
|
||||
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|
||||
|> Result.get_ok
|
||||
|
|
|
@ -59,6 +59,11 @@ module K = struct
|
|||
let doc = Arg.info ~doc:"HTTP listen port." ["port"] in
|
||||
Mirage_runtime.register_arg Arg.(value & opt int 80 doc)
|
||||
|
||||
let index_size =
|
||||
let doc = "Number of MB reserved for the index tarball. Only used with --initialize-disk." in
|
||||
let doc = Arg.info ~doc ["index-size"] in
|
||||
Mirage_runtime.register_arg Arg.(value & opt int 10 doc)
|
||||
|
||||
let cache_size =
|
||||
let doc = "Number of MB reserved for each checksum cache (md5, sha512). Only used with --initialize-disk." in
|
||||
let doc = Arg.info ~doc ["cache-size"] in
|
||||
|
@ -718,6 +723,46 @@ stamp: %S
|
|||
mutable index : string ;
|
||||
}
|
||||
|
||||
let to_string { commit_id ; modified ; repo ; index } =
|
||||
|
||||
String.concat ";" [ commit_id ; modified ; repo ; index ]
|
||||
|
||||
let of_string str =
|
||||
match String.index_opt str ';' with
|
||||
| None -> Error (`Msg "no separator found")
|
||||
| Some commit_end ->
|
||||
match String.index_from_opt str (succ commit_end) ';' with
|
||||
| None -> Error (`Msg "no separator found")
|
||||
| Some modified_end ->
|
||||
match String.index_from_opt str (succ modified_end) ';' with
|
||||
| None -> Error (`Msg "no separator found")
|
||||
| Some repo_end ->
|
||||
let last_chunk = String.length str - succ repo_end in
|
||||
Ok { commit_id = String.sub str 0 commit_end ;
|
||||
modified = String.sub str (succ commit_end) modified_end ;
|
||||
repo = String.sub str (succ modified_end) repo_end ;
|
||||
index = String.sub str (succ repo_end) last_chunk }
|
||||
|
||||
let dump_index index_dump t =
|
||||
let data = to_string t in
|
||||
Cache.write index_dump data >|= function
|
||||
| Ok () ->
|
||||
Logs.info (fun m -> m "dumped index %d bytes" (String.length data))
|
||||
| Error e ->
|
||||
Logs.warn (fun m -> m "failed to dump index: %a" Cache.pp_write_error e)
|
||||
|
||||
let restore_index index_dump =
|
||||
Cache.read index_dump >|= function
|
||||
| Ok None -> Error ()
|
||||
| Error e ->
|
||||
Logs.warn (fun m -> m "failed to read index state: %a" Cache.pp_error e);
|
||||
Error ()
|
||||
| Ok Some data ->
|
||||
match of_string data with
|
||||
| Error `Msg msg ->
|
||||
Logs.warn (fun m -> m "failed to decode index: %s" msg);
|
||||
Error ()
|
||||
| Ok t -> Ok t
|
||||
|
||||
let create remote git_kv =
|
||||
commit_id git_kv >>= fun commit_id ->
|
||||
modified git_kv >>= fun modified ->
|
||||
|
@ -1063,11 +1108,12 @@ stamp: %S
|
|||
|
||||
module Paf = Paf_mirage.Make(Stack.TCP)
|
||||
|
||||
let start_mirror { Part.tar; swap; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
||||
let start_mirror { Part.tar; swap; index; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
||||
KV.connect tar >>= fun kv ->
|
||||
Cache.connect git_dump >>= fun git_dump ->
|
||||
Cache.connect md5s >>= fun md5s ->
|
||||
Cache.connect sha512s >>= fun sha512s ->
|
||||
Cache.connect index >>= fun index ->
|
||||
Swap.connect swap >>= fun swap ->
|
||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||
let disk = Disk.empty kv md5s sha512s swap in
|
||||
|
@ -1076,65 +1122,104 @@ stamp: %S
|
|||
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
||||
else
|
||||
begin
|
||||
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
||||
(if K.ignore_local_git () then
|
||||
Lwt.return (Error ())
|
||||
else
|
||||
restore_git ~remote git_dump git_ctx) >>= function
|
||||
| Ok git_kv -> Lwt.return (false, git_kv)
|
||||
| Error () ->
|
||||
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
||||
Lwt.return (true, git_kv)
|
||||
end >>= fun (need_dump, git_kv) ->
|
||||
Logs.info (fun m -> m "Done initializing git state!");
|
||||
Serve.commit_id git_kv >>= fun commit_id ->
|
||||
Logs.info (fun m -> m "git: %s" commit_id);
|
||||
Serve.create remote git_kv >>= fun (serve, urls) ->
|
||||
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
||||
let update () =
|
||||
if Disk.completely_checked disk then
|
||||
Serve.update_git ~remote serve git_kv >>= function
|
||||
| None | Some ([], _) -> Lwt.return_unit
|
||||
| Some (_changes, urls) ->
|
||||
dump_git git_dump git_kv >>= fun () ->
|
||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||
else begin
|
||||
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
|
||||
Lwt.return_unit
|
||||
end
|
||||
in
|
||||
let service =
|
||||
Paf.http_service
|
||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||
(Serve.dispatch serve disk (K.hook_url ()) update)
|
||||
in
|
||||
let `Initialized th = Paf.serve service t in
|
||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||
Lwt.join [
|
||||
(if need_dump then begin
|
||||
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||
dump_git git_dump git_kv
|
||||
end else
|
||||
Lwt.return_unit) ;
|
||||
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
||||
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
||||
Logs.info (fun m -> m "Restoring index.");
|
||||
let git_kv = ref None in
|
||||
let init_git_kv () =
|
||||
Logs.info (fun m -> m "Initializing git state. This may take a while");
|
||||
((if K.ignore_local_git () then
|
||||
Lwt.return (Error ())
|
||||
else
|
||||
restore_git ~remote git_dump git_ctx) >>= function
|
||||
| Ok git -> Lwt.return (false, git)
|
||||
| Error () ->
|
||||
Git_kv.connect git_ctx remote >>= fun git ->
|
||||
Lwt.return (true, git)) >>= fun (need_dump, git) ->
|
||||
Logs.info (fun m -> m "Done initializing git state!");
|
||||
git_kv := Some git;
|
||||
Lwt.return (need_dump, git)
|
||||
in
|
||||
let update serve () =
|
||||
match !git_kv with
|
||||
| None ->
|
||||
Logs.warn (fun m -> m "git kv is not ready yet, thus not updating");
|
||||
Lwt.return_unit
|
||||
| Some git_kv ->
|
||||
if Disk.completely_checked disk then
|
||||
Serve.update_git ~remote serve git_kv >>= function
|
||||
| None | Some ([], _) -> Lwt.return_unit
|
||||
| Some (_changes, urls) ->
|
||||
Serve.dump_index index serve >>= fun () ->
|
||||
dump_git git_dump git_kv >>= fun () ->
|
||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||
else begin
|
||||
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
|
||||
Lwt.return_unit
|
||||
end
|
||||
in
|
||||
(Serve.restore_index index >>= function
|
||||
| Ok serve ->
|
||||
let service =
|
||||
Paf.http_service
|
||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||
(Serve.dispatch serve disk (K.hook_url ()) (update serve))
|
||||
in
|
||||
let `Initialized th = Paf.serve service t in
|
||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||
Lwt.return (serve, true, th, false, SM.empty)
|
||||
| Error () ->
|
||||
init_git_kv () >>= fun (need_dump, git) ->
|
||||
Serve.commit_id git >>= fun commit_id ->
|
||||
Logs.info (fun m -> m "git: %s" commit_id);
|
||||
Serve.create remote git >>= fun (serve, urls) ->
|
||||
let service =
|
||||
Paf.http_service
|
||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||
(Serve.dispatch serve disk (K.hook_url ()) (update serve))
|
||||
in
|
||||
let `Initialized th = Paf.serve service t in
|
||||
reynir marked this conversation as resolved
reynir
commented
Shouldn't this be Shouldn't this be ``let (`Initialized th) = ... in``?
reynir
commented
Oh, apparently not! It compiles fine. It probably changed at the same time when you can write Oh, apparently not! It compiles fine. It probably changed at the same time when you can write `match _ with Ok `Data data -> ...`
|
||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||
Lwt.return (serve, false, th, need_dump, urls)) >>= fun (serve, need_git_update, th, need_dump, urls) ->
|
||||
Lwt.join [
|
||||
(if need_git_update then
|
||||
init_git_kv () >>= fun (need_dump, git) ->
|
||||
Serve.commit_id git >>= fun commit_id ->
|
||||
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||
Serve.dump_index index serve >>= fun () ->
|
||||
dump_git git_dump git
|
||||
else if need_dump then
|
||||
match !git_kv with
|
||||
| None ->
|
||||
Logs.err (fun m -> m "git_kv not yet set");
|
||||
Lwt.return_unit
|
||||
| Some git ->
|
||||
Serve.commit_id git >>= fun commit_id ->
|
||||
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||
Serve.dump_index index serve >>= fun () ->
|
||||
dump_git git_dump git
|
||||
else
|
||||
Lwt.return_unit) ;
|
||||
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
||||
] >>= fun () ->
|
||||
Lwt.async (fun () ->
|
||||
let rec go () =
|
||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||
update () >>= fun () ->
|
||||
go ()
|
||||
in
|
||||
go ());
|
||||
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
|
||||
(th >|= fun _v -> ())
|
||||
Lwt.async (fun () ->
|
||||
let rec go () =
|
||||
update serve () >>= fun () ->
|
||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||
go ()
|
||||
in
|
||||
go ());
|
||||
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
|
||||
(th >|= fun _v -> ())
|
||||
end
|
||||
|
||||
let start block _time _pclock stack git_ctx http_ctx =
|
||||
let initialize_disk = K.initialize_disk ()
|
||||
and cache_size = K.cache_size ()
|
||||
and git_size = K.git_size ()
|
||||
and swap_size = K.swap_size () in
|
||||
and swap_size = K.swap_size ()
|
||||
and index_size = K.index_size () in
|
||||
if initialize_disk then
|
||||
Part.format block ~cache_size ~git_size ~swap_size >>= function
|
||||
Part.format block ~cache_size ~git_size ~swap_size ~index_size >>= function
|
||||
| Ok () ->
|
||||
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
||||
Lwt.return_unit
|
||||
|
|
Loading…
Reference in a new issue
We can also use Marshal like we do for checksum caches. It would make
of_string
easier, I think.