Add a download status page, recording: #15
1 changed files with 94 additions and 7 deletions
|
@ -154,6 +154,34 @@ module Make
|
||||||
SM.empty opam_paths
|
SM.empty opam_paths
|
||||||
end
|
end
|
||||||
|
|
||||||
|
let active_downloads = ref SM.empty
|
||||||
|
|
||||||
|
let add_to_active url ts =
|
||||||
|
active_downloads := SM.add url (ts, 0, "unknown size") !active_downloads
|
||||||
|
|
||||||
|
let remove_active url =
|
||||||
|
active_downloads := SM.remove url !active_downloads
|
||||||
|
|
||||||
|
let active_length url written length =
|
||||||
|
match SM.find_opt url !active_downloads with
|
||||||
|
| None -> ()
|
||||||
|
| Some (ts, written', _) ->
|
||||||
|
active_downloads := SM.add url (ts, written + written', length)
|
||||||
|
!active_downloads
|
||||||
|
|
||||||
|
let active_add_bytes url written =
|
||||||
|
match SM.find_opt url !active_downloads with
|
||||||
|
| None -> ()
|
||||||
|
| Some (ts, written', l) ->
|
||||||
|
active_downloads := SM.add url (ts, written + written', l)
|
||||||
|
!active_downloads
|
||||||
|
|
||||||
|
let failed_downloads = ref SM.empty
|
||||||
|
|
||||||
|
let add_failed url ts reason =
|
||||||
|
remove_active url;
|
||||||
|
failed_downloads := SM.add url (ts, reason) !failed_downloads
|
||||||
|
|
||||||
module Disk = struct
|
module Disk = struct
|
||||||
type t = {
|
type t = {
|
||||||
mutable md5s : string SM.t ;
|
mutable md5s : string SM.t ;
|
||||||
|
@ -169,7 +197,6 @@ module Make
|
||||||
|
|
||||||
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
|
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
|
||||||
|
|
||||||
|
|
||||||
let marshal_sm (sm : string SM.t) =
|
let marshal_sm (sm : string SM.t) =
|
||||||
let version = char_of_int 1 in
|
let version = char_of_int 1 in
|
||||||
String.make 1 version ^ Marshal.to_string sm []
|
String.make 1 version ^ Marshal.to_string sm []
|
||||||
|
@ -305,7 +332,7 @@ module Make
|
||||||
in
|
in
|
||||||
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
|
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
|
||||||
|
|
||||||
let write_partial t (hash, csum) =
|
let write_partial t (hash, csum) url =
|
||||||
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
|
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
|
||||||
let key = pending_key (hash, csum) in
|
let key = pending_key (hash, csum) in
|
||||||
let ( >>>= ) = Lwt_result.bind in
|
let ( >>>= ) = Lwt_result.bind in
|
||||||
|
@ -324,8 +351,10 @@ module Make
|
||||||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||||
let len = String.length data in
|
let len = String.length data in
|
||||||
let offset = Optint.Int63.of_int len in
|
let offset = Optint.Int63.of_int len in
|
||||||
|
active_length url len (Int64.to_string size ^ " bytes");
|
||||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||||
| `Unknown ->
|
| `Unknown ->
|
||||||
|
active_add_bytes url (String.length data);
|
||||||
Lwt.return_ok (digests, `Unknown data)
|
Lwt.return_ok (digests, `Unknown data)
|
||||||
end
|
end
|
||||||
| `Fixed_body (size, offset) ->
|
| `Fixed_body (size, offset) ->
|
||||||
|
@ -333,8 +362,10 @@ module Make
|
||||||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||||
let len = String.length data in
|
let len = String.length data in
|
||||||
let offset = Optint.Int63.(add offset (of_int len)) in
|
let offset = Optint.Int63.(add offset (of_int len)) in
|
||||||
|
active_add_bytes url len;
|
||||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||||
| `Unknown body ->
|
| `Unknown body ->
|
||||||
|
active_add_bytes url (String.length data);
|
||||||
Lwt.return_ok (digests, `Unknown (body ^ data))
|
Lwt.return_ok (digests, `Unknown (body ^ data))
|
||||||
|
|
||||||
let check_csums_digests csums digests =
|
let check_csums_digests csums digests =
|
||||||
|
@ -368,18 +399,29 @@ module Make
|
||||||
| `Init -> assert false
|
| `Init -> assert false
|
||||||
end >|= function
|
end >|= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
|
remove_active url;
|
||||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||||
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
||||||
| Error e ->
|
| Error e ->
|
||||||
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e)
|
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e);
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(Fmt.str "Write failure for %s: %a" url KV.pp_write_error e)
|
||||||
else begin
|
else begin
|
||||||
(if sizes_match then
|
(if sizes_match then begin
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(Fmt.str "Bad checksum %s:%s: computed %s expected %s" url
|
||||||
|
(hash_to_string hash)
|
||||||
|
(Ohex.encode (Archive_checksum.get digests hash))
|
||||||
|
(Ohex.encode csum));
|
||||||
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
|
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
|
||||||
(hash_to_string hash)
|
(hash_to_string hash)
|
||||||
(Ohex.encode (Archive_checksum.get digests hash))
|
(Ohex.encode (Archive_checksum.get digests hash))
|
||||||
(Ohex.encode csum))
|
(Ohex.encode csum))
|
||||||
else match body with
|
end else match body with
|
||||||
| `Fixed_body (reported, actual) ->
|
| `Fixed_body (reported, actual) ->
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes"
|
||||||
|
url Optint.Int63.pp actual reported);
|
||||||
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
|
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
|
||||||
url Optint.Int63.pp actual reported)
|
url Optint.Int63.pp actual reported)
|
||||||
| `Unknown _ -> assert false
|
| `Unknown _ -> assert false
|
||||||
|
@ -404,7 +446,6 @@ module Make
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||||
let init ~verify_sha256 dev dev_md5s dev_sha512s =
|
let init ~verify_sha256 dev dev_md5s dev_sha512s =
|
||||||
KV.list dev Mirage_kv.Key.empty >>= function
|
KV.list dev Mirage_kv.Key.empty >>= function
|
||||||
|
@ -682,6 +723,37 @@ stamp: %S
|
||||||
t.index <- index;
|
t.index <- index;
|
||||||
Some changes)
|
Some changes)
|
||||||
|
|
||||||
|
let status disk =
|
||||||
|
(* report status:
|
||||||
|
- archive size (can we easily measure?) and number of "good" elements
|
||||||
|
- list of current downloads
|
||||||
|
- list of failed downloads
|
||||||
|
*)
|
||||||
|
let hashes = SM.cardinal disk.Disk.md5s in
|
||||||
|
let archive_stats = Printf.sprintf "%u validated archives on disk" hashes in
|
||||||
|
let active_downloads =
|
||||||
|
let header = "<h2>Active downloads</h2><ul>" in
|
||||||
|
let content =
|
||||||
|
SM.fold (fun url (ts, bytes_written, length_or_unknown) acc ->
|
||||||
|
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to disk, " ^ length_or_unknown ^ "</li>")
|
||||||
|
^ acc)
|
||||||
|
!active_downloads ""
|
||||||
|
in
|
||||||
|
header ^ content ^ "</ul>"
|
||||||
|
and failed_downloads =
|
||||||
|
let header = "<h2>Failed downloads</h2><ul>" in
|
||||||
|
let content =
|
||||||
|
SM.fold (fun url (ts, reason) acc ->
|
||||||
|
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ reason ^ "</li>")
|
||||||
|
^ acc)
|
||||||
|
!failed_downloads ""
|
||||||
|
in
|
||||||
|
header ^ content ^ "</ul>"
|
||||||
|
in
|
||||||
|
"<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status></h1><div>"
|
||||||
|
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ]
|
||||||
|
^ "</div></body></html>"
|
||||||
|
|
||||||
let not_modified request (modified, etag) =
|
let not_modified request (modified, etag) =
|
||||||
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
||||||
| Some ts -> String.equal ts modified
|
| Some ts -> String.equal ts modified
|
||||||
|
@ -734,6 +806,16 @@ stamp: %S
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
let headers = Httpaf.Headers.of_list headers in
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
let resp = Httpaf.Response.create ~headers `OK in
|
||||||
Httpaf.Reqd.respond_with_string reqd resp data
|
Httpaf.Reqd.respond_with_string reqd resp data
|
||||||
|
| [ ""; x ] when String.equal x "status" ->
|
||||||
|
let data = status store in
|
||||||
|
let mime_type = "text/html" in
|
||||||
|
let headers = [
|
||||||
|
"content-type", mime_type ;
|
||||||
|
"content-length", string_of_int (String.length data) ;
|
||||||
|
] in
|
||||||
|
let headers = Httpaf.Headers.of_list headers in
|
||||||
|
let resp = Httpaf.Response.create ~headers `OK in
|
||||||
|
Httpaf.Reqd.respond_with_string reqd resp data
|
||||||
| [ ""; "repo" ] ->
|
| [ ""; "repo" ] ->
|
||||||
if not_modified request (t.modified, t.commit_id) then
|
if not_modified request (t.modified, t.commit_id) then
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
|
@ -843,18 +925,23 @@ stamp: %S
|
||||||
if !idx mod 10 = 0 then Gc.full_major () ;
|
if !idx mod 10 = 0 then Gc.full_major () ;
|
||||||
Logs.info (fun m -> m "downloading %s" url);
|
Logs.info (fun m -> m "downloading %s" url);
|
||||||
let quux, body_init = Archive_checksum.init_write csums in
|
let quux, body_init = Archive_checksum.init_write csums in
|
||||||
Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
|
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
||||||
|
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
||||||
| Ok (resp, r) ->
|
| Ok (resp, r) ->
|
||||||
begin match r with
|
begin match r with
|
||||||
| Error `Bad_response ->
|
| Error `Bad_response ->
|
||||||
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
||||||
url H2.Status.pp_hum resp.status resp.reason);
|
url H2.Status.pp_hum resp.status resp.reason);
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason);
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error `Write_error e ->
|
| Error `Write_error e ->
|
||||||
Logs.err (fun m -> m "%s: write error %a %a"
|
Logs.err (fun m -> m "%s: write error %a %a"
|
||||||
url
|
url
|
||||||
Mirage_kv.Key.pp (Disk.pending_key quux)
|
Mirage_kv.Key.pp (Disk.pending_key quux)
|
||||||
KV.pp_write_error e);
|
KV.pp_write_error e);
|
||||||
|
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
||||||
|
(Fmt.str "write error: %a" KV.pp_write_error e);
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok (digests, body) ->
|
| Ok (digests, body) ->
|
||||||
Disk.finalize_write disk quux ~url body csums digests
|
Disk.finalize_write disk quux ~url body csums digests
|
||||||
|
|
Loading…
Reference in a new issue