Compare commits

...

4 commits

Author SHA1 Message Date
f9620e9011 Merge pull request 'Add a download status page, recording:' (#15) from dl-status into main
Reviewed-on: #15
Reviewed-by: Reynir Björnsson <reynir@reynir.dk>
2024-11-01 11:01:51 +00:00
53af2665fa record free bytes in tar archive 2024-10-31 17:45:34 +01:00
f40083692a tweaks 2024-10-31 17:17:34 +01:00
eb95821b2e Add a download status page, recording:
- number of archives on disk
- current downloads
- failed downloads
2024-10-31 11:30:52 +01:00

View file

@ -154,6 +154,34 @@ module Make
SM.empty opam_paths
end
let active_downloads = ref SM.empty
let add_to_active url ts =
active_downloads := SM.add url (ts, 0, "unknown size") !active_downloads
let remove_active url =
active_downloads := SM.remove url !active_downloads
let active_length url written length =
match SM.find_opt url !active_downloads with
| None -> ()
| Some (ts, written', _) ->
active_downloads := SM.add url (ts, written + written', length)
!active_downloads
let active_add_bytes url written =
match SM.find_opt url !active_downloads with
| None -> ()
| Some (ts, written', l) ->
active_downloads := SM.add url (ts, written + written', l)
!active_downloads
let failed_downloads = ref SM.empty
let add_failed url ts reason =
remove_active url;
failed_downloads := SM.add url (ts, reason) !failed_downloads
module Disk = struct
type t = {
mutable md5s : string SM.t ;
@ -169,7 +197,6 @@ module Make
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
let marshal_sm (sm : string SM.t) =
let version = char_of_int 1 in
String.make 1 version ^ Marshal.to_string sm []
@ -305,7 +332,7 @@ module Make
in
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
let write_partial t (hash, csum) =
let write_partial t (hash, csum) url =
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
let key = pending_key (hash, csum) in
let ( >>>= ) = Lwt_result.bind in
@ -324,8 +351,10 @@ module Make
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.of_int len in
active_length url len (Int64.to_string size ^ " bytes");
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown ->
active_add_bytes url (String.length data);
Lwt.return_ok (digests, `Unknown data)
end
| `Fixed_body (size, offset) ->
@ -333,8 +362,10 @@ module Make
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
let len = String.length data in
let offset = Optint.Int63.(add offset (of_int len)) in
active_add_bytes url len;
Lwt.return_ok (digests, `Fixed_body (size, offset))
| `Unknown body ->
active_add_bytes url (String.length data);
Lwt.return_ok (digests, `Unknown (body ^ data))
let check_csums_digests csums digests =
@ -368,18 +399,29 @@ module Make
| `Init -> assert false
end >|= function
| Ok () ->
remove_active url;
t.md5s <- SM.add md5 sha256 t.md5s;
t.sha512s <- SM.add sha512 sha256 t.sha512s
| Error e ->
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e)
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Write failure for %s: %a" url KV.pp_write_error e)
else begin
(if sizes_match then
(if sizes_match then begin
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum));
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
(hash_to_string hash)
(Ohex.encode (Archive_checksum.get digests hash))
(Ohex.encode csum))
else match body with
end else match body with
| `Fixed_body (reported, actual) ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported);
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
url Optint.Int63.pp actual reported)
| `Unknown _ -> assert false
@ -404,7 +446,6 @@ module Make
Lwt.return_unit
end
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
let init ~verify_sha256 dev dev_md5s dev_sha512s =
KV.list dev Mirage_kv.Key.empty >>= function
@ -682,6 +723,40 @@ stamp: %S
t.index <- index;
Some changes)
let status disk =
(* report status:
- archive size (can we easily measure?) and number of "good" elements
- list of current downloads
- list of failed downloads
*)
let archive_stats =
Fmt.str "<ul><li>%u validated archives on disk</li><li>%Lu bytes free</li></ul>"
(SM.cardinal disk.Disk.md5s)
(KV.free disk.Disk.dev)
in
let active_downloads =
let header = "<h2>Active downloads</h2><ul>" in
let content =
SM.fold (fun url (ts, bytes_written, length_or_unknown) acc ->
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to disk, " ^ length_or_unknown ^ "</li>")
^ acc)
!active_downloads ""
in
header ^ content ^ "</ul>"
and failed_downloads =
let header = "<h2>Failed downloads</h2><ul>" in
let content =
SM.fold (fun url (ts, reason) acc ->
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ reason ^ "</li>")
^ acc)
!failed_downloads ""
in
header ^ content ^ "</ul>"
in
"<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status</h1><div>"
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ]
^ "</div></body></html>"
let not_modified request (modified, etag) =
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
| Some ts -> String.equal ts modified
@ -734,6 +809,16 @@ stamp: %S
let headers = Httpaf.Headers.of_list headers in
let resp = Httpaf.Response.create ~headers `OK in
Httpaf.Reqd.respond_with_string reqd resp data
| [ ""; x ] when String.equal x "status" ->
let data = status store in
let mime_type = "text/html" in
let headers = [
"content-type", mime_type ;
"content-length", string_of_int (String.length data) ;
] in
let headers = Httpaf.Headers.of_list headers in
let resp = Httpaf.Response.create ~headers `OK in
Httpaf.Reqd.respond_with_string reqd resp data
| [ ""; "repo" ] ->
if not_modified request (t.modified, t.commit_id) then
let resp = Httpaf.Response.create `Not_modified in
@ -843,23 +928,31 @@ stamp: %S
if !idx mod 10 = 0 then Gc.full_major () ;
Logs.info (fun m -> m "downloading %s" url);
let quux, body_init = Archive_checksum.init_write csums in
Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
| Ok (resp, r) ->
begin match r with
| Error `Bad_response ->
Logs.warn (fun m -> m "%s: %a (reason %s)"
url H2.Status.pp_hum resp.status resp.reason);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason);
Lwt.return_unit
| Error `Write_error e ->
Logs.err (fun m -> m "%s: write error %a %a"
url
Mirage_kv.Key.pp (Disk.pending_key quux)
KV.pp_write_error e);
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "write error: %a" KV.pp_write_error e);
Lwt.return_unit
| Ok (digests, body) ->
Disk.finalize_write disk quux ~url body csums digests
end
| _ -> Lwt.return_unit)
| Error me ->
add_failed url (Ptime.v (Pclock.now_d_ps ()))
(Fmt.str "mimic error: %a" Mimic.pp_error me);
Lwt.return_unit)
(SM.bindings urls) >>= fun () ->
Disk.update_caches disk >|= fun () ->
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))