Reserve an area for dumping the Serve.t -- basically the tarball and last_modified date #28
2 changed files with 172 additions and 70 deletions
|
@ -8,6 +8,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
type partitions = {
|
type partitions = {
|
||||||
tar : Part.t ;
|
tar : Part.t ;
|
||||||
swap : Part.t ;
|
swap : Part.t ;
|
||||||
|
index : Part.t ;
|
||||||
git_dump : Part.t ;
|
git_dump : Part.t ;
|
||||||
md5s : Part.t ;
|
md5s : Part.t ;
|
||||||
sha512s : Part.t ;
|
sha512s : Part.t ;
|
||||||
|
@ -18,6 +19,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
|
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
|
||||||
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
|
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
|
||||||
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
|
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
|
||||||
|
let index_guid = Uuidm.of_string "1cf8c2dc-a7fd-11ef-a2a6-68f728e7bbbc" |> Option.get
|
||||||
|
|
||||||
(* GPT uses a 72 byte utf16be encoded string for partition names *)
|
(* GPT uses a 72 byte utf16be encoded string for partition names *)
|
||||||
let utf16be_of_ascii s =
|
let utf16be_of_ascii s =
|
||||||
|
@ -56,37 +58,41 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let connect block =
|
let connect block =
|
||||||
let* info = BLOCK.get_info block in
|
let* info = BLOCK.get_info block in
|
||||||
let* gpt = read_partition_table info block in
|
let* gpt = read_partition_table info block in
|
||||||
let tar, swap, git_dump, md5s, sha512s =
|
let tar, swap, index, git_dump, md5s, sha512s =
|
||||||
match
|
match
|
||||||
List.fold_left
|
List.fold_left
|
||||||
(fun (tar, swap, git_dump, md5s, sha512s) p ->
|
(fun (tar, swap, index, git_dump, md5s, sha512s) p ->
|
||||||
if String.equal p.Gpt.Partition.name
|
if String.equal p.Gpt.Partition.name
|
||||||
(utf16be_of_ascii "tar")
|
(utf16be_of_ascii "tar")
|
||||||
then
|
then
|
||||||
(Some p, swap, git_dump, md5s, sha512s)
|
(Some p, swap, index, git_dump, md5s, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "git_dump")
|
(utf16be_of_ascii "git_dump")
|
||||||
then
|
then
|
||||||
(tar, swap, Some p, md5s, sha512s)
|
(tar, swap, index, Some p, md5s, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "md5s")
|
(utf16be_of_ascii "md5s")
|
||||||
then
|
then
|
||||||
(tar, swap, git_dump, Some p, sha512s)
|
(tar, swap, index, git_dump, Some p, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "sha512s")
|
(utf16be_of_ascii "sha512s")
|
||||||
then
|
then
|
||||||
(tar, swap, git_dump, md5s, Some p)
|
(tar, swap, index, git_dump, md5s, Some p)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "swap")
|
(utf16be_of_ascii "swap")
|
||||||
then
|
then
|
||||||
(tar, Some p, git_dump, md5s, sha512s)
|
(tar, Some p, index, git_dump, md5s, sha512s)
|
||||||
|
else if String.equal p.name
|
||||||
|
(utf16be_of_ascii "index")
|
||||||
|
then
|
||||||
|
(tar, swap, Some p, git_dump, md5s, sha512s)
|
||||||
else
|
else
|
||||||
Format.kasprintf failwith "Unknown partition %S" p.name)
|
Format.kasprintf failwith "Unknown partition %S" p.name)
|
||||||
(None, None, None, None, None)
|
(None, None, None, None, None, None)
|
||||||
gpt.partitions
|
gpt.partitions
|
||||||
with
|
with
|
||||||
| (Some tar, Some swap, Some git_dump, Some md5s, Some sha512s) ->
|
| (Some tar, Some swap, Some index, Some git_dump, Some md5s, Some sha512s) ->
|
||||||
(tar, swap, git_dump, md5s, sha512s)
|
(tar, swap, index, git_dump, md5s, sha512s)
|
||||||
| _ ->
|
| _ ->
|
||||||
failwith "not all partitions found :("
|
failwith "not all partitions found :("
|
||||||
in
|
in
|
||||||
|
@ -97,11 +103,12 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let (part, _after) = Part.subpartition len after in
|
let (part, _after) = Part.subpartition len after in
|
||||||
part
|
part
|
||||||
in
|
in
|
||||||
let tar = get_part tar and swap = get_part swap and git_dump = get_part git_dump
|
let tar = get_part tar and swap = get_part swap and index =get_part index
|
||||||
|
and git_dump = get_part git_dump
|
||||||
and md5s = get_part md5s and sha512s = get_part sha512s in
|
and md5s = get_part md5s and sha512s = get_part sha512s in
|
||||||
{ tar ; swap; git_dump ; md5s ; sha512s }
|
{ tar ; swap ; index ; git_dump ; md5s ; sha512s }
|
||||||
|
|
||||||
let format block ~cache_size ~git_size ~swap_size =
|
let format block ~cache_size ~git_size ~swap_size ~index_size =
|
||||||
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
|
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
|
||||||
let ( let*? ) = Lwt_result.bind in
|
let ( let*? ) = Lwt_result.bind in
|
||||||
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
|
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
|
||||||
|
@ -119,13 +126,14 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let sectors_cache = mb_in_sectors cache_size
|
let sectors_cache = mb_in_sectors cache_size
|
||||||
and sectors_git = mb_in_sectors git_size
|
and sectors_git = mb_in_sectors git_size
|
||||||
and sectors_swap = mb_in_sectors swap_size
|
and sectors_swap = mb_in_sectors swap_size
|
||||||
|
and sectors_index = mb_in_sectors index_size
|
||||||
in
|
in
|
||||||
let*? () =
|
let*? () =
|
||||||
if size_sectors <
|
if size_sectors <
|
||||||
(* protective MBR + GPT header + GPT table *)
|
(* protective MBR + GPT header + GPT table *)
|
||||||
let ( + ) = Int64.add in
|
let ( + ) = Int64.add in
|
||||||
empty.first_usable_lba +
|
empty.first_usable_lba +
|
||||||
min 1L (Int64.of_int (2 * Tar.Header.length / sector_size)) + sectors_cache + sectors_cache + sectors_git
|
min 1L (Int64.of_int (2 * Tar.Header.length / sector_size)) + sectors_cache + sectors_cache + sectors_git + sectors_index
|
||||||
+ 1L (* backup GPT header *) then
|
+ 1L (* backup GPT header *) then
|
||||||
Lwt.return_error (`Msg "too small disk")
|
Lwt.return_error (`Msg "too small disk")
|
||||||
else Lwt_result.return ()
|
else Lwt_result.return ()
|
||||||
|
@ -160,13 +168,22 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
(Int64.pred md5s.starting_lba)
|
(Int64.pred md5s.starting_lba)
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
in
|
in
|
||||||
|
let index =
|
||||||
|
Gpt.Partition.make
|
||||||
|
~name:(utf16be_of_ascii "index")
|
||||||
|
~type_guid:index_guid
|
||||||
|
~attributes
|
||||||
|
(Int64.sub git_dump.starting_lba sectors_swap)
|
||||||
|
(Int64.pred git_dump.starting_lba)
|
||||||
|
|> Result.get_ok
|
||||||
|
in
|
||||||
let swap =
|
let swap =
|
||||||
Gpt.Partition.make
|
Gpt.Partition.make
|
||||||
~name:(utf16be_of_ascii "swap")
|
~name:(utf16be_of_ascii "swap")
|
||||||
~type_guid:swap_guid
|
~type_guid:swap_guid
|
||||||
~attributes
|
~attributes
|
||||||
(Int64.sub git_dump.starting_lba sectors_swap)
|
(Int64.sub index.starting_lba sectors_swap)
|
||||||
(Int64.pred git_dump.starting_lba)
|
(Int64.pred index.starting_lba)
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
in
|
in
|
||||||
let tar =
|
let tar =
|
||||||
|
@ -180,7 +197,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
in
|
in
|
||||||
let gpt =
|
let gpt =
|
||||||
let partitions =
|
let partitions =
|
||||||
[ tar; swap; git_dump; md5s; sha512s ]
|
[ tar; swap; index; git_dump; md5s; sha512s ]
|
||||||
in
|
in
|
||||||
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
|
|
|
@ -59,6 +59,11 @@ module K = struct
|
||||||
let doc = Arg.info ~doc:"HTTP listen port." ["port"] in
|
let doc = Arg.info ~doc:"HTTP listen port." ["port"] in
|
||||||
Mirage_runtime.register_arg Arg.(value & opt int 80 doc)
|
Mirage_runtime.register_arg Arg.(value & opt int 80 doc)
|
||||||
|
|
||||||
|
let index_size =
|
||||||
|
let doc = "Number of MB reserved for the index tarball. Only used with --initialize-disk." in
|
||||||
|
let doc = Arg.info ~doc ["index-size"] in
|
||||||
|
Mirage_runtime.register_arg Arg.(value & opt int 10 doc)
|
||||||
|
|
||||||
let cache_size =
|
let cache_size =
|
||||||
let doc = "Number of MB reserved for each checksum cache (md5, sha512). Only used with --initialize-disk." in
|
let doc = "Number of MB reserved for each checksum cache (md5, sha512). Only used with --initialize-disk." in
|
||||||
let doc = Arg.info ~doc ["cache-size"] in
|
let doc = Arg.info ~doc ["cache-size"] in
|
||||||
|
@ -718,6 +723,46 @@ stamp: %S
|
||||||
mutable index : string ;
|
mutable index : string ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let to_string { commit_id ; modified ; repo ; index } =
|
||||||
|
String.concat ";" [ commit_id ; modified ; repo ; index ]
|
||||||
|
|
||||||
|
let of_string str =
|
||||||
|
match String.index_opt str ';' with
|
||||||
|
| None -> Error (`Msg "no separator found")
|
||||||
|
| Some commit_end ->
|
||||||
|
match String.index_from_opt str (succ commit_end) ';' with
|
||||||
|
| None -> Error (`Msg "no separator found")
|
||||||
|
| Some modified_end ->
|
||||||
|
match String.index_from_opt str (succ modified_end) ';' with
|
||||||
|
| None -> Error (`Msg "no separator found")
|
||||||
|
| Some repo_end ->
|
||||||
|
let last_chunk = String.length str - succ repo_end in
|
||||||
|
Ok { commit_id = String.sub str 0 commit_end ;
|
||||||
|
modified = String.sub str (succ commit_end) modified_end ;
|
||||||
|
repo = String.sub str (succ modified_end) repo_end ;
|
||||||
|
index = String.sub str (succ repo_end) last_chunk }
|
||||||
|
|
||||||
|
let dump_index index_dump t =
|
||||||
|
let data = to_string t in
|
||||||
|
Cache.write index_dump data >|= function
|
||||||
|
| Ok () ->
|
||||||
|
Logs.info (fun m -> m "dumped index %d bytes" (String.length data))
|
||||||
|
| Error e ->
|
||||||
|
Logs.warn (fun m -> m "failed to dump index: %a" Cache.pp_write_error e)
|
||||||
|
|
||||||
|
let restore_index index_dump =
|
||||||
|
Cache.read index_dump >|= function
|
||||||
|
| Ok None -> Error ()
|
||||||
|
| Error e ->
|
||||||
|
Logs.warn (fun m -> m "failed to read index state: %a" Cache.pp_error e);
|
||||||
|
Error ()
|
||||||
|
| Ok Some data ->
|
||||||
|
match of_string data with
|
||||||
|
| Error `Msg msg ->
|
||||||
|
Logs.warn (fun m -> m "failed to decode index: %s" msg);
|
||||||
|
Error ()
|
||||||
|
| Ok t -> Ok t
|
||||||
|
|
||||||
let create remote git_kv =
|
let create remote git_kv =
|
||||||
commit_id git_kv >>= fun commit_id ->
|
commit_id git_kv >>= fun commit_id ->
|
||||||
modified git_kv >>= fun modified ->
|
modified git_kv >>= fun modified ->
|
||||||
|
@ -1063,11 +1108,12 @@ stamp: %S
|
||||||
|
|
||||||
module Paf = Paf_mirage.Make(Stack.TCP)
|
module Paf = Paf_mirage.Make(Stack.TCP)
|
||||||
|
|
||||||
let start_mirror { Part.tar; swap; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
let start_mirror { Part.tar; swap; index; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
||||||
KV.connect tar >>= fun kv ->
|
KV.connect tar >>= fun kv ->
|
||||||
Cache.connect git_dump >>= fun git_dump ->
|
Cache.connect git_dump >>= fun git_dump ->
|
||||||
Cache.connect md5s >>= fun md5s ->
|
Cache.connect md5s >>= fun md5s ->
|
||||||
Cache.connect sha512s >>= fun sha512s ->
|
Cache.connect sha512s >>= fun sha512s ->
|
||||||
|
Cache.connect index >>= fun index ->
|
||||||
Swap.connect swap >>= fun swap ->
|
Swap.connect swap >>= fun swap ->
|
||||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
let disk = Disk.empty kv md5s sha512s swap in
|
let disk = Disk.empty kv md5s sha512s swap in
|
||||||
|
@ -1076,65 +1122,104 @@ stamp: %S
|
||||||
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
||||||
else
|
else
|
||||||
begin
|
begin
|
||||||
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
||||||
(if K.ignore_local_git () then
|
Logs.info (fun m -> m "Restoring index.");
|
||||||
Lwt.return (Error ())
|
let git_kv = ref None in
|
||||||
else
|
let init_git_kv () =
|
||||||
restore_git ~remote git_dump git_ctx) >>= function
|
Logs.info (fun m -> m "Initializing git state. This may take a while");
|
||||||
| Ok git_kv -> Lwt.return (false, git_kv)
|
((if K.ignore_local_git () then
|
||||||
| Error () ->
|
Lwt.return (Error ())
|
||||||
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
else
|
||||||
Lwt.return (true, git_kv)
|
restore_git ~remote git_dump git_ctx) >>= function
|
||||||
end >>= fun (need_dump, git_kv) ->
|
| Ok git -> Lwt.return (false, git)
|
||||||
Logs.info (fun m -> m "Done initializing git state!");
|
| Error () ->
|
||||||
Serve.commit_id git_kv >>= fun commit_id ->
|
Git_kv.connect git_ctx remote >>= fun git ->
|
||||||
Logs.info (fun m -> m "git: %s" commit_id);
|
Lwt.return (true, git)) >>= fun (need_dump, git) ->
|
||||||
Serve.create remote git_kv >>= fun (serve, urls) ->
|
Logs.info (fun m -> m "Done initializing git state!");
|
||||||
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
git_kv := Some git;
|
||||||
let update () =
|
Lwt.return (need_dump, git)
|
||||||
if Disk.completely_checked disk then
|
in
|
||||||
Serve.update_git ~remote serve git_kv >>= function
|
let update serve () =
|
||||||
| None | Some ([], _) -> Lwt.return_unit
|
match !git_kv with
|
||||||
| Some (_changes, urls) ->
|
| None ->
|
||||||
dump_git git_dump git_kv >>= fun () ->
|
Logs.warn (fun m -> m "git kv is not ready yet, thus not updating");
|
||||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
Lwt.return_unit
|
||||||
else begin
|
| Some git_kv ->
|
||||||
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
|
if Disk.completely_checked disk then
|
||||||
Lwt.return_unit
|
Serve.update_git ~remote serve git_kv >>= function
|
||||||
end
|
| None | Some ([], _) -> Lwt.return_unit
|
||||||
in
|
| Some (_changes, urls) ->
|
||||||
let service =
|
Serve.dump_index index serve >>= fun () ->
|
||||||
Paf.http_service
|
dump_git git_dump git_kv >>= fun () ->
|
||||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||||
(Serve.dispatch serve disk (K.hook_url ()) update)
|
else begin
|
||||||
in
|
Logs.warn (fun m -> m "disk is not ready yet, thus not updating");
|
||||||
let `Initialized th = Paf.serve service t in
|
Lwt.return_unit
|
||||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
end
|
||||||
Lwt.join [
|
in
|
||||||
(if need_dump then begin
|
(Serve.restore_index index >>= function
|
||||||
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
| Ok serve ->
|
||||||
dump_git git_dump git_kv
|
let service =
|
||||||
end else
|
Paf.http_service
|
||||||
Lwt.return_unit) ;
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||||
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
(Serve.dispatch serve disk (K.hook_url ()) (update serve))
|
||||||
|
in
|
||||||
|
let `Initialized th = Paf.serve service t in
|
||||||
|
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||||
|
Lwt.return (serve, true, th, false, SM.empty)
|
||||||
|
| Error () ->
|
||||||
|
init_git_kv () >>= fun (need_dump, git) ->
|
||||||
|
Serve.commit_id git >>= fun commit_id ->
|
||||||
|
Logs.info (fun m -> m "git: %s" commit_id);
|
||||||
|
Serve.create remote git >>= fun (serve, urls) ->
|
||||||
|
let service =
|
||||||
|
Paf.http_service
|
||||||
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||||
|
(Serve.dispatch serve disk (K.hook_url ()) (update serve))
|
||||||
|
in
|
||||||
|
let `Initialized th = Paf.serve service t in
|
||||||
reynir marked this conversation as resolved
|
|||||||
|
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||||
|
Lwt.return (serve, false, th, need_dump, urls)) >>= fun (serve, need_git_update, th, need_dump, urls) ->
|
||||||
|
Lwt.join [
|
||||||
|
(if need_git_update then
|
||||||
|
init_git_kv () >>= fun (need_dump, git) ->
|
||||||
|
Serve.commit_id git >>= fun commit_id ->
|
||||||
|
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||||
|
Serve.dump_index index serve >>= fun () ->
|
||||||
|
dump_git git_dump git
|
||||||
|
else if need_dump then
|
||||||
|
match !git_kv with
|
||||||
|
| None ->
|
||||||
|
Logs.err (fun m -> m "git_kv not yet set");
|
||||||
|
Lwt.return_unit
|
||||||
|
| Some git ->
|
||||||
|
Serve.commit_id git >>= fun commit_id ->
|
||||||
|
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||||
|
Serve.dump_index index serve >>= fun () ->
|
||||||
|
dump_git git_dump git
|
||||||
|
else
|
||||||
|
Lwt.return_unit) ;
|
||||||
|
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
||||||
] >>= fun () ->
|
] >>= fun () ->
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
let rec go () =
|
let rec go () =
|
||||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
update serve () >>= fun () ->
|
||||||
update () >>= fun () ->
|
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||||
go ()
|
go ()
|
||||||
in
|
in
|
||||||
go ());
|
go ());
|
||||||
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
|
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
|
||||||
(th >|= fun _v -> ())
|
(th >|= fun _v -> ())
|
||||||
|
end
|
||||||
|
|
||||||
let start block _time _pclock stack git_ctx http_ctx =
|
let start block _time _pclock stack git_ctx http_ctx =
|
||||||
let initialize_disk = K.initialize_disk ()
|
let initialize_disk = K.initialize_disk ()
|
||||||
and cache_size = K.cache_size ()
|
and cache_size = K.cache_size ()
|
||||||
and git_size = K.git_size ()
|
and git_size = K.git_size ()
|
||||||
and swap_size = K.swap_size () in
|
and swap_size = K.swap_size ()
|
||||||
|
and index_size = K.index_size () in
|
||||||
if initialize_disk then
|
if initialize_disk then
|
||||||
Part.format block ~cache_size ~git_size ~swap_size >>= function
|
Part.format block ~cache_size ~git_size ~swap_size ~index_size >>= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
Loading…
Reference in a new issue
Shouldn't this be
let (`Initialized th) = ... in
?Oh, apparently not! It compiles fine. It probably changed at the same time when you can write
match _ with Ok
Data data -> ...`