read and dump to a kv, taking csums into account
This commit is contained in:
parent
6fe03ce867
commit
a61f944d4f
2 changed files with 179 additions and 13 deletions
|
@ -32,7 +32,7 @@ let mirror =
|
|||
package ~min:"3.7.0" "git-paf" ;
|
||||
package "opam-file-format" ;
|
||||
]
|
||||
(time @-> pclock @-> stackv4v6 @-> dns_client @-> paf @-> git_client @-> job)
|
||||
(kv_rw @-> time @-> pclock @-> stackv4v6 @-> dns_client @-> paf @-> git_client @-> job)
|
||||
|
||||
let paf time stackv4v6 = paf_conf () $ time $ tcpv4v6_of_stackv4v6 stackv4v6
|
||||
|
||||
|
@ -48,4 +48,4 @@ let git_client =
|
|||
(git_http ~authenticator:tls_authenticator tcp git)
|
||||
|
||||
let () = register "mirror"
|
||||
[ mirror $ default_time $ default_posix_clock $ stack $ dns $ paf default_time stack $ git_client ]
|
||||
[ mirror $ kv_rw_mem () $ default_time $ default_posix_clock $ stack $ dns $ paf default_time stack $ git_client ]
|
||||
|
|
|
@ -10,6 +10,7 @@ open Lwt.Infix
|
|||
let argument_error = 64
|
||||
|
||||
module Make
|
||||
(KV : Mirage_kv.RW)
|
||||
(Time : Mirage_time.S)
|
||||
(Pclock : Mirage_clock.PCLOCK)
|
||||
(Stack : Tcpip.Stack.V4V6)
|
||||
|
@ -86,6 +87,11 @@ module Make
|
|||
let `Hex h = Hex.of_string h in
|
||||
h
|
||||
|
||||
let hex_of_string s =
|
||||
match Hex.to_string (`Hex s) with
|
||||
| d -> Ok d
|
||||
| exception Invalid_argument err -> Error (`Msg err)
|
||||
|
||||
let hm_to_s hm =
|
||||
HM.fold (fun h v acc ->
|
||||
hash_to_string h ^ "=" ^ hex_to_string v ^ "\n" ^ acc)
|
||||
|
@ -139,9 +145,10 @@ module Make
|
|||
|
||||
let decode_digest filename str =
|
||||
let hex h s =
|
||||
match Hex.to_string (`Hex s) with
|
||||
| d -> Some (h, d)
|
||||
| exception Invalid_argument _ -> Logs.warn (fun m -> m "%s invalid hex %s" filename s); None
|
||||
match hex_of_string s with
|
||||
| Ok d -> Some (h, d)
|
||||
| Error `Msg msg ->
|
||||
Logs.warn (fun m -> m "%s invalid hex (%s) %s" filename msg s); None
|
||||
in
|
||||
match String.split_on_char '=' str with
|
||||
| [ data ] -> hex `MD5 data
|
||||
|
@ -260,18 +267,142 @@ module Make
|
|||
acc)
|
||||
| None -> acc)
|
||||
SM.empty opam_paths >|= fun urls ->
|
||||
Logs.info (fun m -> m "map contains %d urls" (SM.cardinal urls))
|
||||
(* SM.iter (fun url csums -> Logs.info (fun m -> m "%s: %s" url (hm_to_s csums))) urls *)
|
||||
Logs.info (fun m -> m "map contains %d urls" (SM.cardinal urls));
|
||||
urls
|
||||
end
|
||||
|
||||
let start _time _pclock stack dns _paf_cohttp git_ctx =
|
||||
module Disk = struct
|
||||
type t = {
|
||||
mutable md5s : string SM.t ;
|
||||
mutable sha512s : string SM.t ;
|
||||
dev : KV.t ;
|
||||
}
|
||||
|
||||
let empty dev = { md5s = SM.empty ; sha512s = SM.empty ; dev }
|
||||
|
||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||
(* on startup, we read + validate all data, and also store in the overlays (md5/sha512) the pointers *)
|
||||
(* the read can be md5/sha256/sha512 sum, and will output the data requested *)
|
||||
(* a write will compute the hashes and save the data (also validating potential other hashes) *)
|
||||
let init dev =
|
||||
KV.list dev Mirage_kv.Key.empty >>= function
|
||||
| Error e -> Logs.err (fun m -> m "error %a listing kv" KV.pp_error e); assert false
|
||||
| Ok entries ->
|
||||
let t = empty dev in
|
||||
Lwt_list.iter_s (fun (name, typ) ->
|
||||
match typ with
|
||||
| `Dictionary ->
|
||||
Logs.warn (fun m -> m "unexpected dictionary at %s" name);
|
||||
Lwt.return_unit
|
||||
| `Value ->
|
||||
KV.get dev (Mirage_kv.Key.v name) >>= function
|
||||
| Ok data ->
|
||||
let cs = Cstruct.of_string data in
|
||||
let digest = Mirage_crypto.Hash.digest `SHA256 cs in
|
||||
if Cstruct.equal digest (Cstruct.of_string name) then
|
||||
let md5 = Mirage_crypto.Hash.digest `MD5 cs
|
||||
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs
|
||||
in
|
||||
let md5s = SM.add (Cstruct.to_string md5) name t.md5s
|
||||
and sha512s = SM.add (Cstruct.to_string sha512) name t.sha512s
|
||||
in
|
||||
t.md5s <- md5s ; t.sha512s <- sha512s;
|
||||
Lwt.return_unit
|
||||
else begin
|
||||
Logs.err (fun m -> m "corrupt data, expected %s, read %s"
|
||||
(hex_to_string name)
|
||||
(hex_to_string (Cstruct.to_string digest)));
|
||||
KV.remove dev (Mirage_kv.Key.v name) >|= function
|
||||
| Ok () -> ()
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a while removing %s"
|
||||
KV.pp_write_error e (hex_to_string name))
|
||||
end
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a reading %s"
|
||||
KV.pp_error e (hex_to_string name));
|
||||
Lwt.return_unit)
|
||||
entries >|= fun () ->
|
||||
t
|
||||
|
||||
let write t data hm =
|
||||
let cs = Cstruct.of_string data in
|
||||
let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> Cstruct.to_string
|
||||
and md5 = Mirage_crypto.Hash.digest `MD5 cs |> Cstruct.to_string
|
||||
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> Cstruct.to_string
|
||||
in
|
||||
if
|
||||
HM.for_all (fun h v ->
|
||||
let v' =
|
||||
match h with `MD5 -> md5 | `SHA256 -> sha256 | `SHA512 -> sha512 | _ -> assert false
|
||||
in
|
||||
if String.equal v v' then
|
||||
true
|
||||
else begin
|
||||
Logs.err (fun m -> m "hash mismatch %s: expected %s, got %s"
|
||||
(hash_to_string h) (hex_to_string v) (hex_to_string v'));
|
||||
false
|
||||
end) hm
|
||||
then
|
||||
KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function
|
||||
| Ok () ->
|
||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||
t.sha512s <- SM.add sha512 sha256 t.sha512s;
|
||||
Logs.info (fun m -> m "wrote %s (%d bytes)" (hex_to_string sha256)
|
||||
(String.length data))
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a while writing %s"
|
||||
KV.pp_write_error e (hex_to_string sha256))
|
||||
else
|
||||
Lwt.return_unit
|
||||
|
||||
let read t h v =
|
||||
match hex_of_string v with
|
||||
| Error `Msg msg ->
|
||||
Logs.err (fun m -> m "error %s while decoding hex %s" msg v);
|
||||
Lwt.return (Error `Bad_request)
|
||||
| Ok bin ->
|
||||
match
|
||||
match h with
|
||||
| `MD5 -> SM.find_opt bin t.md5s
|
||||
| `SHA512 -> SM.find_opt bin t.sha512s
|
||||
| `SHA256 -> Some bin
|
||||
with
|
||||
| None ->
|
||||
Logs.err (fun m -> m "couldn't find %s" v);
|
||||
Lwt.return (Error `Not_found)
|
||||
| Some x ->
|
||||
KV.get t.dev (Mirage_kv.Key.v x) >|= function
|
||||
| Ok data -> Ok data
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a while reading %s %s"
|
||||
KV.pp_error e (hash_to_string h) v);
|
||||
Error `Not_found
|
||||
end
|
||||
|
||||
let resolve_location ~uri ~location =
|
||||
match String.split_on_char '/' location with
|
||||
| "http:" :: "" :: _ -> Ok location
|
||||
| "https:" :: "" :: _ -> Ok location
|
||||
| "" :: "" :: _ ->
|
||||
let schema = String.sub uri 0 (String.index uri '/') in
|
||||
Ok (schema ^ location)
|
||||
| "" :: _ ->
|
||||
(match String.split_on_char '/' uri with
|
||||
| schema :: "" :: user_pass_host_port :: _ ->
|
||||
Ok (String.concat "/" [schema ; "" ; user_pass_host_port ^ location])
|
||||
| _ -> Error (`Msg ("expected an absolute uri, got: " ^ uri)))
|
||||
| _ -> Error (`Msg ("unknown location (relative path): " ^ location))
|
||||
|
||||
let start kv _time _pclock stack dns _paf_cohttp git_ctx =
|
||||
Git.connect git_ctx >>= fun (store, upstream) ->
|
||||
Git.pull store upstream >>= function
|
||||
| Error `Msg msg -> Lwt.fail_with msg
|
||||
| Ok msg ->
|
||||
Logs.info (fun m -> m "store: %s" msg);
|
||||
Git.find_urls store >|= fun () ->
|
||||
let _ctx =
|
||||
Logs.info (fun m -> m "git: %s" msg);
|
||||
Git.find_urls store >>= fun urls ->
|
||||
Disk.init kv >>= fun disk ->
|
||||
let ctx =
|
||||
Mimic.empty
|
||||
|> with_sleep
|
||||
|> with_tcp (* stack -> ipaddr -> port => (stack * ipaddr * port) *)
|
||||
|
@ -279,7 +410,42 @@ module Make
|
|||
|> with_resolv (* domain_name => ipaddr *)
|
||||
|> with_stack stack (* stack *)
|
||||
|> with_dns dns (* dns *) in
|
||||
(* Client.get ~ctx uri >>= fun (_resp, body) ->
|
||||
Cohttp_lwt.Body.to_string body >|= fun str -> *)
|
||||
let rec follow count uri =
|
||||
if count = 0 then begin
|
||||
Logs.err (fun m -> m "redirection limit exceeded");
|
||||
Lwt.return None
|
||||
end else begin
|
||||
Logs.info (fun m -> m "retrieving %s" uri);
|
||||
Client.get ~ctx (Uri.of_string uri) >>= fun (resp, body) ->
|
||||
match resp.Cohttp.Response.status with
|
||||
| `OK ->
|
||||
Cohttp_lwt.Body.to_string body >|= fun str ->
|
||||
Some str
|
||||
| #Cohttp.Code.redirection_status ->
|
||||
begin match Cohttp.Header.get resp.Cohttp.Response.headers "location" with
|
||||
| Some location ->
|
||||
begin match resolve_location ~uri ~location with
|
||||
| Error `Msg msg ->
|
||||
Logs.err (fun m -> m "error %s resolving redirect location %s"
|
||||
msg location);
|
||||
Lwt.return None
|
||||
| Ok new_uri -> follow (pred count) new_uri
|
||||
end
|
||||
| None ->
|
||||
Logs.err (fun m -> m "redirect without location");
|
||||
Lwt.return None
|
||||
end
|
||||
| s ->
|
||||
Logs.err (fun m -> m "error %s while fetching %s"
|
||||
(Cohttp.Code.string_of_status resp.Cohttp.Response.status)
|
||||
uri);
|
||||
Lwt.return None
|
||||
end
|
||||
in
|
||||
Lwt_list.iter_p (fun (url, csums) ->
|
||||
follow 20 url >>= function
|
||||
| Some str -> Disk.write disk str csums
|
||||
| None -> Lwt.return_unit)
|
||||
(SM.bindings urls) >|= fun () ->
|
||||
Logs.info (fun m -> m "done")
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue