Use swapfs #16
3 changed files with 102 additions and 29 deletions
|
@ -18,6 +18,7 @@ let mirror =
|
||||||
package "gptar" ;
|
package "gptar" ;
|
||||||
package "oneffs" ;
|
package "oneffs" ;
|
||||||
package "digestif" ;
|
package "digestif" ;
|
||||||
|
package "swapfs" ;
|
||||||
]
|
]
|
||||||
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> alpn_client @-> job)
|
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> alpn_client @-> job)
|
||||||
|
|
||||||
|
|
|
@ -7,12 +7,14 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
|
|
||||||
type partitions = {
|
type partitions = {
|
||||||
tar : Part.t ;
|
tar : Part.t ;
|
||||||
|
swap : Part.t ;
|
||||||
git_dump : Part.t ;
|
git_dump : Part.t ;
|
||||||
md5s : Part.t ;
|
md5s : Part.t ;
|
||||||
sha512s : Part.t ;
|
sha512s : Part.t ;
|
||||||
}
|
}
|
||||||
|
|
||||||
(* I just made these ones up... *)
|
(* I just made these ones up... *)
|
||||||
|
let swap_guid = Uuidm.of_string "76515dc1-953f-4c59-8b41-90011bdddfcd" |> Option.get
|
||||||
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
|
let tar_guid = Uuidm.of_string "53cd6812-46cc-474e-a141-30b3aed85f53" |> Option.get
|
||||||
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
|
let cache_guid = Uuidm.of_string "22ab9cf5-6e51-45c2-998a-862e23aab264" |> Option.get
|
||||||
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
|
let git_guid = Uuidm.of_string "30faa50a-4c9d-47ff-a1a5-ecfb3401c027" |> Option.get
|
||||||
|
@ -54,33 +56,37 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let connect block =
|
let connect block =
|
||||||
let* info = BLOCK.get_info block in
|
let* info = BLOCK.get_info block in
|
||||||
let* gpt = read_partition_table info block in
|
let* gpt = read_partition_table info block in
|
||||||
let tar, git_dump, md5s, sha512s =
|
let tar, swap, git_dump, md5s, sha512s =
|
||||||
match
|
match
|
||||||
List.fold_left
|
List.fold_left
|
||||||
(fun (tar, git_dump, md5s, sha512s) p ->
|
(fun (tar, swap, git_dump, md5s, sha512s) p ->
|
||||||
if String.equal p.Gpt.Partition.name
|
if String.equal p.Gpt.Partition.name
|
||||||
(utf16be_of_ascii "tar")
|
(utf16be_of_ascii "tar")
|
||||||
then
|
then
|
||||||
(Some p, git_dump, md5s, sha512s)
|
(Some p, swap, git_dump, md5s, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "git_dump")
|
(utf16be_of_ascii "git_dump")
|
||||||
then
|
then
|
||||||
(tar, Some p, md5s, sha512s)
|
(tar, swap, Some p, md5s, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "md5s")
|
(utf16be_of_ascii "md5s")
|
||||||
then
|
then
|
||||||
(tar, git_dump, Some p, sha512s)
|
(tar, swap, git_dump, Some p, sha512s)
|
||||||
else if String.equal p.name
|
else if String.equal p.name
|
||||||
(utf16be_of_ascii "sha512s")
|
(utf16be_of_ascii "sha512s")
|
||||||
then
|
then
|
||||||
(tar, git_dump, md5s, Some p)
|
(tar, swap, git_dump, md5s, Some p)
|
||||||
|
else if String.equal p.name
|
||||||
|
(utf16be_of_ascii "swap")
|
||||||
|
then
|
||||||
|
(tar, Some p, git_dump, md5s, sha512s)
|
||||||
else
|
else
|
||||||
Format.kasprintf failwith "Unknown partition %S" p.name)
|
Format.kasprintf failwith "Unknown partition %S" p.name)
|
||||||
(None, None, None, None)
|
(None, None, None, None, None)
|
||||||
gpt.partitions
|
gpt.partitions
|
||||||
with
|
with
|
||||||
| (Some tar, Some git_dump, Some md5s, Some sha512s) ->
|
| (Some tar, Some swap, Some git_dump, Some md5s, Some sha512s) ->
|
||||||
(tar, git_dump, md5s, sha512s)
|
(tar, swap, git_dump, md5s, sha512s)
|
||||||
| _ ->
|
| _ ->
|
||||||
failwith "not all partitions found :("
|
failwith "not all partitions found :("
|
||||||
in
|
in
|
||||||
|
@ -91,11 +97,11 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
let (part, _after) = Part.subpartition len after in
|
let (part, _after) = Part.subpartition len after in
|
||||||
part
|
part
|
||||||
in
|
in
|
||||||
let tar = get_part tar and git_dump = get_part git_dump
|
let tar = get_part tar and swap = get_part swap and git_dump = get_part git_dump
|
||||||
and md5s = get_part md5s and sha512s = get_part sha512s in
|
and md5s = get_part md5s and sha512s = get_part sha512s in
|
||||||
{ tar ; git_dump ; md5s ; sha512s }
|
{ tar ; swap; git_dump ; md5s ; sha512s }
|
||||||
|
|
||||||
let format block ~sectors_cache ~sectors_git =
|
let format block ~sectors_cache ~sectors_git ~sectors_swap =
|
||||||
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
|
let* { size_sectors; sector_size; _ } = BLOCK.get_info block in
|
||||||
let ( let*? ) = Lwt_result.bind in
|
let ( let*? ) = Lwt_result.bind in
|
||||||
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
|
(* ocaml-gpt uses a fixed size partition entries table. Create an empty GPT
|
||||||
|
@ -144,18 +150,27 @@ module Make(BLOCK : Mirage_block.S) = struct
|
||||||
(Int64.pred md5s.starting_lba)
|
(Int64.pred md5s.starting_lba)
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
in
|
in
|
||||||
|
let swap =
|
||||||
|
Gpt.Partition.make
|
||||||
|
~name:(utf16be_of_ascii "swap")
|
||||||
|
~type_guid:swap_guid
|
||||||
|
~attributes
|
||||||
|
(Int64.sub git_dump.starting_lba sectors_swap)
|
||||||
|
(Int64.pred git_dump.starting_lba)
|
||||||
|
|> Result.get_ok
|
||||||
|
in
|
||||||
let tar =
|
let tar =
|
||||||
Gpt.Partition.make
|
Gpt.Partition.make
|
||||||
~name:(utf16be_of_ascii "tar")
|
~name:(utf16be_of_ascii "tar")
|
||||||
~type_guid:tar_guid
|
~type_guid:tar_guid
|
||||||
~attributes
|
~attributes
|
||||||
empty.first_usable_lba
|
empty.first_usable_lba
|
||||||
(Int64.pred git_dump.starting_lba)
|
(Int64.pred swap.starting_lba)
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
in
|
in
|
||||||
let gpt =
|
let gpt =
|
||||||
let partitions =
|
let partitions =
|
||||||
[ tar; git_dump; md5s; sha512s ]
|
[ tar; swap; git_dump; md5s; sha512s ]
|
||||||
in
|
in
|
||||||
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|
Gpt.make ~sector_size ~disk_sectors:size_sectors partitions
|
||||||
|> Result.get_ok
|
|> Result.get_ok
|
||||||
|
|
|
@ -53,6 +53,11 @@ module K = struct
|
||||||
let doc = Arg.info ~doc ["sectors-git"] in
|
let doc = Arg.info ~doc ["sectors-git"] in
|
||||||
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 40L (mul 2L 1024L)) doc)
|
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 40L (mul 2L 1024L)) doc)
|
||||||
|
|
||||||
|
let sectors_swap =
|
||||||
|
let doc = "Number of sectors reserved for swap. Only used with --initialize-disk" in
|
||||||
|
let doc = Arg.info ~doc ["sectors-swap"] in
|
||||||
|
Mirage_runtime.register_arg Arg.(value & opt int64 Int64.(mul 1024L 2048L) doc)
|
||||||
|
|
||||||
let initialize_disk =
|
let initialize_disk =
|
||||||
let doc = "Initialize the disk with a partition table. THIS IS DESTRUCTIVE!" in
|
let doc = "Initialize the disk with a partition table. THIS IS DESTRUCTIVE!" in
|
||||||
let doc = Arg.info ~doc ["initialize-disk"] in
|
let doc = Arg.info ~doc ["initialize-disk"] in
|
||||||
|
@ -75,6 +80,7 @@ module Make
|
||||||
module Part = Partitions.Make(BLOCK)
|
module Part = Partitions.Make(BLOCK)
|
||||||
module KV = Tar_mirage.Make_KV_RW(Pclock)(Part)
|
module KV = Tar_mirage.Make_KV_RW(Pclock)(Part)
|
||||||
module Cache = OneFFS.Make(Part)
|
module Cache = OneFFS.Make(Part)
|
||||||
|
module Swap = Swapfs.Make(Part)
|
||||||
module Store = Git_kv.Make(Pclock)
|
module Store = Git_kv.Make(Pclock)
|
||||||
|
|
||||||
module SM = Map.Make(String)
|
module SM = Map.Make(String)
|
||||||
|
@ -189,13 +195,14 @@ module Make
|
||||||
dev : KV.t ;
|
dev : KV.t ;
|
||||||
dev_md5s : Cache.t ;
|
dev_md5s : Cache.t ;
|
||||||
dev_sha512s : Cache.t ;
|
dev_sha512s : Cache.t ;
|
||||||
|
dev_swap : Swap.t ;
|
||||||
}
|
}
|
||||||
|
|
||||||
let pending = Mirage_kv.Key.v "pending"
|
let pending = Mirage_kv.Key.v "pending"
|
||||||
|
|
||||||
let to_delete = Mirage_kv.Key.v "to-delete"
|
let to_delete = Mirage_kv.Key.v "to-delete"
|
||||||
|
|
||||||
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
|
let empty dev dev_md5s dev_sha512s dev_swap = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s ; dev_swap }
|
||||||
|
|
||||||
let marshal_sm (sm : string SM.t) =
|
let marshal_sm (sm : string SM.t) =
|
||||||
let version = char_of_int 1 in
|
let version = char_of_int 1 in
|
||||||
|
@ -355,7 +362,10 @@ module Make
|
||||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||||
| `Unknown ->
|
| `Unknown ->
|
||||||
active_add_bytes url (String.length data);
|
active_add_bytes url (String.length data);
|
||||||
Lwt.return_ok (digests, `Unknown data)
|
let h = Swap.empty t.dev_swap in
|
||||||
|
Swap.append h data >|= function
|
||||||
|
| Ok () -> Ok (digests, `Unknown h)
|
||||||
|
| Error swap_err -> Error (`Swap swap_err)
|
||||||
end
|
end
|
||||||
| `Fixed_body (size, offset) ->
|
| `Fixed_body (size, offset) ->
|
||||||
KV.set_partial t.dev key ~offset data
|
KV.set_partial t.dev key ~offset data
|
||||||
|
@ -364,9 +374,11 @@ module Make
|
||||||
let offset = Optint.Int63.(add offset (of_int len)) in
|
let offset = Optint.Int63.(add offset (of_int len)) in
|
||||||
active_add_bytes url len;
|
active_add_bytes url len;
|
||||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||||
| `Unknown body ->
|
| `Unknown h ->
|
||||||
active_add_bytes url (String.length data);
|
active_add_bytes url (String.length data);
|
||||||
Lwt.return_ok (digests, `Unknown (body ^ data))
|
Swap.append h data >|= function
|
||||||
|
| Ok () -> Ok (digests, `Unknown h)
|
||||||
|
| Error swap_err -> Error (`Swap swap_err)
|
||||||
|
|
||||||
let check_csums_digests csums digests =
|
let check_csums_digests csums digests =
|
||||||
let csums' = Archive_checksum.digests_to_hm digests in
|
let csums' = Archive_checksum.digests_to_hm digests in
|
||||||
|
@ -376,7 +388,32 @@ module Make
|
||||||
(fun (h, csum) -> String.equal csum (HM.find h csums))
|
(fun (h, csum) -> String.equal csum (HM.find h csums))
|
||||||
common_bindings
|
common_bindings
|
||||||
|
|
||||||
let finalize_write t (hash, csum) ~url (body : [ `Unknown of string | `Fixed_body of int64 * Optint.Int63.t | `Init ]) csums digests =
|
let set_from_handle dev dest h =
|
||||||
|
let size = Optint.Int63.of_int64 (Swap.size h) in
|
||||||
|
KV.allocate dev dest size >>= fun r ->
|
||||||
|
let rec loop offset =
|
||||||
|
if offset = Swap.size h then
|
||||||
|
Lwt.return_ok ()
|
||||||
|
else
|
||||||
|
let length = Int64.(to_int (min 4096L (sub (Swap.size h) offset))) in
|
||||||
|
Swap.get_partial h ~offset ~length >>= fun r ->
|
||||||
|
match r with
|
||||||
|
| Error e -> Lwt.return (Error (`Swap e))
|
||||||
|
| Ok data ->
|
||||||
|
KV.set_partial dev dest ~offset:(Optint.Int63.of_int64 offset) data
|
||||||
|
>>= fun r ->
|
||||||
|
match r with
|
||||||
|
| Error e -> Lwt.return (Error (`Write_error e))
|
||||||
|
| Ok () ->
|
||||||
|
loop Int64.(add offset (of_int length))
|
||||||
|
in
|
||||||
|
match r with
|
||||||
|
| Ok () ->
|
||||||
|
loop 0L
|
||||||
|
| Error e ->
|
||||||
|
Lwt.return (Error (`Write_error e))
|
||||||
|
|
||||||
|
let finalize_write t (hash, csum) ~url (body : [ `Unknown of Swap.handle | `Fixed_body of int64 * Optint.Int63.t | `Init ]) csums digests =
|
||||||
let sizes_match, body_size_in_header =
|
let sizes_match, body_size_in_header =
|
||||||
match body with
|
match body with
|
||||||
| `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true
|
| `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int64 reported) actual), true
|
||||||
|
@ -390,12 +427,18 @@ module Make
|
||||||
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
|
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
|
||||||
let dest = Mirage_kv.Key.v sha256 in
|
let dest = Mirage_kv.Key.v sha256 in
|
||||||
begin match body with
|
begin match body with
|
||||||
| `Unknown body ->
|
| `Unknown h ->
|
||||||
Logs.info (fun m -> m "downloaded %s, now writing" url);
|
Logs.info (fun m -> m "downloaded %s, now writing" url);
|
||||||
KV.set t.dev dest body
|
Lwt_result.bind
|
||||||
|
(Lwt.finalize (fun () -> set_from_handle t.dev source h)
|
||||||
|
(fun () -> Swap.free h))
|
||||||
|
(fun () ->
|
||||||
|
KV.rename t.dev ~source ~dest
|
||||||
|
|> Lwt_result.map_error (fun e -> `Write_error e))
|
||||||
| `Fixed_body (_reported_size, _actual_size) ->
|
| `Fixed_body (_reported_size, _actual_size) ->
|
||||||
Logs.info (fun m -> m "downloaded %s" url);
|
Logs.info (fun m -> m "downloaded %s" url);
|
||||||
KV.rename t.dev ~source ~dest
|
KV.rename t.dev ~source ~dest
|
||||||
|
|> Lwt_result.map_error (fun e -> `Write_error e)
|
||||||
| `Init -> assert false
|
| `Init -> assert false
|
||||||
end >|= function
|
end >|= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
|
@ -403,9 +446,13 @@ module Make
|
||||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||||
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
||||||
| Error e ->
|
| Error e ->
|
||||||
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e);
|
let pp_error ppf = function
|
||||||
|
| `Write_error e -> KV.pp_write_error ppf e
|
||||||
|
| `Swap e -> Swap.pp_error ppf e
|
||||||
|
in
|
||||||
|
Logs.err (fun m -> m "Write failure for %s: %a" url pp_error e);
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
(Fmt.str "Write failure for %s: %a" url KV.pp_write_error e)
|
(Fmt.str "Write failure for %s: %a" url pp_error e)
|
||||||
else begin
|
else begin
|
||||||
(if sizes_match then begin
|
(if sizes_match then begin
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
@ -447,11 +494,11 @@ module Make
|
||||||
end
|
end
|
||||||
|
|
||||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||||
let init ~verify_sha256 dev dev_md5s dev_sha512s =
|
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
|
||||||
KV.list dev Mirage_kv.Key.empty >>= function
|
KV.list dev Mirage_kv.Key.empty >>= function
|
||||||
| Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false
|
| Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false
|
||||||
| Ok entries ->
|
| Ok entries ->
|
||||||
let t = empty dev dev_md5s dev_sha512s in
|
let t = empty dev dev_md5s dev_sha512s dev_swap in
|
||||||
Cache.read t.dev_md5s >>= fun r ->
|
Cache.read t.dev_md5s >>= fun r ->
|
||||||
(match r with
|
(match r with
|
||||||
| Ok Some s ->
|
| Ok Some s ->
|
||||||
|
@ -946,6 +993,14 @@ stamp: %S
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
(Fmt.str "write error: %a" KV.pp_write_error e);
|
(Fmt.str "write error: %a" KV.pp_write_error e);
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
| Error `Swap e ->
|
||||||
|
Logs.err (fun m -> m "%s: swap error %a %a"
|
||||||
|
url
|
||||||
|
Mirage_kv.Key.pp (Disk.pending_key quux)
|
||||||
|
Swap.pp_error e);
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(Fmt.str "swap error: %a" Swap.pp_error e);
|
||||||
|
Lwt.return_unit
|
||||||
| Ok (digests, body) ->
|
| Ok (digests, body) ->
|
||||||
Disk.finalize_write disk quux ~url body csums digests
|
Disk.finalize_write disk quux ~url body csums digests
|
||||||
end
|
end
|
||||||
|
@ -980,13 +1035,14 @@ stamp: %S
|
||||||
|
|
||||||
module Paf = Paf_mirage.Make(Stack.TCP)
|
module Paf = Paf_mirage.Make(Stack.TCP)
|
||||||
|
|
||||||
let start_mirror { Part.tar; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
let start_mirror { Part.tar; swap; git_dump; md5s; sha512s } stack git_ctx http_ctx =
|
||||||
KV.connect tar >>= fun kv ->
|
KV.connect tar >>= fun kv ->
|
||||||
Cache.connect git_dump >>= fun git_dump ->
|
Cache.connect git_dump >>= fun git_dump ->
|
||||||
Cache.connect md5s >>= fun md5s ->
|
Cache.connect md5s >>= fun md5s ->
|
||||||
Cache.connect sha512s >>= fun sha512s ->
|
Cache.connect sha512s >>= fun sha512s ->
|
||||||
|
Swap.connect swap >>= fun swap ->
|
||||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s >>= fun disk ->
|
Disk.init ~verify_sha256:(K.verify_sha256 ()) kv md5s sha512s swap >>= fun disk ->
|
||||||
let remote = K.remote () in
|
let remote = K.remote () in
|
||||||
if K.check () then
|
if K.check () then
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
@ -1035,9 +1091,10 @@ stamp: %S
|
||||||
let start block _time _pclock stack git_ctx http_ctx =
|
let start block _time _pclock stack git_ctx http_ctx =
|
||||||
let initialize_disk = K.initialize_disk ()
|
let initialize_disk = K.initialize_disk ()
|
||||||
and sectors_cache = K.sectors_cache ()
|
and sectors_cache = K.sectors_cache ()
|
||||||
and sectors_git = K.sectors_git () in
|
and sectors_git = K.sectors_git ()
|
||||||
|
and sectors_swap = K.sectors_swap () in
|
||||||
if initialize_disk then
|
if initialize_disk then
|
||||||
Part.format block ~sectors_cache ~sectors_git >>= function
|
Part.format block ~sectors_cache ~sectors_git ~sectors_swap >>= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
Logs.app (fun m -> m "Successfully initialized the disk! You may restart now without --initialize-disk.");
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
Loading…
Reference in a new issue