Reserve an area for dumping the Serve.t -- basically the tarball and last_modified date #28
2 changed files with 163 additions and 70 deletions
|
@ -8,6 +8,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
type partitions = {
|
type partitions = {
|
||||||
tar : Part.t ;
|
tar : Part.t ;
|
||||||
swap : Part.t ;
|
swap : Part.t ;
|
||||||
|
index : Part.t ;
|
||||||
git_dump : Part.t ;
|
git_dump : Part.t ;
|
||||||
md5s : Part.t ;
|
md5s : Part.t ;
|
||||||
sha512s : Part.t ;
|
sha512s : Part.t ;
|
||||||
|
@ -18,6 +19,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
|
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
|
||||||
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
|
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
|
||||||
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
|
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
|
||||||
|
let index_guid = Uuidm.of_string "1cf8c2dc-a7fd-11ef-a2a6-68f728e7bbbc" |> Option.get
|
||||||
|
|
||||||
(* GPT uses a 72 byte utf16be encoded string for partition names *)
|
(* GPT uses a 72 byte utf16be encoded string for partition names *)
|
||||||
let utf16be_of_ascii s =
|
let utf16be_of_ascii s =
|
||||||
|
@ -56,37 +58,41 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let connect block =
|
let connect block =
|
||||||
let* info = BLOCK.get_info block in
|
let* info = BLOCK.get_info block in
|
||||||
let* gpt = read_partition_table info block in
|
let* gpt = read_partition_table info block in
|
||||||
let tar, swap, git_dump, md5s, sha512s =
|
let tar, swap, index, git_dump, md5s, sha512s =
|
||||||
match
|
match
|
||||||
List.fold_left
|
List.fold_left
|
||||||
(fun (tar, swap, git_dump, md5s, sha512s) p ->
|
(fun (tar, swap, index, git_dump, md5s, sha512s) p ->
|
||||||
if String.equal p.Gpt.Partition.name
|
if String.equal p.Gpt.Partition.name
|
||||||
(utf16be_of_ascii "tar")
|
(utf16be_of_ascii "tar")
|
||||||
then
|
then
|
||||||
(Some p, swap, git_dump, md5s, sha512s)
|
(Some p, swap, index, git_dump, md5s, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "git_dump")
|
(utf16be_of_ascii "git_dump")
|
||||||
then
|
then
|
||||||
(tar, swap, Some p, md5s, sha512s)
|
(tar, swap, index, Some p, md5s, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "md5s")
|
(utf16be_of_ascii "md5s")
|
||||||
then
|
then
|
||||||
(tar, swap, git_dump, Some p, sha512s)
|
(tar, swap, index, git_dump, Some p, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "sha512s")
|
(utf16be_of_ascii "sha512s")
|
||||||
then
|
then
|
||||||
(tar, swap, git_dump, md5s, Some p)
|
(tar, swap, index, git_dump, md5s, Some p)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "swap")
|
(utf16be_of_ascii "swap")
|
||||||
then
|
then
|
||||||
(tar, Some p, git_dump, md5s, sha512s)
|
(tar, Some p, index, git_dump, md5s, sha512s)
|
||||||
|
else if String.equal p.name
|
||||||
|
(utf16be_of_ascii "index")
|
||||||
|
then
|
||||||
|
(tar, swap, Some p, git_dump, md5s, sha512s)
|
||||||
else
|
else
|
||||||
Format.kasprintf failwith "Unknown partition %S" p.name)
|
Format.kasprintf failwith "Unknown partition %S" p.name)
|
||||||
(None, None, None, None, None)
|
(None, None, None, None, None, None)
|
||||||
gpt.partitions
|
gpt.partitions
|
||||||
with
|
with
|
||||||
| (Some tar, Some swap, Some git_dump, Some md5s, Some sha512s) ->
|
| (Some tar, Some swap, Some index, Some git_dump, Some md5s, Some sha512s) ->
|
||||||
(tar, swap, git_dump, md5s, sha512s)
|
(tar, swap, index, git_dump, md5s, sha512s)
|
||||||
| _ ->
|
| _ ->
|
||||||
failwith "not all partitions found :("
|
failwith "not all partitions found :("
|
||||||
in
|
in
|
||||||
|
@ -97,11 +103,12 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let (part, _after) = Part.subpartition len after in
|
let (part, _after) = Part.subpartition len after in
|
||||||
part
|
part
|
||||||
in
|
in
|
||||||
let tar = get_part tar and swap = get_part swap and git_dump = get_part git_dump
|
let tar = get_part tar and swap = get_part swap and index =get_part index
|
||||||
|
and git_dump = get_part git_dump
|
||||||
and md5s = get_part md5s and sha512s = get_part sha512s in
|
and md5s = get_part md5s and sha512s = get_part sha512s in
|
||||||
{ tar ; swap; git_dump ; md5s ; sha512s }
|
{ tar ; swap ; index ; git_dump ; md5s ; sha512s }
|
||||||
|
|
||||||
let format block ~cache_size ~git_size ~swap_size =
|
let format block ~cache_size ~git_size ~swap_size ~index_size =
|
||||||
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
|
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
|
||||||
let ( let*? ) = Lwt_result.bind in
|
let ( let*? ) = Lwt_result.bind in
|
||||||
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
|
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
|
||||||
|
@ -119,13 +126,14 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let sectors_cache = mb_in_sectors cache_size
|
let sectors_cache = mb_in_sectors cache_size
|
||||||
and sectors_git = mb_in_sectors git_size
|
and sectors_git = mb_in_sectors git_size
|
||||||
and sectors_swap = mb_in_sectors swap_size
|
and sectors_swap = mb_in_sectors swap_size
|
||||||
|
and sectors_index = mb_in_sectors index_size
|
||||||
in
|
in
|
||||||
let*? () =
|
let*? () =
|
||||||
if size_sectors <
|
if size_sectors <
|
||||||
(* protective MBR + GPT header + GPT table *)
|
(* protective MBR + GPT header + GPT table *)
|
||||||
let ( + ) = Int64.add in
|
let ( + ) = Int64.add in
|
||||||
empty.first_usable_lba +
|
empty.first_usable_lba +
|
||||||
min 1L (Int64.of_int (2 * Tar.Header.length / sector_size)) + sectors_cache + sectors_cache + sectors_git
|
min 1L (Int64.of_int (2 * Tar.Header.length / sector_size)) + sectors_cache + sectors_cache + sectors_git + sectors_index
|
||||||
+ 1L (* backup GPT header *) then
|
+ 1L (* backup GPT header *) then
|
||||||
Lwt.return_error (`Msg "too small disk")
|
Lwt.return_error (`Msg "too small disk")
|
||||||
else Lwt_result.return ()
|
else Lwt_result.return ()
|
||||||
|
@ -160,13 +168,22 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
(Int64.pred md5s.starting_lba)
|
(Int64.pred md5s.starting_lba)
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
in
|
in
|
||||||
|
let index =
|
||||||
|
Gpt.Partition.make
|
||||||
|
~name:(utf16be_of_ascii "index")
|
||||||
|
~type_guid:index_guid
|
||||||
|
~attributes
|
||||||
|
(Int64.sub git_dump.starting_lba sectors_swap)
|
||||||
|
(Int64.pred git_dump.starting_lba)
|
||||||
|
|> Result.get_ok
|
||||||
|
in
|
||||||
let swap =
|
let swap =
|
||||||
Gpt.Partition.make
|
Gpt.Partition.make
|
||||||
~name:(utf16be_of_ascii "swap")
|
~name:(utf16be_of_ascii "swap")
|
||||||
~type_guid:swap_guid
|
~type_guid:swap_guid
|
||||||
~attributes
|
~attributes
|
||||||
(Int64.sub git_dump.starting_lba sectors_swap)
|
(Int64.sub index.starting_lba sectors_swap)
|
||||||
(Int64.pred git_dump.starting_lba)
|
(Int64.pred index.starting_lba)
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
in
|
in
|
||||||
let tar =
|
let tar =
|
||||||
|
@ -180,7 +197,7 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
in
|
in
|
||||||
let gpt =
|
let gpt =
|
||||||
let partitions =
|
let partitions =
|
||||||
[ tar; swap; git_dump; md5s; sha512s ]
|
[ tar; swap; index; git_dump; md5s; sha512s ]
|
||||||
in
|
in
|
||||||
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
|
|
|
@ -59,6 +59,11 @@ module K = struct
|
||||||
let doc = Arg.info ~doc:"HTTP listen port." ["port"] in
|
let doc = Arg.info ~doc:"HTTP listen port." ["port"] in
|
||||||
Mirage_runtime.register_arg Arg.(value & opt int 80 doc)
|
Mirage_runtime.register_arg Arg.(value & opt int 80 doc)
|
||||||
|
|
||||||
|
let index_size =
|
||||||
|
let doc = "Number of MB reserved for the index tarball. Only used with --initialize-disk." in
|
||||||
|
let doc = Arg.info ~doc ["index-size"] in
|
||||||
|
Mirage_runtime.register_arg Arg.(value & opt int 10 doc)
|
||||||
|
|
||||||
let cache_size =
|
let cache_size =
|
||||||
let doc = "Number of MB reserved for each checksum cache (md5, sha512). Only used with --initialize-disk." in
|
let doc = "Number of MB reserved for each checksum cache (md5, sha512). Only used with --initialize-disk." in
|
||||||
let doc = Arg.info ~doc ["cache-size"] in
|
let doc = Arg.info ~doc ["cache-size"] in
|
||||||
|
@ -718,6 +723,37 @@ stamp: %S
|
||||||
mutable index : string ;
|
mutable index : string ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let marshal t =
|
||||||
hannes marked this conversation as resolved
Outdated
|
|||||||
|
let version = char_of_int 1 in
|
||||||
|
String.make 1 version ^ Marshal.to_string t []
|
||||||
|
|
||||||
|
let unmarshal s =
|
||||||
|
let version = int_of_char s.[0] in
|
||||||
|
match version with
|
||||||
|
| 1 -> Ok (Marshal.from_string s 1)
|
||||||
|
| _ -> Error ("Unsupported version " ^ string_of_int version)
|
||||||
|
|
||||||
|
let dump_index index_dump t =
|
||||||
|
let data = marshal t in
|
||||||
|
Cache.write index_dump data >|= function
|
||||||
|
| Ok () ->
|
||||||
|
Logs.info (fun m -> m "dumped index %d bytes" (String.length data))
|
||||||
|
| Error e ->
|
||||||
|
Logs.warn (fun m -> m "failed to dump index: %a" Cache.pp_write_error e)
|
||||||
|
|
||||||
|
let restore_index index_dump =
|
||||||
|
Cache.read index_dump >|= function
|
||||||
|
| Ok None -> Error ()
|
||||||
|
| Error e ->
|
||||||
|
Logs.warn (fun m -> m "failed to read index state: %a" Cache.pp_error e);
|
||||||
|
Error ()
|
||||||
|
| Ok Some data ->
|
||||||
|
match unmarshal data with
|
||||||
|
| Error msg ->
|
||||||
|
Logs.warn (fun m -> m "failed to decode index: %s" msg);
|
||||||
|
Error ()
|
||||||
|
| Ok t -> Ok t
|
||||||
|
|
||||||
let create remote git_kv =
|
let create remote git_kv =
|
||||||
commit_id git_kv >>= fun commit_id ->
|
commit_id git_kv >>= fun commit_id ->
|
||||||
modified git_kv >>= fun modified ->
|
modified git_kv >>= fun modified ->
|
||||||
|
@ -1063,11 +1099,12 @@ stamp: %S
|
||||||
|
|
||||||
module Paf = Paf_mirage.Make(Stack.TCP)
|
module Paf = Paf_mirage.Make(Stack.TCP)
|
||||||
|
|
||||||
let start_mirror { Part.tar; swap; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
let start_mirror { Part.tar; swap; index; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
||||||
KV.connect tar >>= fun kv ->
|
KV.connect tar >>= fun kv ->
|
||||||
Cache.connect git_dump >>= fun git_dump ->
|
Cache.connect git_dump >>= fun git_dump ->
|
||||||
Cache.connect md5s >>= fun md5s ->
|
Cache.connect md5s >>= fun md5s ->
|
||||||
Cache.connect sha512s >>= fun sha512s ->
|
Cache.connect sha512s >>= fun sha512s ->
|
||||||
|
Cache.connect index >>= fun index ->
|
||||||
Swap.connect swap >>= fun swap ->
|
Swap.connect swap >>= fun swap ->
|
||||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
let disk = Disk.empty kv md5s sha512s swap in
|
let disk = Disk.empty kv md5s sha512s swap in
|
||||||
|
@ -1076,26 +1113,34 @@ stamp: %S
|
||||||
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk
|
||||||
else
|
else
|
||||||
begin
|
begin
|
||||||
Logs.info (fun m -> m "Initializing git state. This may take a while...");
|
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
||||||
(if K.ignore_local_git () then
|
Logs.info (fun m -> m "Restoring index.");
|
||||||
|
let git_kv = ref None in
|
||||||
|
let init_git_kv () =
|
||||||
|
Logs.info (fun m -> m "Initializing git state. This may take a while");
|
||||||
|
((if K.ignore_local_git () then
|
||||||
Lwt.return (Error ())
|
Lwt.return (Error ())
|
||||||
else
|
else
|
||||||
restore_git ~remote git_dump git_ctx) >>= function
|
restore_git ~remote git_dump git_ctx) >>= function
|
||||||
| Ok git_kv -> Lwt.return (false, git_kv)
|
| Ok git -> Lwt.return (false, git)
|
||||||
| Error () ->
|
| Error () ->
|
||||||
Git_kv.connect git_ctx remote >>= fun git_kv ->
|
Git_kv.connect git_ctx remote >>= fun git ->
|
||||||
Lwt.return (true, git_kv)
|
Lwt.return (true, git)) >>= fun (need_dump, git) ->
|
||||||
end >>= fun (need_dump, git_kv) ->
|
|
||||||
Logs.info (fun m -> m "Done initializing git state!");
|
Logs.info (fun m -> m "Done initializing git state!");
|
||||||
Serve.commit_id git_kv >>= fun commit_id ->
|
git_kv := Some git;
|
||||||
Logs.info (fun m -> m "git: %s" commit_id);
|
Lwt.return (need_dump, git)
|
||||||
Serve.create remote git_kv >>= fun (serve, urls) ->
|
in
|
||||||
Paf.init ~port:(K.port ()) (Stack.tcp stack) >>= fun t ->
|
let update serve () =
|
||||||
let update () =
|
match !git_kv with
|
||||||
|
| None ->
|
||||||
|
Logs.warn (fun m -> m "git kv is not ready yet, thus not updating");
|
||||||
|
Lwt.return_unit
|
||||||
|
| Some git_kv ->
|
||||||
if Disk.completely_checked disk then
|
if Disk.completely_checked disk then
|
||||||
Serve.update_git ~remote serve git_kv >>= function
|
Serve.update_git ~remote serve git_kv >>= function
|
||||||
| None | Some ([], _) -> Lwt.return_unit
|
| None | Some ([], _) -> Lwt.return_unit
|
||||||
| Some (_changes, urls) ->
|
| Some (_changes, urls) ->
|
||||||
|
Serve.dump_index index serve >>= fun () ->
|
||||||
dump_git git_dump git_kv >>= fun () ->
|
dump_git git_dump git_kv >>= fun () ->
|
||||||
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
download_archives (K.parallel_downloads ()) disk http_ctx urls
|
||||||
else begin
|
else begin
|
||||||
|
@ -1103,38 +1148,69 @@ stamp: %S
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end
|
end
|
||||||
in
|
in
|
||||||
|
(Serve.restore_index index >>= function
|
||||||
|
| Ok serve ->
|
||||||
let service =
|
let service =
|
||||||
Paf.http_service
|
Paf.http_service
|
||||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||||
(Serve.dispatch serve disk (K.hook_url ()) update)
|
(Serve.dispatch serve disk (K.hook_url ()) (update serve))
|
||||||
in
|
in
|
||||||
let `Initialized th = Paf.serve service t in
|
let `Initialized th = Paf.serve service t in
|
||||||
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||||
|
Lwt.return (serve, true, th, false, SM.empty)
|
||||||
|
| Error () ->
|
||||||
|
init_git_kv () >>= fun (need_dump, git) ->
|
||||||
|
Serve.commit_id git >>= fun commit_id ->
|
||||||
|
Logs.info (fun m -> m "git: %s" commit_id);
|
||||||
|
Serve.create remote git >>= fun (serve, urls) ->
|
||||||
|
let service =
|
||||||
|
Paf.http_service
|
||||||
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||||
|
(Serve.dispatch serve disk (K.hook_url ()) (update serve))
|
||||||
|
in
|
||||||
|
let `Initialized th = Paf.serve service t in
|
||||||
|
Logs.info (fun f -> f "listening on %d/HTTP" (K.port ()));
|
||||||
|
Lwt.return (serve, false, th, need_dump, urls)) >>= fun (serve, need_git_update, th, need_dump, urls) ->
|
||||||
Lwt.join [
|
Lwt.join [
|
||||||
(if need_dump then begin
|
(if need_git_update then
|
||||||
|
init_git_kv () >>= fun (need_dump, git) ->
|
||||||
|
Serve.commit_id git >>= fun commit_id ->
|
||||||
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||||
dump_git git_dump git_kv
|
Serve.dump_index index serve >>= fun () ->
|
||||||
end else
|
dump_git git_dump git
|
||||||
reynir marked this conversation as resolved
reynir
commented
Shouldn't this be Shouldn't this be ``let (`Initialized th) = ... in``?
reynir
commented
Oh, apparently not! It compiles fine. It probably changed at the same time when you can write Oh, apparently not! It compiles fine. It probably changed at the same time when you can write `match _ with Ok `Data data -> ...`
|
|||||||
|
else if need_dump then
|
||||||
|
match !git_kv with
|
||||||
|
| None ->
|
||||||
|
Logs.err (fun m -> m "git_kv not yet set");
|
||||||
|
Lwt.return_unit
|
||||||
|
| Some git ->
|
||||||
|
Serve.commit_id git >>= fun commit_id ->
|
||||||
|
Logs.info (fun m -> m "dumping git state %s" commit_id);
|
||||||
|
Serve.dump_index index serve >>= fun () ->
|
||||||
|
dump_git git_dump git
|
||||||
|
else
|
||||||
Lwt.return_unit) ;
|
Lwt.return_unit) ;
|
||||||
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
(Disk.check ~skip_verify_sha256:(K.skip_verify_sha256 ()) disk)
|
||||||
] >>= fun () ->
|
] >>= fun () ->
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
let rec go () =
|
let rec go () =
|
||||||
|
update serve () >>= fun () ->
|
||||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||||
update () >>= fun () ->
|
|
||||||
go ()
|
go ()
|
||||||
in
|
in
|
||||||
go ());
|
go ());
|
||||||
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
|
download_archives (K.parallel_downloads ()) disk http_ctx urls >>= fun () ->
|
||||||
(th >|= fun _v -> ())
|
(th >|= fun _v -> ())
|
||||||
|
end
|
||||||
|
|
||||||
let start block _time _pclock stack git_ctx http_ctx =
|
let start block _time _pclock stack git_ctx http_ctx =
|
||||||
let initialize_disk = K.initialize_disk ()
|
let initialize_disk = K.initialize_disk ()
|
||||||
and cache_size = K.cache_size ()
|
and cache_size = K.cache_size ()
|
||||||
and git_size = K.git_size ()
|
and git_size = K.git_size ()
|
||||||
and swap_size = K.swap_size () in
|
and swap_size = K.swap_size ()
|
||||||
|
and index_size = K.index_size () in
|
||||||
if initialize_disk then
|
if initialize_disk then
|
||||||
Part.format block ~cache_size ~git_size ~swap_size >>= function
|
Part.format block ~cache_size ~git_size ~swap_size ~index_size >>= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
Loading…
Reference in a new issue
We can also use Marshal like we do for checksum caches. It would make
of_string
easier, I think.that's a good suggestion. yes we should :) (I noticed we already use Marshal for the md5/sha512 maps)
done in
4ae8486