Use swapfs #16

Merged
hannes merged 16 commits from swap into main 2024-11-08 12:54:01 +00:00
Showing only changes of commit 1f9e3e6e23 - Show all commits

View file

@ -118,20 +118,12 @@ module Make
| step :: tl -> | step :: tl ->
explore := tl; explore := tl;
Store.exists store step >>= function Store.exists store step >>= function
| Error e -> | Error e -> go ()
Logs.err (fun m -> m "error %a for exists %a" Store.pp_error e | Ok None -> go ()
Mirage_kv.Key.pp step);
go ()
| Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp step);
go ()
| Ok Some `Value -> Lwt.return (Some step) | Ok Some `Value -> Lwt.return (Some step)
| Ok Some `Dictionary -> | Ok Some `Dictionary ->
Store.list store step >>= function Store.list store step >>= function
| Error e -> | Error e -> go ()
Logs.err (fun m -> m "error %a while listing %a"
Store.pp_error e Mirage_kv.Key.pp step);
go ()
| Ok steps -> | Ok steps ->
explore := List.map fst steps @ !explore; explore := List.map fst steps @ !explore;
go () go ()
@ -147,8 +139,7 @@ module Make
List.iter (fun (`Msg msg) -> add_parse_error path msg) errs; List.iter (fun (`Msg msg) -> add_parse_error path msg) errs;
List.fold_left (fun acc (url, csums) -> List.fold_left (fun acc (url, csums) ->
if HM.cardinal csums = 0 then if HM.cardinal csums = 0 then
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); (add_parse_error path ("no checksums for " ^ url);
add_parse_error path ("no checksums for " ^ url);
acc) acc)
else else
SM.update url (function SM.update url (function
@ -161,8 +152,6 @@ module Make
then then
Some (HM.union (fun _h v _v' -> Some v) csums csums') Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
add_parse_error path (Fmt.str add_parse_error path (Fmt.str
"mismatching hashes for %s: %s vs %s" "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums)); url (hm_to_s csums') (hm_to_s csums));
@ -265,11 +254,11 @@ module Make
let update_caches t = let update_caches t =
Cache.write t.dev_md5s (marshal_sm t.md5s) >>= fun r -> Cache.write t.dev_md5s (marshal_sm t.md5s) >>= fun r ->
(match r with (match r with
| Ok () -> Logs.info (fun m -> m "Set 'md5s'") | Ok () -> ()
| Error e -> Logs.warn (fun m -> m "Failed to write 'md5s': %a" Cache.pp_write_error e)); | Error e -> Logs.warn (fun m -> m "Failed to write 'md5s': %a" Cache.pp_write_error e));
Cache.write t.dev_sha512s (marshal_sm t.sha512s) >>= fun r -> Cache.write t.dev_sha512s (marshal_sm t.sha512s) >>= fun r ->
match r with match r with
| Ok () -> Logs.info (fun m -> m "Set 'sha512s'"); Lwt.return_unit | Ok () -> Lwt.return_unit
| Error e -> | Error e ->
Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e); Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e);
Lwt.return_unit Lwt.return_unit
@ -295,8 +284,6 @@ module Make
| Ok key -> | Ok key ->
KV.size t.dev key >>= function KV.size t.dev key >>= function
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while reading %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Lwt.return (Error (`Not_found key)) Lwt.return (Error (`Not_found key))
| Ok len -> | Ok len ->
let chunk_size = 4096 in let chunk_size = 4096 in
@ -307,8 +294,6 @@ module Make
f a data >>= fun a -> f a data >>= fun a ->
read_more a Optint.Int63.(add offset (of_int chunk_size)) read_more a Optint.Int63.(add offset (of_int chunk_size))
| Error e -> | Error e ->
Logs.err (fun m -> m "error %a while reading %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Lwt.return (Error e) Lwt.return (Error e)
else else
Lwt.return (Ok a) Lwt.return (Ok a)
@ -376,7 +361,6 @@ module Make
and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5)) and md5 = Ohex.encode Digestif.MD5.(to_raw_string (get digests.md5))
and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in and sha512 = Ohex.encode Digestif.SHA512.(to_raw_string (get digests.sha512)) in
let dest = Mirage_kv.Key.v sha256 in let dest = Mirage_kv.Key.v sha256 in
Logs.info (fun m -> m "downloaded %s, now writing" url);
let temp = Mirage_kv.Key.(v "pending" // dest) in let temp = Mirage_kv.Key.(v "pending" // dest) in
Lwt_result.bind Lwt_result.bind
(Lwt.finalize (fun () -> set_from_handle t.dev temp swap) (Lwt.finalize (fun () -> set_from_handle t.dev temp swap)
@ -388,29 +372,18 @@ module Make
remove_active url; remove_active url;
t.md5s <- SM.add md5 sha256 t.md5s; t.md5s <- SM.add md5 sha256 t.md5s;
t.sha512s <- SM.add sha512 sha256 t.sha512s t.sha512s <- SM.add sha512 sha256 t.sha512s
| Error e -> | Error `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
let pp_error ppf = function | Error `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
| `Write_error e -> KV.pp_write_error ppf e
| `Swap e -> Swap.pp_error ppf e
in
Logs.err (fun m -> m "Write failure for %s: %a" url pp_error e);
match e with
| `Write_error e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e)
| `Swap e -> add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e)
else begin else begin
add_failed url (Ptime.v (Pclock.now_d_ps ())) add_failed url (Ptime.v (Pclock.now_d_ps ()))
(`Bad_checksum (hash, Archive_checksum.get digests hash, csum)); (`Bad_checksum (hash, Archive_checksum.get digests hash, csum));
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum));
Lwt.return_unit Lwt.return_unit
end end
(* on disk, we use a flat file system where the filename is the sha256 of the data *) (* on disk, we use a flat file system where the filename is the sha256 of the data *)
let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap = let init ~verify_sha256 dev dev_md5s dev_sha512s dev_swap =
KV.list dev Mirage_kv.Key.empty >>= function KV.list dev Mirage_kv.Key.empty >>= function
| Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false | Error e -> invalid_arg (Fmt.str "error %a listing kv" KV.pp_error e)
| Ok entries -> | Ok entries ->
let t = empty dev dev_md5s dev_sha512s dev_swap in let t = empty dev dev_md5s dev_sha512s dev_swap in
Cache.read t.dev_md5s >>= fun r -> Cache.read t.dev_md5s >>= fun r ->
@ -418,24 +391,20 @@ module Make
| Ok Some s -> | Ok Some s ->
if not verify_sha256 then if not verify_sha256 then
Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s) Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
| Ok None -> Logs.debug (fun m -> m "No md5s cached") | Ok None -> ()
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e)); | Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
Cache.read t.dev_sha512s >>= fun r -> Cache.read t.dev_sha512s >>= fun r ->
(match r with (match r with
| Ok Some s -> | Ok Some s ->
if not verify_sha256 then if not verify_sha256 then
Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s) Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
| Ok None -> Logs.debug (fun m -> m "No sha512s cached") | Ok None -> ()
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e)); | Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s)) let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
let idx = ref 1 in
Lwt_list.iter_s (fun (path, typ) -> Lwt_list.iter_s (fun (path, typ) ->
if !idx mod 10 = 0 then Gc.full_major () ;
match typ with match typ with
| `Dictionary -> | `Dictionary -> Lwt.return_unit
Logs.warn (fun m -> m "unexpected dictionary at %a" Mirage_kv.Key.pp path);
Lwt.return_unit
| `Value -> | `Value ->
let open Digestif in let open Digestif in
let md5_final = let md5_final =
@ -501,7 +470,6 @@ module Make
else begin else begin
Option.iter (fun f -> f (Option.get md5)) md5_final; Option.iter (fun f -> f (Option.get md5)) md5_final;
Option.iter (fun f -> f (Option.get sha512)) sha512_final; Option.iter (fun f -> f (Option.get sha512)) sha512_final;
Logs.info (fun m -> m "added %a" Mirage_kv.Key.pp path);
Lwt.return_unit Lwt.return_unit
end) end)
entries >>= fun () -> entries >>= fun () ->
@ -514,15 +482,9 @@ module Make
| Ok x -> | Ok x ->
KV.exists t.dev x >|= function KV.exists t.dev x >|= function
| Ok Some `Value -> true | Ok Some `Value -> true
| Ok Some `Dictionary -> | Ok Some `Dictionary -> false
Logs.err (fun m -> m "unexpected dictionary for %s %a"
(hash_to_string h) Mirage_kv.Key.pp v);
false
| Ok None -> false | Ok None -> false
| Error e -> | Error _ -> false
Logs.err (fun m -> m "exists %s %a returned %a"
(hash_to_string h) Mirage_kv.Key.pp v KV.pp_error e);
false
let last_modified t h v = let last_modified t h v =
match find_key t h v with match find_key t h v with
@ -530,10 +492,7 @@ module Make
| Ok x -> | Ok x ->
KV.last_modified t.dev x >|= function KV.last_modified t.dev x >|= function
| Ok data -> Ok data | Ok data -> Ok data
| Error e -> | Error _ -> Error `Not_found
Logs.err (fun m -> m "error %a while last_modified %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Error `Not_found
let size t h v = let size t h v =
match find_key t h v with match find_key t h v with
@ -541,10 +500,7 @@ module Make
| Ok x -> | Ok x ->
KV.size t.dev x >|= function KV.size t.dev x >|= function
| Ok s -> Ok s | Ok s -> Ok s
| Error e -> | Error _ -> Error `Not_found
Logs.err (fun m -> m "error %a while size %s %a"
KV.pp_error e (hash_to_string h) Mirage_kv.Key.pp v);
Error `Not_found
end end
module Tarball = struct module Tarball = struct
@ -802,7 +758,6 @@ stamp: %S
else *) else *)
let dispatch t store hook_url update _flow _conn reqd = let dispatch t store hook_url update _flow _conn reqd =
let request = Httpaf.Reqd.request reqd in let request = Httpaf.Reqd.request reqd in
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
match String.split_on_char '/' request.Httpaf.Request.target with match String.split_on_char '/' request.Httpaf.Request.target with
| [ ""; x ] when String.equal x hook_url -> | [ ""; x ] when String.equal x hook_url ->
Lwt.async update; Lwt.async update;
@ -865,15 +820,12 @@ stamp: %S
begin begin
match hash_of_string hash_algo with match hash_of_string hash_algo with
| Error `Msg msg -> | Error `Msg msg ->
Logs.warn (fun m -> m "error decoding hash algo: %s" msg);
not_found reqd request.Httpaf.Request.target not_found reqd request.Httpaf.Request.target
| Ok h -> | Ok h ->
let hash = Mirage_kv.Key.v hash in let hash = Mirage_kv.Key.v hash in
Lwt.async (fun () -> Lwt.async (fun () ->
(Disk.last_modified store h hash >|= function (Disk.last_modified store h hash >|= function
| Error _ -> | Error _ -> t.modified
Logs.warn (fun m -> m "error retrieving last modified");
t.modified
| Ok v -> ptime_to_http_date v) >>= fun last_modified -> | Ok v -> ptime_to_http_date v) >>= fun last_modified ->
if not_modified request (last_modified, Mirage_kv.Key.basename hash) then if not_modified request (last_modified, Mirage_kv.Key.basename hash) then
let resp = Httpaf.Response.create `Not_modified in let resp = Httpaf.Response.create `Not_modified in
@ -882,7 +834,6 @@ stamp: %S
else else
Disk.size store h hash >>= function Disk.size store h hash >>= function
| Error _ -> | Error _ ->
Logs.warn (fun m -> m "error retrieving size");
not_found reqd request.Httpaf.Request.target; not_found reqd request.Httpaf.Request.target;
Lwt.return_unit Lwt.return_unit
| Ok size -> | Ok size ->
@ -916,7 +867,6 @@ stamp: %S
(* FIXME: handle resuming partial downloads *) (* FIXME: handle resuming partial downloads *)
reset_failed_downloads (); reset_failed_downloads ();
let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in let pool = Lwt_pool.create parallel_downloads (Fun.const Lwt.return_unit) in
let idx = ref 0 in
Lwt_list.iter_p (fun (url, csums) -> Lwt_list.iter_p (fun (url, csums) ->
Lwt_pool.use pool @@ fun () -> Lwt_pool.use pool @@ fun () ->
HM.fold (fun h v r -> HM.fold (fun h v r ->
@ -924,34 +874,21 @@ stamp: %S
| true -> Disk.exists disk h (hex_to_key v) | true -> Disk.exists disk h (hex_to_key v)
| false -> Lwt.return false) | false -> Lwt.return false)
csums (Lwt.return true) >>= function csums (Lwt.return true) >>= function
| true -> | true -> Lwt.return_unit
Logs.debug (fun m -> m "ignoring %s (already present)" url);
Lwt.return_unit
| false -> | false ->
incr idx;
if !idx mod 10 = 0 then Gc.full_major () ;
Logs.info (fun m -> m "downloading %s" url);
let quux, body_init = Disk.init_write disk csums in let quux, body_init = Disk.init_write disk csums in
add_to_active url (Ptime.v (Pclock.now_d_ps ())); add_to_active url (Ptime.v (Pclock.now_d_ps ()));
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
| Ok (resp, r) -> | Ok (resp, r) ->
begin match r with begin match r with
| Error `Bad_response -> | Error `Bad_response ->
Logs.warn (fun m -> m "%s: %a (reason %s)"
url H2.Status.pp_hum resp.status resp.reason);
add_failed url (Ptime.v (Pclock.now_d_ps ())) add_failed url (Ptime.v (Pclock.now_d_ps ()))
(`Bad_response (resp.status, resp.reason)); (`Bad_response (resp.status, resp.reason));
Lwt.return_unit Lwt.return_unit
| Error `Write_error e -> | Error `Write_error e ->
Logs.err (fun m -> m "%s: write error %a"
url
KV.pp_write_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Write_error e);
Lwt.return_unit Lwt.return_unit
| Error `Swap e -> | Error `Swap e ->
Logs.err (fun m -> m "%s: swap error %a"
url
Swap.pp_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e); add_failed url (Ptime.v (Pclock.now_d_ps ())) (`Swap e);
Lwt.return_unit Lwt.return_unit
| Ok (digests, body) -> | Ok (digests, body) ->