more wip
This commit is contained in:
parent
ebeadf69d8
commit
fd8ce3be03
1 changed files with 124 additions and 99 deletions
|
@ -18,9 +18,7 @@ module Make
|
|||
module SM = Map.Make(String)
|
||||
module SSet = Set.Make(String)
|
||||
|
||||
module HM = Map.Make(struct
|
||||
type t = Mirage_crypto.Hash.hash
|
||||
let compare h h' =
|
||||
let compare_hash h h' =
|
||||
match h, h' with
|
||||
| `SHA512, `SHA512 -> 0
|
||||
| `SHA512, _ -> 1
|
||||
|
@ -38,6 +36,10 @@ module Make
|
|||
| `SHA1, `MD5 -> 1
|
||||
| `MD5, `MD5 -> 0
|
||||
| `MD5, _ -> -1
|
||||
|
||||
module HM = Map.Make(struct
|
||||
type t = Mirage_crypto.Hash.hash
|
||||
let compare = compare_hash
|
||||
end)
|
||||
|
||||
let hash_to_string = function
|
||||
|
@ -312,6 +314,35 @@ module Make
|
|||
in
|
||||
read_more a Optint.Int63.zero
|
||||
|
||||
(*
|
||||
module HM_running = struct
|
||||
|
||||
let empty h =
|
||||
let module H = (val Mirage_crypto.Hash.module_of h) in
|
||||
(* We need MD5, SHA256 and SHA512. [h] is likely one of the
|
||||
aforementioned and in that case we don't compute the same hash twice
|
||||
*)
|
||||
HM.empty
|
||||
|> HM.add `MD5 Mirage_crypto.Hash.MD5.empty
|
||||
|> HM.add `SHA256 Mirage_crypto.Hash.SHA256.empty
|
||||
|> HM.add `SHA512 Mirage_crypto.Hash.SHA512.empty
|
||||
|> HM.add h H.empty
|
||||
|
||||
let feed t data =
|
||||
HM.map (fun h v ->
|
||||
let module H = (val Mirage_crypto.Hash.module_of h) in
|
||||
H.feed v data)
|
||||
t
|
||||
|
||||
let get =
|
||||
HM.map (fun h v ->
|
||||
let module H = (val Mirage_crypto.Hash.module_of h) in
|
||||
H.get v)
|
||||
|
||||
|
||||
end
|
||||
*)
|
||||
|
||||
module Running_hash = struct
|
||||
type _ t =
|
||||
| MD5 : Mirage_crypto.Hash.MD5.t -> [> `MD5 ] t
|
||||
|
@ -378,7 +409,7 @@ module Make
|
|||
|
||||
let init_write csums =
|
||||
let hash, csum = HM.max_binding csums in
|
||||
(hash, csum), Ok (empty_digests hash, Optint.Int63.zero, "")
|
||||
(hash, csum), Ok (empty_digests hash, `Init)
|
||||
|
||||
let content_length_of_string s =
|
||||
match Int64.of_string s with
|
||||
|
@ -423,29 +454,37 @@ module Make
|
|||
in
|
||||
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
|
||||
|
||||
let write_partial t (hash, csum) : _ -> (_ * Optint.Int63.t * string, _) result -> string -> _ result Lwt.t =
|
||||
let write_partial t (hash, csum) =
|
||||
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
|
||||
let key = pending_key (hash, csum) in
|
||||
let ( >>>= ) = Lwt_result.bind in
|
||||
fun response r data ->
|
||||
Lwt.return r >>>= fun (digests, offset, body) ->
|
||||
let len = String.length data in
|
||||
match body_length response with
|
||||
Lwt.return r >>>= fun (digests, acc) ->
|
||||
let digests = update_digests digests data in
|
||||
match acc with
|
||||
| `Init ->
|
||||
begin match body_length response with
|
||||
| `Bad_response -> Lwt.return (Error `Bad_response)
|
||||
| `Fixed size ->
|
||||
begin if Optint.Int63.equal offset Optint.Int63.zero then
|
||||
KV.allocate t.dev key (Optint.Int63.of_int64 size)
|
||||
|> Lwt_result.map_error (fun e -> `Write_error e)
|
||||
else
|
||||
Lwt.return (Ok ())
|
||||
end >>>= fun () ->
|
||||
>>>= fun () ->
|
||||
KV.set_partial t.dev key ~offset:Optint.Int63.zero data
|
||||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||
let len = String.length data in
|
||||
let offset = Optint.Int63.of_int len in
|
||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||
| `Unknown ->
|
||||
Lwt.return_ok (digests, `Unknown data)
|
||||
end
|
||||
| `Fixed_body (size, offset) ->
|
||||
KV.set_partial t.dev key ~offset data
|
||||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||
let digests = update_digests digests data in
|
||||
Lwt.return_ok (digests, Optint.Int63.(add offset (of_int len)), body)
|
||||
| `Unknown ->
|
||||
let digests = update_digests digests data in
|
||||
Lwt.return_ok (digests, Optint.Int63.(add offset (of_int len)), body ^ data)
|
||||
let len = String.length data in
|
||||
let offset = Optint.Int63.(add offset (of_int len)) in
|
||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||
| `Unknown body ->
|
||||
Lwt.return_ok (digests, `Unknown (body ^ data))
|
||||
|
||||
let digests_to_hm digests =
|
||||
HM.empty
|
||||
|
@ -464,40 +503,57 @@ module Make
|
|||
(fun (h, csum) -> String.equal csum (HM.find h csums))
|
||||
common_bindings
|
||||
|
||||
let finalize_write t (hash, csum) csums digests =
|
||||
let source = pending_key (hash, csum) in
|
||||
if check_csums_digests csums digests then
|
||||
let finalize_write t (hash, csum) ~url body csums digests =
|
||||
let sizes_match, body_size_in_header =
|
||||
match body with
|
||||
| `Fixed_body (reported, actual) -> Optint.Int63.(equal (of_int reported) actual), true
|
||||
| `Unknown _ -> true, false
|
||||
in
|
||||
if check_csums_digests csums digests && sizes_match then
|
||||
let sha256 = to_hex (Mirage_crypto.Hash.SHA256.get digests.sha256)
|
||||
and md5 = to_hex (Mirage_crypto.Hash.MD5.get digests.md5)
|
||||
and sha512 = to_hex (Mirage_crypto.Hash.SHA512.get digests.sha512) in
|
||||
let dest = Mirage_kv.Key.v sha256 in
|
||||
KV.rename t.dev ~source ~dest >|= function
|
||||
begin match body with
|
||||
| `Unknown body ->
|
||||
Logs.info (fun m -> m "downloaded %s, now writing" url);
|
||||
KV.set t.dev dest body
|
||||
| `Fixed_body (reported_size, actual_size) ->
|
||||
Logs.info (fun m -> m "downloaded %s" url);
|
||||
let source = pending_key (hash, csum) in
|
||||
KV.rename t.dev ~source ~dest
|
||||
end >|= function
|
||||
| Ok () ->
|
||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||
t.sha512s <- SM.add sha512 sha256 t.sha512s;
|
||||
Ok ()
|
||||
| Error e -> Error (`Write_error e)
|
||||
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e)
|
||||
else begin
|
||||
if sizes_match then
|
||||
Logs.err (fun m -> m "Bad checksum %s: computed %s expected %s" url
|
||||
(hash_to_string hash) (hex_to_string csum))
|
||||
else
|
||||
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %a bytes" url
|
||||
Optint.Int63.pp actual Optint.Int63.pp reported);
|
||||
if body_size_in_header then
|
||||
(* if the checksums mismatch we want to delete the file. We are only
|
||||
able to do so if it was the latest created file, so we expect and
|
||||
error. Ideally, we want to match for `Append_only or other errors *)
|
||||
KV.remove t.dev source >>= function
|
||||
| Ok () ->
|
||||
Logs.info (fun m -> m "Removed %a" Mirage_kv.Key.pp source);
|
||||
Lwt_result.fail (`Bad_checksum (hash, csum))
|
||||
| Ok () -> Lwt.return_unit
|
||||
| Error e ->
|
||||
Logs.debug (fun m -> m "Failed to remove %a: %a"
|
||||
Mirage_kv.Key.pp source KV.pp_write_error e);
|
||||
(* we failed to delete the file so we mark it for deletion *)
|
||||
let dest = to_delete_key (hash, csum) in
|
||||
Logs.warn (fun m -> m "Failed to remove %a: %a. Moving it to %a"
|
||||
Mirage_kv.Key.pp source KV.pp_write_error e Mirage_kv.Key.pp dest);
|
||||
KV.rename t.dev ~source ~dest >|= function
|
||||
| Ok () -> Error (`Bad_checksum (hash, csum))
|
||||
| Ok () -> ()
|
||||
| Error e ->
|
||||
Logs.warn (fun m -> m "Error renaming file %a -> %a: %a"
|
||||
Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e);
|
||||
Error (`Bad_checksum (hash, csum))
|
||||
Mirage_kv.Key.pp source Mirage_kv.Key.pp dest KV.pp_write_error e)
|
||||
else
|
||||
Lwt.return_unit
|
||||
end
|
||||
|
||||
|
||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||
|
@ -593,28 +649,6 @@ module Make
|
|||
update_caches t >|= fun () ->
|
||||
t
|
||||
|
||||
let write t ~url data hm digests =
|
||||
let cs = Cstruct.of_string data in
|
||||
let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> to_hex
|
||||
and md5 = Mirage_crypto.Hash.digest `MD5 cs |> to_hex
|
||||
and sha512 = Mirage_crypto.Hash.digest `SHA512 cs |> to_hex
|
||||
in
|
||||
if check_csums_digests hm digests
|
||||
then begin
|
||||
KV.set t.dev (Mirage_kv.Key.v sha256) data >|= function
|
||||
| Ok () ->
|
||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||
t.sha512s <- SM.add sha512 sha256 t.sha512s;
|
||||
Logs.debug (fun m -> m "wrote %s (%d bytes)" sha256
|
||||
(String.length data))
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a while writing %s (key %s)"
|
||||
KV.pp_write_error e url sha256)
|
||||
end else begin
|
||||
Logs.err (fun m -> m "Bad checksum for %s" url);
|
||||
Lwt.return_unit
|
||||
end
|
||||
|
||||
let exists t h v =
|
||||
match find_key t h v with
|
||||
| Error _ -> Lwt.return false
|
||||
|
@ -952,26 +986,17 @@ stamp: %S
|
|||
let quux, body_init = Disk.init_write csums in
|
||||
Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
|
||||
| Ok (resp, r) ->
|
||||
begin match Disk.body_length resp, r with
|
||||
| `Bad_response, _ | _, Error `Bad_response ->
|
||||
begin match r with
|
||||
| Error `Bad_response ->
|
||||
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
||||
url H2.Status.pp_hum resp.status resp.reason);
|
||||
Lwt.return_unit
|
||||
| _, Error `Write_error e ->
|
||||
Logs.warn (fun m -> m "%s: write error %a"
|
||||
| Error `Write_error e ->
|
||||
Logs.err (fun m -> m "%s: write error %a"
|
||||
url KV.pp_write_error e);
|
||||
Lwt.return_unit
|
||||
| `Unknown, Ok (digests, _, body) ->
|
||||
Logs.info (fun m -> m "downloaded %s, now writing..." url);
|
||||
Disk.write disk ~url body csums digests
|
||||
| `Fixed _size, Ok (digests, _, _) ->
|
||||
Logs.info (fun m -> m "downloaded %s" url);
|
||||
Disk.finalize_write disk quux csums digests >|= function
|
||||
| Ok () -> ()
|
||||
| Error (`Write_error e) ->
|
||||
Logs.warn (fun m -> m "Error writing %s: %a" url KV.pp_write_error e)
|
||||
| Error `Bad_checksum (hash, csum) ->
|
||||
Logs.err (fun m -> m "%s hash mismatch, expected %s:%s" url (hash_to_string hash) (hex_to_string csum))
|
||||
| Ok (digests, body) ->
|
||||
Disk.finalize_write disk quux ~url body csums digests
|
||||
end
|
||||
| _ -> Lwt.return_unit)
|
||||
(SM.bindings urls) >>= fun () ->
|
||||
|
|
Loading…
Reference in a new issue