Compare commits
4 commits
9bb86b3507
...
f9620e9011
Author | SHA1 | Date | |
---|---|---|---|
f9620e9011 | |||
53af2665fa | |||
f40083692a | |||
eb95821b2e |
1 changed files with 101 additions and 8 deletions
|
@ -154,6 +154,34 @@ module Make
|
|||
SM.empty opam_paths
|
||||
end
|
||||
|
||||
let active_downloads = ref SM.empty
|
||||
|
||||
let add_to_active url ts =
|
||||
active_downloads := SM.add url (ts, 0, "unknown size") !active_downloads
|
||||
|
||||
let remove_active url =
|
||||
active_downloads := SM.remove url !active_downloads
|
||||
|
||||
let active_length url written length =
|
||||
match SM.find_opt url !active_downloads with
|
||||
| None -> ()
|
||||
| Some (ts, written', _) ->
|
||||
active_downloads := SM.add url (ts, written + written', length)
|
||||
!active_downloads
|
||||
|
||||
let active_add_bytes url written =
|
||||
match SM.find_opt url !active_downloads with
|
||||
| None -> ()
|
||||
| Some (ts, written', l) ->
|
||||
active_downloads := SM.add url (ts, written + written', l)
|
||||
!active_downloads
|
||||
|
||||
let failed_downloads = ref SM.empty
|
||||
|
||||
let add_failed url ts reason =
|
||||
remove_active url;
|
||||
failed_downloads := SM.add url (ts, reason) !failed_downloads
|
||||
|
||||
module Disk = struct
|
||||
type t = {
|
||||
mutable md5s : string SM.t ;
|
||||
|
@ -169,7 +197,6 @@ module Make
|
|||
|
||||
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
|
||||
|
||||
|
||||
let marshal_sm (sm : string SM.t) =
|
||||
let version = char_of_int 1 in
|
||||
String.make 1 version ^ Marshal.to_string sm []
|
||||
|
@ -305,7 +332,7 @@ module Make
|
|||
in
|
||||
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
|
||||
|
||||
let write_partial t (hash, csum) =
|
||||
let write_partial t (hash, csum) url =
|
||||
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
|
||||
let key = pending_key (hash, csum) in
|
||||
let ( >>>= ) = Lwt_result.bind in
|
||||
|
@ -324,8 +351,10 @@ module Make
|
|||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||
let len = String.length data in
|
||||
let offset = Optint.Int63.of_int len in
|
||||
active_length url len (Int64.to_string size ^ " bytes");
|
||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||
| `Unknown ->
|
||||
active_add_bytes url (String.length data);
|
||||
Lwt.return_ok (digests, `Unknown data)
|
||||
end
|
||||
| `Fixed_body (size, offset) ->
|
||||
|
@ -333,8 +362,10 @@ module Make
|
|||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||
let len = String.length data in
|
||||
let offset = Optint.Int63.(add offset (of_int len)) in
|
||||
active_add_bytes url len;
|
||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||
| `Unknown body ->
|
||||
active_add_bytes url (String.length data);
|
||||
Lwt.return_ok (digests, `Unknown (body ^ data))
|
||||
|
||||
let check_csums_digests csums digests =
|
||||
|
@ -368,18 +399,29 @@ module Make
|
|||
| `Init -> assert false
|
||||
end >|= function
|
||||
| Ok () ->
|
||||
remove_active url;
|
||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e)
|
||||
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e);
|
||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||
(Fmt.str "Write failure for %s: %a" url KV.pp_write_error e)
|
||||
else begin
|
||||
(if sizes_match then
|
||||
(if sizes_match then begin
|
||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||
(Fmt.str "Bad checksum %s:%s: computed %s expected %s" url
|
||||
(hash_to_string hash)
|
||||
(Ohex.encode (Archive_checksum.get digests hash))
|
||||
(Ohex.encode csum));
|
||||
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
|
||||
(hash_to_string hash)
|
||||
(Ohex.encode (Archive_checksum.get digests hash))
|
||||
(Ohex.encode csum))
|
||||
else match body with
|
||||
end else match body with
|
||||
| `Fixed_body (reported, actual) ->
|
||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||
(Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes"
|
||||
url Optint.Int63.pp actual reported);
|
||||
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
|
||||
url Optint.Int63.pp actual reported)
|
||||
| `Unknown _ -> assert false
|
||||
|
@ -404,7 +446,6 @@ module Make
|
|||
Lwt.return_unit
|
||||
end
|
||||
|
||||
|
||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||
let init ~verify_sha256 dev dev_md5s dev_sha512s =
|
||||
KV.list dev Mirage_kv.Key.empty >>= function
|
||||
|
@ -682,6 +723,40 @@ stamp: %S
|
|||
t.index <- index;
|
||||
Some changes)
|
||||
|
||||
let status disk =
|
||||
(* report status:
|
||||
- archive size (can we easily measure?) and number of "good" elements
|
||||
- list of current downloads
|
||||
- list of failed downloads
|
||||
*)
|
||||
let archive_stats =
|
||||
Fmt.str "<ul><li>%u validated archives on disk</li><li>%Lu bytes free</li></ul>"
|
||||
(SM.cardinal disk.Disk.md5s)
|
||||
(KV.free disk.Disk.dev)
|
||||
in
|
||||
let active_downloads =
|
||||
let header = "<h2>Active downloads</h2><ul>" in
|
||||
let content =
|
||||
SM.fold (fun url (ts, bytes_written, length_or_unknown) acc ->
|
||||
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to disk, " ^ length_or_unknown ^ "</li>")
|
||||
^ acc)
|
||||
!active_downloads ""
|
||||
in
|
||||
header ^ content ^ "</ul>"
|
||||
and failed_downloads =
|
||||
let header = "<h2>Failed downloads</h2><ul>" in
|
||||
let content =
|
||||
SM.fold (fun url (ts, reason) acc ->
|
||||
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ reason ^ "</li>")
|
||||
^ acc)
|
||||
!failed_downloads ""
|
||||
in
|
||||
header ^ content ^ "</ul>"
|
||||
in
|
||||
"<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status</h1><div>"
|
||||
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ]
|
||||
^ "</div></body></html>"
|
||||
|
||||
let not_modified request (modified, etag) =
|
||||
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
||||
| Some ts -> String.equal ts modified
|
||||
|
@ -734,6 +809,16 @@ stamp: %S
|
|||
let headers = Httpaf.Headers.of_list headers in
|
||||
let resp = Httpaf.Response.create ~headers `OK in
|
||||
Httpaf.Reqd.respond_with_string reqd resp data
|
||||
| [ ""; x ] when String.equal x "status" ->
|
||||
let data = status store in
|
||||
let mime_type = "text/html" in
|
||||
let headers = [
|
||||
"content-type", mime_type ;
|
||||
"content-length", string_of_int (String.length data) ;
|
||||
] in
|
||||
let headers = Httpaf.Headers.of_list headers in
|
||||
let resp = Httpaf.Response.create ~headers `OK in
|
||||
Httpaf.Reqd.respond_with_string reqd resp data
|
||||
| [ ""; "repo" ] ->
|
||||
if not_modified request (t.modified, t.commit_id) then
|
||||
let resp = Httpaf.Response.create `Not_modified in
|
||||
|
@ -843,23 +928,31 @@ stamp: %S
|
|||
if !idx mod 10 = 0 then Gc.full_major () ;
|
||||
Logs.info (fun m -> m "downloading %s" url);
|
||||
let quux, body_init = Archive_checksum.init_write csums in
|
||||
Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
|
||||
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
||||
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
||||
| Ok (resp, r) ->
|
||||
begin match r with
|
||||
| Error `Bad_response ->
|
||||
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
||||
url H2.Status.pp_hum resp.status resp.reason);
|
||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||
(Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason);
|
||||
Lwt.return_unit
|
||||
| Error `Write_error e ->
|
||||
Logs.err (fun m -> m "%s: write error %a %a"
|
||||
url
|
||||
Mirage_kv.Key.pp (Disk.pending_key quux)
|
||||
KV.pp_write_error e);
|
||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||
(Fmt.str "write error: %a" KV.pp_write_error e);
|
||||
Lwt.return_unit
|
||||
| Ok (digests, body) ->
|
||||
Disk.finalize_write disk quux ~url body csums digests
|
||||
end
|
||||
| _ -> Lwt.return_unit)
|
||||
| Error me ->
|
||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||
(Fmt.str "mimic error: %a" Mimic.pp_error me);
|
||||
Lwt.return_unit)
|
||||
(SM.bindings urls) >>= fun () ->
|
||||
Disk.update_caches disk >|= fun () ->
|
||||
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
||||
|
|
Loading…
Reference in a new issue