Merge pull request 'Hash cache' (#14) from partition' into main
Reviewed-on: https://git.robur.io/robur/opam-mirror/pulls/14
This commit is contained in:
commit
c81ba101f9
3 changed files with 127 additions and 52 deletions
|
@ -3,5 +3,4 @@
|
||||||
This unikernel periodically (at startup, on request, every hour) updates the
|
This unikernel periodically (at startup, on request, every hour) updates the
|
||||||
provided opam-repository and downloads all referenced archives. It acts as
|
provided opam-repository and downloads all referenced archives. It acts as
|
||||||
an opam-repository including archive mirror. Only archives with appropriate
|
an opam-repository including archive mirror. Only archives with appropriate
|
||||||
checksums are stored. On startup, all data present on the block device is
|
checksums are stored.
|
||||||
validated.
|
|
||||||
|
|
|
@ -5,12 +5,16 @@ let http_client = typ HTTP_client
|
||||||
|
|
||||||
let check =
|
let check =
|
||||||
let doc =
|
let doc =
|
||||||
Key.Arg.info
|
Key.Arg.info ~doc:"Only check the cache" ["check"]
|
||||||
~doc:"Only check the cache"
|
|
||||||
["check"]
|
|
||||||
in
|
in
|
||||||
Key.(create "check" Arg.(flag doc))
|
Key.(create "check" Arg.(flag doc))
|
||||||
|
|
||||||
|
let verify =
|
||||||
|
let doc =
|
||||||
|
Key.Arg.info ~doc:"Verify the cache contents" ["verify"]
|
||||||
|
in
|
||||||
|
Key.(create "verify" Arg.(flag doc))
|
||||||
|
|
||||||
let remote =
|
let remote =
|
||||||
let doc =
|
let doc =
|
||||||
Key.Arg.info
|
Key.Arg.info
|
||||||
|
@ -46,9 +50,14 @@ let tls_authenticator =
|
||||||
let doc = Key.Arg.info ~doc ["tls-authenticator"] in
|
let doc = Key.Arg.info ~doc ["tls-authenticator"] in
|
||||||
Key.(create "tls-authenticator" Arg.(opt (some string) None doc))
|
Key.(create "tls-authenticator" Arg.(opt (some string) None doc))
|
||||||
|
|
||||||
|
let sectors_cache =
|
||||||
|
let doc = "Number of sectors reserved for each checksum cache (md5, sha512)." in
|
||||||
|
let doc = Key.Arg.info ~doc ["sectors-cache"] in
|
||||||
|
Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc))
|
||||||
|
|
||||||
let mirror =
|
let mirror =
|
||||||
foreign "Unikernel.Make"
|
foreign "Unikernel.Make"
|
||||||
~keys:[ Key.v check ; Key.v remote ; Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ; Key.v port ]
|
~keys:[ Key.v check ; Key.v verify ; Key.v remote ; Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ; Key.v port ; Key.v sectors_cache ]
|
||||||
~packages:[
|
~packages:[
|
||||||
package ~min:"0.1.0" ~sublibs:[ "mirage" ] "paf" ;
|
package ~min:"0.1.0" ~sublibs:[ "mirage" ] "paf" ;
|
||||||
package "h2" ;
|
package "h2" ;
|
||||||
|
@ -58,6 +67,8 @@ let mirror =
|
||||||
package "opam-file-format" ;
|
package "opam-file-format" ;
|
||||||
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
||||||
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw" "tar-mirage" ;
|
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw" "tar-mirage" ;
|
||||||
|
package ~pin:"git+https://github.com/reynir/mirage-block-partition.git" "mirage-block-partition" ;
|
||||||
|
package ~pin:"git+https://git.robur.io/reynir/oneffs.git" "oneffs" ;
|
||||||
]
|
]
|
||||||
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job)
|
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job)
|
||||||
|
|
||||||
|
|
|
@ -10,9 +10,12 @@ module Make
|
||||||
(_ : sig end)
|
(_ : sig end)
|
||||||
(HTTP : Http_mirage_client.S) = struct
|
(HTTP : Http_mirage_client.S) = struct
|
||||||
|
|
||||||
module KV = Tar_mirage.Make_KV_RW(BLOCK)
|
module Part = Mirage_block_partition.Make(BLOCK)
|
||||||
|
module KV = Tar_mirage.Make_KV_RW(Part)
|
||||||
|
module Cache = OneFFS.Make(Part)
|
||||||
|
|
||||||
module SM = Map.Make(String)
|
module SM = Map.Make(String)
|
||||||
|
module SSet = Set.Make(String)
|
||||||
|
|
||||||
module HM = Map.Make(struct
|
module HM = Map.Make(struct
|
||||||
type t = Mirage_crypto.Hash.hash
|
type t = Mirage_crypto.Hash.hash
|
||||||
|
@ -200,66 +203,120 @@ module Make
|
||||||
mutable md5s : string SM.t ;
|
mutable md5s : string SM.t ;
|
||||||
mutable sha512s : string SM.t ;
|
mutable sha512s : string SM.t ;
|
||||||
dev : KV.t ;
|
dev : KV.t ;
|
||||||
|
dev_md5s : Cache.t ;
|
||||||
|
dev_sha512s : Cache.t ;
|
||||||
}
|
}
|
||||||
|
|
||||||
let empty dev = { md5s = SM.empty ; sha512s = SM.empty ; dev }
|
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
|
||||||
|
|
||||||
let key _t d =
|
let to_hex d =
|
||||||
let d = Cstruct.to_string d in
|
let d = Cstruct.to_string d in
|
||||||
hex_to_string d
|
hex_to_string d
|
||||||
|
|
||||||
|
let marshal_sm (sm : string SM.t) =
|
||||||
|
let version = char_of_int 1 in
|
||||||
|
String.make 1 version ^ Marshal.to_string sm []
|
||||||
|
|
||||||
|
let unmarshal_sm s =
|
||||||
|
let version = int_of_char s.[0] in
|
||||||
|
match version with
|
||||||
|
| 1 -> Ok (Marshal.from_string s 1 : string SM.t)
|
||||||
|
| _ -> Error ("Unsupported version " ^ string_of_int version)
|
||||||
|
|
||||||
|
let update_caches t =
|
||||||
|
Cache.write t.dev_md5s (marshal_sm t.md5s) >>= fun r ->
|
||||||
|
(match r with
|
||||||
|
| Ok () -> Logs.info (fun m -> m "Set 'md5s'")
|
||||||
|
| Error e -> Logs.warn (fun m -> m "Failed to write 'md5s': %a" Cache.pp_write_error e));
|
||||||
|
Cache.write t.dev_sha512s (marshal_sm t.sha512s) >>= fun r ->
|
||||||
|
match r with
|
||||||
|
| Ok () -> Logs.info (fun m -> m "Set 'sha512s'"); Lwt.return_unit
|
||||||
|
| Error e ->
|
||||||
|
Logs.warn (fun m -> m "Failed to write 'sha512s': %a" Cache.pp_write_error e);
|
||||||
|
Lwt.return_unit
|
||||||
|
|
||||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||||
(* on startup, we read + validate all data, and also store in the overlays (md5/sha512) the pointers *)
|
let init ~verify dev dev_md5s dev_sha512s =
|
||||||
(* the read can be md5/sha256/sha512 sum, and will output the data requested *)
|
Logs.info (fun m -> m "init with verify %B" verify);
|
||||||
(* a write will compute the hashes and save the data (also validating potential other hashes) *)
|
|
||||||
let init dev =
|
|
||||||
KV.list dev Mirage_kv.Key.empty >>= function
|
KV.list dev Mirage_kv.Key.empty >>= function
|
||||||
| Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false
|
| Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false
|
||||||
| Ok entries ->
|
| Ok entries ->
|
||||||
let t = empty dev in
|
let t = empty dev dev_md5s dev_sha512s in
|
||||||
Lwt_list.iteri_s (fun idx (name, typ) ->
|
Cache.read t.dev_md5s >>= fun r ->
|
||||||
if idx mod 10 = 0 then Gc.full_major () ;
|
(match r with
|
||||||
|
| Ok Some s -> Result.iter (fun md5s -> t.md5s <- md5s) (unmarshal_sm s)
|
||||||
|
| Ok None -> Logs.debug (fun m -> m "No md5s cached")
|
||||||
|
| Error e -> Logs.warn (fun m -> m "Error reading md5s cache: %a" Cache.pp_error e));
|
||||||
|
Cache.read t.dev_sha512s >>= fun r ->
|
||||||
|
(match r with
|
||||||
|
| Ok Some s -> Result.iter (fun sha512s -> t.sha512s <- sha512s) (unmarshal_sm s)
|
||||||
|
| Ok None -> Logs.debug (fun m -> m "No sha512s cached")
|
||||||
|
| Error e -> Logs.warn (fun m -> m "Error reading sha512s cache: %a" Cache.pp_error e));
|
||||||
|
let md5s = SSet.of_list (List.map snd (SM.bindings t.md5s))
|
||||||
|
and sha512s = SSet.of_list (List.map snd (SM.bindings t.sha512s)) in
|
||||||
|
let idx = ref 1 in
|
||||||
|
Lwt_list.iter_s (fun (name, typ) ->
|
||||||
|
if !idx mod 10 = 0 then Gc.full_major () ;
|
||||||
match typ with
|
match typ with
|
||||||
| `Dictionary ->
|
| `Dictionary ->
|
||||||
Logs.warn (fun m -> m "unexpected dictionary at %s" name);
|
Logs.warn (fun m -> m "unexpected dictionary at %s" name);
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| `Value ->
|
| `Value ->
|
||||||
KV.get dev (Mirage_kv.Key.v name) >>= function
|
let ( >|?= ) x f = Lwt_result.iter (fun v -> Lwt.return (f v)) x in
|
||||||
| Ok data ->
|
let _data = ref None in
|
||||||
let cs = Cstruct.of_string data in
|
let read_data () =
|
||||||
let digest = Mirage_crypto.Hash.digest `SHA256 cs in
|
match !_data with
|
||||||
if String.equal name (key t digest) then begin
|
| Some cs -> Lwt.return (Ok cs)
|
||||||
let md5 = Mirage_crypto.Hash.digest `MD5 cs |> key t
|
| None ->
|
||||||
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> key t
|
incr idx;
|
||||||
in
|
KV.get dev (Mirage_kv.Key.v name) >|= function
|
||||||
let md5s = SM.add md5 name t.md5s
|
|
||||||
and sha512s = SM.add sha512 name t.sha512s
|
|
||||||
in
|
|
||||||
t.md5s <- md5s ; t.sha512s <- sha512s;
|
|
||||||
Logs.info (fun m -> m "added %s" name);
|
|
||||||
Lwt.return_unit
|
|
||||||
end else begin
|
|
||||||
Logs.err (fun m -> m "corrupt data, expected %s, read %s (should remove)"
|
|
||||||
name (hex_to_string (Cstruct.to_string digest)));
|
|
||||||
(*KV.remove dev (Mirage_kv.Key.v name) >|= function
|
|
||||||
| Ok () -> ()
|
|
||||||
| Error e ->
|
|
||||||
Logs.err (fun m -> m "error %a while removing %s"
|
|
||||||
KV.pp_write_error e (key_to_string t name)) *)
|
|
||||||
Lwt.return_unit
|
|
||||||
end
|
|
||||||
| Error e ->
|
| Error e ->
|
||||||
Logs.err (fun m -> m "error %a reading %s"
|
Logs.err (fun m -> m "error %a reading %s"
|
||||||
KV.pp_error e name);
|
KV.pp_error e name);
|
||||||
Lwt.return_unit)
|
Error ()
|
||||||
entries >|= fun () ->
|
| Ok data ->
|
||||||
|
let cs = Cstruct.of_string data in
|
||||||
|
_data := Some cs;
|
||||||
|
Ok cs
|
||||||
|
in
|
||||||
|
begin
|
||||||
|
if verify then begin
|
||||||
|
read_data () >|?= fun cs ->
|
||||||
|
let digest = Mirage_crypto.Hash.digest `SHA256 cs in
|
||||||
|
if not (String.equal name (to_hex digest)) then
|
||||||
|
Logs.err (fun m -> m "corrupt data, expected %s, read %s (should remove)"
|
||||||
|
name (hex_to_string (Cstruct.to_string digest)));
|
||||||
|
end else
|
||||||
|
Lwt.return_unit
|
||||||
|
end >>= fun () ->
|
||||||
|
begin
|
||||||
|
if not (SSet.mem name md5s) then begin
|
||||||
|
read_data () >|?= fun cs ->
|
||||||
|
let md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex in
|
||||||
|
let md5s = SM.add md5 name t.md5s in
|
||||||
|
t.md5s <- md5s
|
||||||
|
end else
|
||||||
|
Lwt.return_unit
|
||||||
|
end >>= fun () ->
|
||||||
|
begin
|
||||||
|
if not (SSet.mem name sha512s) then begin
|
||||||
|
read_data () >|?= fun cs ->
|
||||||
|
let sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex in
|
||||||
|
let sha512s = SM.add sha512 name t.sha512s in
|
||||||
|
t.sha512s <- sha512s
|
||||||
|
end else
|
||||||
|
Lwt.return_unit
|
||||||
|
end >|= fun () ->
|
||||||
|
Logs.info (fun m -> m "added %s" name))
|
||||||
|
entries >>= fun () ->
|
||||||
|
update_caches t >|= fun () ->
|
||||||
t
|
t
|
||||||
|
|
||||||
let write t ~url data hm =
|
let write t ~url data hm =
|
||||||
let cs = Cstruct.of_string data in
|
let cs = Cstruct.of_string data in
|
||||||
let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> key t
|
let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex
|
||||||
and md5 = Mirage_crypto.Hash.digest `MD5 cs |> key t
|
and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex
|
||||||
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> key t
|
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex
|
||||||
in
|
in
|
||||||
if
|
if
|
||||||
HM.for_all (fun h v ->
|
HM.for_all (fun h v ->
|
||||||
|
@ -500,7 +557,7 @@ stamp: %S
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
respond_with_empty reqd resp
|
respond_with_empty reqd resp
|
||||||
else *)
|
else *)
|
||||||
let dispatch t store hook_url git_kv update _flow _conn reqd =
|
let dispatch t store hook_url update _flow _conn reqd =
|
||||||
let request = Httpaf.Reqd.request reqd in
|
let request = Httpaf.Reqd.request reqd in
|
||||||
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
|
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
|
||||||
match String.split_on_char '/' request.Httpaf.Request.target with
|
match String.split_on_char '/' request.Httpaf.Request.target with
|
||||||
|
@ -629,16 +686,24 @@ stamp: %S
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end
|
end
|
||||||
| _ -> Lwt.return_unit)
|
| _ -> Lwt.return_unit)
|
||||||
(SM.bindings urls) >|= fun () ->
|
(SM.bindings urls) >>= fun () ->
|
||||||
|
Disk.update_caches disk >|= fun () ->
|
||||||
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
||||||
|
|
||||||
module Paf = Paf_mirage.Make(Time)(Stack.TCP)
|
module Paf = Paf_mirage.Make(Time)(Stack.TCP)
|
||||||
|
|
||||||
let start block _time _pclock stack git_ctx http_ctx =
|
let start block _time _pclock stack git_ctx http_ctx =
|
||||||
KV.connect block >>= fun kv ->
|
BLOCK.get_info block >>= fun info ->
|
||||||
|
let sectors_cache = Key_gen.sectors_cache () in
|
||||||
|
Part.connect Int64.(sub info.size_sectors (mul 2L sectors_cache)) block >>= fun (b1, rest) ->
|
||||||
|
let b2, b3 = Part.subpartition sectors_cache rest in
|
||||||
|
KV.connect b1 >>= fun kv ->
|
||||||
|
Cache.connect b2 >>= fun md5s ->
|
||||||
|
Cache.connect b3 >>= fun sha512s ->
|
||||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
Disk.init kv >>= fun disk ->
|
Disk.init ~verify:(Key_gen.verify ()) kv md5s sha512s >>= fun disk ->
|
||||||
if Key_gen.check () then Lwt.return_unit
|
if Key_gen.check () then
|
||||||
|
Lwt.return_unit
|
||||||
else
|
else
|
||||||
Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv ->
|
Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv ->
|
||||||
Serve.commit_id git_kv >>= fun commit_id ->
|
Serve.commit_id git_kv >>= fun commit_id ->
|
||||||
|
@ -653,7 +718,7 @@ stamp: %S
|
||||||
let service =
|
let service =
|
||||||
Paf.http_service
|
Paf.http_service
|
||||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||||
(Serve.dispatch serve disk (Key_gen.hook_url ()) git_kv update)
|
(Serve.dispatch serve disk (Key_gen.hook_url ()) update)
|
||||||
in
|
in
|
||||||
let `Initialized th = Paf.serve service t in
|
let `Initialized th = Paf.serve service t in
|
||||||
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
||||||
|
|
Loading…
Reference in a new issue