Compare commits
No commits in common. "f9620e901163ce50876d6b08bf46531c2438e15b" and "9bb86b3507399fa1ea3323f05c2bee2692d656ec" have entirely different histories.
f9620e9011
...
9bb86b3507
1 changed files with 8 additions and 101 deletions
|
@ -154,34 +154,6 @@ module Make
|
||||||
SM.empty opam_paths
|
SM.empty opam_paths
|
||||||
end
|
end
|
||||||
|
|
||||||
let active_downloads = ref SM.empty
|
|
||||||
|
|
||||||
let add_to_active url ts =
|
|
||||||
active_downloads := SM.add url (ts, 0, "unknown size") !active_downloads
|
|
||||||
|
|
||||||
let remove_active url =
|
|
||||||
active_downloads := SM.remove url !active_downloads
|
|
||||||
|
|
||||||
let active_length url written length =
|
|
||||||
match SM.find_opt url !active_downloads with
|
|
||||||
| None -> ()
|
|
||||||
| Some (ts, written', _) ->
|
|
||||||
active_downloads := SM.add url (ts, written + written', length)
|
|
||||||
!active_downloads
|
|
||||||
|
|
||||||
let active_add_bytes url written =
|
|
||||||
match SM.find_opt url !active_downloads with
|
|
||||||
| None -> ()
|
|
||||||
| Some (ts, written', l) ->
|
|
||||||
active_downloads := SM.add url (ts, written + written', l)
|
|
||||||
!active_downloads
|
|
||||||
|
|
||||||
let failed_downloads = ref SM.empty
|
|
||||||
|
|
||||||
let add_failed url ts reason =
|
|
||||||
remove_active url;
|
|
||||||
failed_downloads := SM.add url (ts, reason) !failed_downloads
|
|
||||||
|
|
||||||
module Disk = struct
|
module Disk = struct
|
||||||
type t = {
|
type t = {
|
||||||
mutable md5s : string SM.t ;
|
mutable md5s : string SM.t ;
|
||||||
|
@ -197,6 +169,7 @@ module Make
|
||||||
|
|
||||||
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
|
let empty dev dev_md5s dev_sha512s = { md5s = SM.empty ; sha512s = SM.empty ; dev; dev_md5s; dev_sha512s }
|
||||||
|
|
||||||
|
|
||||||
let marshal_sm (sm : string SM.t) =
|
let marshal_sm (sm : string SM.t) =
|
||||||
let version = char_of_int 1 in
|
let version = char_of_int 1 in
|
||||||
String.make 1 version ^ Marshal.to_string sm []
|
String.make 1 version ^ Marshal.to_string sm []
|
||||||
|
@ -332,7 +305,7 @@ module Make
|
||||||
in
|
in
|
||||||
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
|
Mirage_kv.Key.(to_delete / hash_to_string hash / (encoded_csum ^ "." ^ rand))
|
||||||
|
|
||||||
let write_partial t (hash, csum) url =
|
let write_partial t (hash, csum) =
|
||||||
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
|
(* XXX: we may be in trouble if different hash functions are used for the same archive *)
|
||||||
let key = pending_key (hash, csum) in
|
let key = pending_key (hash, csum) in
|
||||||
let ( >>>= ) = Lwt_result.bind in
|
let ( >>>= ) = Lwt_result.bind in
|
||||||
|
@ -351,10 +324,8 @@ module Make
|
||||||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||||
let len = String.length data in
|
let len = String.length data in
|
||||||
let offset = Optint.Int63.of_int len in
|
let offset = Optint.Int63.of_int len in
|
||||||
active_length url len (Int64.to_string size ^ " bytes");
|
|
||||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||||
| `Unknown ->
|
| `Unknown ->
|
||||||
active_add_bytes url (String.length data);
|
|
||||||
Lwt.return_ok (digests, `Unknown data)
|
Lwt.return_ok (digests, `Unknown data)
|
||||||
end
|
end
|
||||||
| `Fixed_body (size, offset) ->
|
| `Fixed_body (size, offset) ->
|
||||||
|
@ -362,10 +333,8 @@ module Make
|
||||||
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
|> Lwt_result.map_error (fun e -> `Write_error e) >>>= fun () ->
|
||||||
let len = String.length data in
|
let len = String.length data in
|
||||||
let offset = Optint.Int63.(add offset (of_int len)) in
|
let offset = Optint.Int63.(add offset (of_int len)) in
|
||||||
active_add_bytes url len;
|
|
||||||
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
Lwt.return_ok (digests, `Fixed_body (size, offset))
|
||||||
| `Unknown body ->
|
| `Unknown body ->
|
||||||
active_add_bytes url (String.length data);
|
|
||||||
Lwt.return_ok (digests, `Unknown (body ^ data))
|
Lwt.return_ok (digests, `Unknown (body ^ data))
|
||||||
|
|
||||||
let check_csums_digests csums digests =
|
let check_csums_digests csums digests =
|
||||||
|
@ -399,29 +368,18 @@ module Make
|
||||||
| `Init -> assert false
|
| `Init -> assert false
|
||||||
end >|= function
|
end >|= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
remove_active url;
|
|
||||||
t.md5s <- SM.add md5 sha256 t.md5s;
|
t.md5s <- SM.add md5 sha256 t.md5s;
|
||||||
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
t.sha512s <- SM.add sha512 sha256 t.sha512s
|
||||||
| Error e ->
|
| Error e ->
|
||||||
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e);
|
Logs.err (fun m -> m "Write failure for %s: %a" url KV.pp_write_error e)
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(Fmt.str "Write failure for %s: %a" url KV.pp_write_error e)
|
|
||||||
else begin
|
else begin
|
||||||
(if sizes_match then begin
|
(if sizes_match then
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(Fmt.str "Bad checksum %s:%s: computed %s expected %s" url
|
|
||||||
(hash_to_string hash)
|
|
||||||
(Ohex.encode (Archive_checksum.get digests hash))
|
|
||||||
(Ohex.encode csum));
|
|
||||||
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
|
Logs.err (fun m -> m "Bad checksum %s:%s: computed %s expected %s" url
|
||||||
(hash_to_string hash)
|
(hash_to_string hash)
|
||||||
(Ohex.encode (Archive_checksum.get digests hash))
|
(Ohex.encode (Archive_checksum.get digests hash))
|
||||||
(Ohex.encode csum))
|
(Ohex.encode csum))
|
||||||
end else match body with
|
else match body with
|
||||||
| `Fixed_body (reported, actual) ->
|
| `Fixed_body (reported, actual) ->
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(Fmt.str "Size mismatch %s: received %a bytes expected %Lu bytes"
|
|
||||||
url Optint.Int63.pp actual reported);
|
|
||||||
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
|
Logs.err (fun m -> m "Size mismatch %s: received %a bytes expected %Lu bytes"
|
||||||
url Optint.Int63.pp actual reported)
|
url Optint.Int63.pp actual reported)
|
||||||
| `Unknown _ -> assert false
|
| `Unknown _ -> assert false
|
||||||
|
@ -446,6 +404,7 @@ module Make
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
(* on disk, we use a flat file system where the filename is the sha256 of the data *)
|
||||||
let init ~verify_sha256 dev dev_md5s dev_sha512s =
|
let init ~verify_sha256 dev dev_md5s dev_sha512s =
|
||||||
KV.list dev Mirage_kv.Key.empty >>= function
|
KV.list dev Mirage_kv.Key.empty >>= function
|
||||||
|
@ -723,40 +682,6 @@ stamp: %S
|
||||||
t.index <- index;
|
t.index <- index;
|
||||||
Some changes)
|
Some changes)
|
||||||
|
|
||||||
let status disk =
|
|
||||||
(* report status:
|
|
||||||
- archive size (can we easily measure?) and number of "good" elements
|
|
||||||
- list of current downloads
|
|
||||||
- list of failed downloads
|
|
||||||
*)
|
|
||||||
let archive_stats =
|
|
||||||
Fmt.str "<ul><li>%u validated archives on disk</li><li>%Lu bytes free</li></ul>"
|
|
||||||
(SM.cardinal disk.Disk.md5s)
|
|
||||||
(KV.free disk.Disk.dev)
|
|
||||||
in
|
|
||||||
let active_downloads =
|
|
||||||
let header = "<h2>Active downloads</h2><ul>" in
|
|
||||||
let content =
|
|
||||||
SM.fold (fun url (ts, bytes_written, length_or_unknown) acc ->
|
|
||||||
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ string_of_int bytes_written ^ " bytes written to disk, " ^ length_or_unknown ^ "</li>")
|
|
||||||
^ acc)
|
|
||||||
!active_downloads ""
|
|
||||||
in
|
|
||||||
header ^ content ^ "</ul>"
|
|
||||||
and failed_downloads =
|
|
||||||
let header = "<h2>Failed downloads</h2><ul>" in
|
|
||||||
let content =
|
|
||||||
SM.fold (fun url (ts, reason) acc ->
|
|
||||||
("<li>" ^ Ptime.to_rfc3339 ?tz_offset_s:None ts ^ ": " ^ url ^ " " ^ reason ^ "</li>")
|
|
||||||
^ acc)
|
|
||||||
!failed_downloads ""
|
|
||||||
in
|
|
||||||
header ^ content ^ "</ul>"
|
|
||||||
in
|
|
||||||
"<html><head><title>Opam-mirror status page</title></head><body><h1>Opam mirror status</h1><div>"
|
|
||||||
^ String.concat "</div><div>" [ archive_stats ; active_downloads ; failed_downloads ]
|
|
||||||
^ "</div></body></html>"
|
|
||||||
|
|
||||||
let not_modified request (modified, etag) =
|
let not_modified request (modified, etag) =
|
||||||
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
||||||
| Some ts -> String.equal ts modified
|
| Some ts -> String.equal ts modified
|
||||||
|
@ -809,16 +734,6 @@ stamp: %S
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
let headers = Httpaf.Headers.of_list headers in
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
let resp = Httpaf.Response.create ~headers `OK in
|
||||||
Httpaf.Reqd.respond_with_string reqd resp data
|
Httpaf.Reqd.respond_with_string reqd resp data
|
||||||
| [ ""; x ] when String.equal x "status" ->
|
|
||||||
let data = status store in
|
|
||||||
let mime_type = "text/html" in
|
|
||||||
let headers = [
|
|
||||||
"content-type", mime_type ;
|
|
||||||
"content-length", string_of_int (String.length data) ;
|
|
||||||
] in
|
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
|
||||||
Httpaf.Reqd.respond_with_string reqd resp data
|
|
||||||
| [ ""; "repo" ] ->
|
| [ ""; "repo" ] ->
|
||||||
if not_modified request (t.modified, t.commit_id) then
|
if not_modified request (t.modified, t.commit_id) then
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
|
@ -928,31 +843,23 @@ stamp: %S
|
||||||
if !idx mod 10 = 0 then Gc.full_major () ;
|
if !idx mod 10 = 0 then Gc.full_major () ;
|
||||||
Logs.info (fun m -> m "downloading %s" url);
|
Logs.info (fun m -> m "downloading %s" url);
|
||||||
let quux, body_init = Archive_checksum.init_write csums in
|
let quux, body_init = Archive_checksum.init_write csums in
|
||||||
add_to_active url (Ptime.v (Pclock.now_d_ps ()));
|
Http_mirage_client.request http_client url (Disk.write_partial disk quux) body_init >>= function
|
||||||
Http_mirage_client.request http_client url (Disk.write_partial disk quux url) body_init >>= function
|
|
||||||
| Ok (resp, r) ->
|
| Ok (resp, r) ->
|
||||||
begin match r with
|
begin match r with
|
||||||
| Error `Bad_response ->
|
| Error `Bad_response ->
|
||||||
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
||||||
url H2.Status.pp_hum resp.status resp.reason);
|
url H2.Status.pp_hum resp.status resp.reason);
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(Fmt.str "%a %s" H2.Status.pp_hum resp.status resp.reason);
|
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error `Write_error e ->
|
| Error `Write_error e ->
|
||||||
Logs.err (fun m -> m "%s: write error %a %a"
|
Logs.err (fun m -> m "%s: write error %a %a"
|
||||||
url
|
url
|
||||||
Mirage_kv.Key.pp (Disk.pending_key quux)
|
Mirage_kv.Key.pp (Disk.pending_key quux)
|
||||||
KV.pp_write_error e);
|
KV.pp_write_error e);
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(Fmt.str "write error: %a" KV.pp_write_error e);
|
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok (digests, body) ->
|
| Ok (digests, body) ->
|
||||||
Disk.finalize_write disk quux ~url body csums digests
|
Disk.finalize_write disk quux ~url body csums digests
|
||||||
end
|
end
|
||||||
| Error me ->
|
| _ -> Lwt.return_unit)
|
||||||
add_failed url (Ptime.v (Pclock.now_d_ps ()))
|
|
||||||
(Fmt.str "mimic error: %a" Mimic.pp_error me);
|
|
||||||
Lwt.return_unit)
|
|
||||||
(SM.bindings urls) >>= fun () ->
|
(SM.bindings urls) >>= fun () ->
|
||||||
Disk.update_caches disk >|= fun () ->
|
Disk.update_caches disk >|= fun () ->
|
||||||
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
||||||
|
|
Loading…
Reference in a new issue