WIP: Cache only version #25

Draft
reynir wants to merge 1 commit from cache into main
2 changed files with 141 additions and 287 deletions

View file

@ -18,11 +18,10 @@ let verify =
let remote =
let doc =
Key.Arg.info
~doc:"Remote repository url, use suffix #foo to specify a branch 'foo': \
https://github.com/ocaml/opam-repository.git"
~doc:"Remote repository url"
["remote"]
in
Key.(create "remote" Arg.(opt string "https://github.com/ocaml/opam-repository.git#master" doc))
Key.(create "remote" Arg.(opt string "https://opam.ocaml.org" doc))
let parallel_downloads =
let doc =
@ -35,7 +34,7 @@ let parallel_downloads =
let hook_url =
let doc =
Key.Arg.info
~doc:"URL to conduct an update of the git repository" ["hook-url"]
~doc:"URL to conduct an update of the opam repository" ["hook-url"]
in
Key.(create "hook-url" Arg.(opt string "update" doc))
@ -55,29 +54,23 @@ let sectors_cache =
let doc = Key.Arg.info ~doc ["sectors-cache"] in
Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc))
let sectors_git =
let doc = "Number of sectors reserved for git dump." in
let doc = Key.Arg.info ~doc ["sectors-git"] in
Key.(create "sectors-git" Arg.(opt int64 Int64.(mul 40L (mul 2L 1024L)) doc))
let mirror =
foreign "Unikernel.Make"
~keys:[ Key.v check ; Key.v verify ; Key.v remote ;
Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ;
Key.v port ; Key.v sectors_cache ; Key.v sectors_git ; ]
Key.v port ; Key.v sectors_cache ; ]
~packages:[
package ~min:"0.2.0" ~sublibs:[ "mirage" ] "paf" ;
package "h2" ;
package "httpaf" ;
package ~pin:"git+https://git.robur.io/robur/git-kv.git#main" "git-kv" ;
package ~min:"3.7.0" "git-paf" ;
package "opam-file-format" ;
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" ;
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ;
package "mirage-block-partition" ;
package "oneffs" ;
package "hex" ;
]
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job)
(block @-> time @-> pclock @-> stackv4v6 @-> http_client @-> job)
let stack = generic_stackv4v6 default_network
@ -98,10 +91,9 @@ let http_client =
2 instances of happy-eyeballs will exist together, which is not really good
(it puts pressure on the scheduler). *)
let git_client, http_client =
let happy_eyeballs = git_happy_eyeballs stack dns (generic_happy_eyeballs stack dns) in
merge_git_clients (git_tcp tcp happy_eyeballs)
(git_http ~authenticator:tls_authenticator tcp happy_eyeballs),
let http_client =
let happy_eyeballs = generic_happy_eyeballs stack dns in
let happy_eyeballs = git_happy_eyeballs stack dns happy_eyeballs in
http_client $ default_time $ default_posix_clock $ tcp $ happy_eyeballs
let program_block_size =
@ -111,4 +103,4 @@ let program_block_size =
let block = block_of_file "tar"
let () = register "mirror"
[ mirror $ block $ default_time $ default_posix_clock $ stack $ git_client $ http_client ]
[ mirror $ block $ default_time $ default_posix_clock $ stack $ http_client ]

View file

@ -2,12 +2,13 @@ open Lwt.Infix
let argument_error = 64
external reraise : exn -> 'a = "%reraise"
module Make
(BLOCK : Mirage_block.S)
(Time : Mirage_time.S)
(Pclock : Mirage_clock.PCLOCK)
(Stack : Tcpip.Stack.V4V6)
(_ : sig end)
(HTTP : Http_mirage_client.S) = struct
module Part = Mirage_block_partition.Make(BLOCK)
@ -51,29 +52,6 @@ module Make
hm ""
module Git = struct
let find_contents store =
let rec go store path acc =
Git_kv.list store path >>= function
| Error e ->
Logs.err (fun m -> m "error %a while listing %a"
Git_kv.pp_error e Mirage_kv.Key.pp path);
Lwt.return acc
| Ok steps ->
Lwt_list.fold_left_s (fun acc (step, _) ->
let full_path = Mirage_kv.Key.add path step in
Git_kv.exists store full_path >>= function
| Error e ->
Logs.err (fun m -> m "error %a for exists %a" Git_kv.pp_error e
Mirage_kv.Key.pp full_path);
Lwt.return acc
| Ok None ->
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp full_path);
Lwt.return acc
| Ok Some `Value -> Lwt.return (full_path :: acc)
| Ok Some `Dictionary -> go store full_path acc) acc steps
in
go store Mirage_kv.Key.empty []
let decode_digest filename str =
let hex h s =
match hex_of_string s with
@ -162,40 +140,94 @@ module Make
end
| _ -> Logs.debug (fun m -> m "no url section for %s" filename); None
let find_urls store =
find_contents store >>= fun paths ->
let opam_paths =
List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths
module Null_write = struct
  (* A writer module has to be supplied to the [Tar_gz.Make] functor, but
     nothing is ever written here. [out_channel] is an empty variant, so no
     channel value can be constructed and [really_write] can never actually
     be called. *)
  type out_channel = |
  type 'a t = 'a Lwt.t

  (* The single refutation case tells the type checker that there is no
     possible value of [out_channel] to match on. *)
  let really_write (chan : out_channel) _data = match chan with _ -> .
end
module String_read = struct
  (* Reader over an in-memory string: [data] is the whole input and [offset]
     is the position of the next unread byte. Every operation yields to the
     Lwt scheduler once via [Lwt.pause]. *)
  type in_channel = {
    data : string;
    mutable offset : int;
  }
  type 'a t = 'a Lwt.t

  (* [really_read t buf] fills [buf] completely from [t] and advances the
     offset by [Cstruct.length buf].
     @raise End_of_file (synchronously) if fewer bytes than the length of
     [buf] remain. *)
  let really_read t buf =
    let buf_len = Cstruct.length buf in
    if String.length t.data - t.offset < buf_len then
      raise Stdlib.End_of_file;
    Cstruct.blit_from_string t.data t.offset buf 0 buf_len;
    t.offset <- t.offset + buf_len;
    Lwt.pause ()

  (* [read t buf] copies as many bytes as are available (at most the length
     of [buf]) into [buf] and returns that count; 0 signals end of input.

     The offset is advanced before yielding, as [really_read] does, so that
     a reader scheduled during the pause cannot observe a stale offset and
     receive the same bytes twice. If [skip] moved the offset past the end
     of [data], 0 is returned instead of letting the blit raise
     [Invalid_argument]. *)
  let read t buf =
    let remaining = String.length t.data - t.offset in
    let len = min remaining (Cstruct.length buf) in
    if len <= 0 then
      Lwt.pause () >>= fun () ->
      Lwt.return 0
    else begin
      Cstruct.blit_from_string t.data t.offset buf 0 len;
      t.offset <- t.offset + len;
      Lwt.pause () >>= fun () ->
      Lwt.return len
    end

  (* [skip t n] advances the offset by [n] bytes without reading them. No
     bounds check is done here; overshooting shows up as end of input on
     the next [read] or as [End_of_file] on the next [really_read]. *)
  let skip t n =
    t.offset <- t.offset + n;
    Lwt.pause ()
end
(* [Tar_gz.Make] instantiated with Lwt as the monad, the uninhabited
   [Null_write] as the (never used) writer and the in-memory [String_read]
   as the reader. *)
module Index = Tar_gz.Make(Lwt)(Null_write)(String_read)
let find_urls index =
let ic =
Index.of_in_channel ~internal:(Cstruct.create 0x1000)
{ String_read.data = index ; offset = 0 ; }
in
Lwt_list.fold_left_s (fun acc path ->
Git_kv.get store path >|= function
| Ok data ->
(* TODO report parser errors *)
(try
let url_csums = extract_urls (Mirage_kv.Key.to_string path) data in
Option.fold ~none:acc ~some:(fun (url, csums) ->
if HM.cardinal csums = 0 then
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
else
SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
csums'
then
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
None
end) acc) url_csums
with _ ->
Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path);
acc)
| Error e -> Logs.warn (fun m -> m "Git_kv.get: %a" Git_kv.pp_error e); acc)
SM.empty opam_paths
let rec do_it acc =
Lwt.try_bind
(fun () -> Index.get_next_header ic)
(fun hdr ->
(* XXX: Int64.to_int *)
let file_size = Int64.to_int hdr.file_size in
Logs.debug (fun m -> m "%s: %Ld" hdr.file_name hdr.file_size);
if String.starts_with hdr.Tar.Header.file_name ~prefix:"/packages/" &&
String.ends_with hdr.Tar.Header.file_name ~suffix:"/opam" then begin
let buf = Cstruct.create file_size in
Index.really_read ic buf >>= fun () ->
Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () ->
let data = Cstruct.to_string buf in
(* TODO report parser errors *)
let acc =
try
let url_csums = extract_urls hdr.file_name data in
Option.fold ~none:acc ~some:(fun (url, csums) ->
if HM.cardinal csums = 0 then
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
else
SM.update url (function
| None -> Some csums
| Some csums' ->
if HM.for_all (fun h v ->
match HM.find_opt h csums with
| None -> true | Some v' -> String.equal v v')
csums'
then
Some (HM.union (fun _h v _v' -> Some v) csums csums')
else begin
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
url (hm_to_s csums') (hm_to_s csums));
None
end) acc) url_csums
with _ ->
Logs.warn (fun m -> m "some error in %s, ignoring" hdr.file_name);
acc
in
(do_it [@tailcall]) acc
end else begin
Logs.debug (fun m -> m "skipping %d bytes" file_size);
Index.skip ic file_size >>= fun () ->
Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () ->
(do_it [@tailcall]) acc
end
)
(function
| Tar.Header.End_of_stream ->
Lwt.return acc
| e -> reraise e)
in
do_it SM.empty
end
module Disk = struct
@ -452,65 +484,6 @@ module Make
Error `Not_found
end
module Tarball = struct
module Async = struct
type 'a t = 'a
let ( >>= ) x f = f x
let return x = x
end
module Writer = struct
type out_channel = Buffer.t
type 'a t = 'a
let really_write buf data =
Buffer.add_string buf (Cstruct.to_string data)
end
(* Reading is not needed here, so this reader simply ignores everything. *)
module Reader = struct
type in_channel = unit
type 'a t = 'a
let really_read _in _data = ()
let skip _in _len = ()
let read _in _data = 0
end
module Tar_Gz = Tar_gz.Make (Async)(Writer)(Reader)
let of_git repo store =
let out_channel = Buffer.create 1024 in
let now = Ptime.v (Pclock.now_d_ps ()) in
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
let gz_out =
Tar_Gz.of_out_channel ~level:4 ~mtime:(Int32.of_int mtime)
Gz.Unix out_channel
in
Git.find_contents store >>= fun paths ->
Lwt_list.iter_s (fun path ->
Git_kv.get store path >|= function
| Ok data ->
let data =
if Mirage_kv.Key.(equal path (v "repo")) then repo else data
in
let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! *)
and mod_time = Int64.of_int mtime
and user_id = 0
and group_id = 0
and size = String.length data
in
let hdr =
Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
(Mirage_kv.Key.to_string path) (Int64.of_int size)
in
let o = ref false in
let stream () = if !o then None else (o := true; Some data) in
Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream
| Error e -> Logs.warn (fun m -> m "Git_kv error: %a" Git_kv.pp_error e))
paths >|= fun () ->
Tar_Gz.write_end gz_out;
Buffer.contents out_channel
end
module Serve = struct
let ptime_to_http_date ptime =
let (y, m, d), ((hh, mm, ss), _) = Ptime.to_date_time ptime
@ -524,62 +497,8 @@ module Make
let m' = Array.get month (pred m) in
Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss
let commit_id git_kv =
Git_kv.digest git_kv Mirage_kv.Key.empty >|= fun r ->
Result.get_ok r
let repo commit =
let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in
Fmt.str
{|opam-version: "2.0"
upstream: "%s#%s"
archive-mirrors: "cache"
stamp: %S
|} upstream commit commit
let modified git_kv =
Git_kv.last_modified git_kv Mirage_kv.Key.empty >|= fun r ->
let v = Result.fold ~ok:Fun.id ~error:(fun _ -> Pclock.now_d_ps ()) r in
ptime_to_http_date (Ptime.v v)
type t = {
mutable commit_id : string ;
mutable modified : string ;
mutable repo : string ;
mutable index : string ;
}
let create git_kv =
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
let repo = repo commit_id in
Tarball.of_git repo git_kv >|= fun index ->
{ commit_id ; modified ; repo ; index }
let update_lock = Lwt_mutex.create ()
let update_git t git_kv =
Lwt_mutex.with_lock update_lock (fun () ->
Logs.info (fun m -> m "pulling the git repository");
Git_kv.pull git_kv >>= function
| Error `Msg msg ->
Logs.err (fun m -> m "error %s while updating git" msg);
Lwt.return None
| Ok [] ->
Logs.info (fun m -> m "git changes are empty");
Lwt.return (Some [])
| Ok changes ->
commit_id git_kv >>= fun commit_id ->
modified git_kv >>= fun modified ->
Logs.info (fun m -> m "git: %s" commit_id);
let repo = repo commit_id in
Tarball.of_git repo git_kv >|= fun index ->
t.commit_id <- commit_id ;
t.modified <- modified ;
t.repo <- repo ;
t.index <- index;
Some changes)
let not_modified request (modified, etag) =
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
| Some ts -> String.equal ts modified
@ -615,7 +534,7 @@ stamp: %S
let resp = Httpaf.Response.create `Not_modified in
respond_with_empty reqd resp
else *)
let dispatch t store hook_url update _flow _conn reqd =
let dispatch store hook_url update _flow _conn reqd =
let request = Httpaf.Reqd.request reqd in
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
match String.split_on_char '/' request.Httpaf.Request.target with
@ -625,46 +544,11 @@ stamp: %S
let mime_type = "text/plain" in
let headers = [
"content-type", mime_type ;
"etag", t.commit_id ;
"last-modified", t.modified ;
"content-length", string_of_int (String.length data) ;
] in
let headers = Httpaf.Headers.of_list headers in
let resp = Httpaf.Response.create ~headers `OK in
Httpaf.Reqd.respond_with_string reqd resp data
| [ ""; "repo" ] ->
if not_modified request (t.modified, t.commit_id) then
let resp = Httpaf.Response.create `Not_modified in
respond_with_empty reqd resp
else
let data = t.repo in
let mime_type = "text/plain" in
let headers = [
"content-type", mime_type ;
"etag", t.commit_id ;
"last-modified", t.modified ;
"content-length", string_of_int (String.length data) ;
] in
let headers = Httpaf.Headers.of_list headers in
let resp = Httpaf.Response.create ~headers `OK in
Httpaf.Reqd.respond_with_string reqd resp data
| [ ""; "index.tar.gz" ] ->
(* deliver prepared tarball *)
if not_modified request (t.modified, t.commit_id) then
let resp = Httpaf.Response.create `Not_modified in
respond_with_empty reqd resp
else
let data = t.index in
let mime_type = "application/octet-stream" in
let headers = [
"content-type", mime_type ;
"etag", t.commit_id ;
"last-modified", t.modified ;
"content-length", string_of_int (String.length data) ;
] in
let headers = Httpaf.Headers.of_list headers in
let resp = Httpaf.Response.create ~headers `OK in
Httpaf.Reqd.respond_with_string reqd resp data
| "" :: "cache" :: hash_algo :: _ :: hash :: [] ->
(* `<hash-algo>/<first-2-hash-characters>/<hash>` *)
begin
@ -677,7 +561,8 @@ stamp: %S
(Disk.last_modified store h hash >|= function
| Error _ ->
Logs.warn (fun m -> m "error retrieving last modified");
t.modified
(* XXX *)
ptime_to_http_date (Ptime.v (Pclock.now_d_ps ()))
| Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified ->
if not_modified request (last_modified, hash) then
let resp = Httpaf.Response.create `Not_modified in
@ -754,82 +639,59 @@ stamp: %S
Disk.update_caches disk >|= fun () ->
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
let dump_git git_dump git_kv =
Git_kv.to_octets git_kv >>= fun data ->
Cache.write git_dump data >|= function
| Ok () ->
Logs.info (fun m -> m "dumped git %d bytes" (String.length data))
| Error e ->
Logs.warn (fun m -> m "failed to dump git: %a" Cache.pp_write_error e)
let restore_git git_dump git_ctx =
Cache.read git_dump >>= function
| Ok None -> Lwt.return (Error ())
| Error e ->
Logs.warn (fun m -> m "failed to read git state: %a" Cache.pp_error e);
Lwt.return (Error ())
| Ok Some data ->
Git_kv.of_octets git_ctx ~remote:(Key_gen.remote ()) data >|= function
| Ok git_kv -> Ok git_kv
| Error `Msg msg ->
Logs.err (fun m -> m "error restoring git state: %s" msg);
Error ()
module Paf = Paf_mirage.Make(Stack.TCP)
let start block _time _pclock stack git_ctx http_ctx =
let start block _time _pclock stack http_ctx =
BLOCK.get_info block >>= fun info ->
let sectors_cache = Key_gen.sectors_cache () in
let sectors_git = Key_gen.sectors_git () in
let git_start =
let sectors =
let cache_size = Int64.(mul 2L sectors_cache) in
Int64.(sub info.size_sectors (add cache_size sectors_git))
Int64.(sub info.size_sectors cache_size)
in
Part.connect git_start block >>= fun (kv, rest) ->
let git_dump, rest = Part.subpartition sectors_git rest in
Part.connect sectors block >>= fun (kv, rest) ->
let md5s, sha512s = Part.subpartition sectors_cache rest in
KV.connect kv >>= fun kv ->
Cache.connect md5s >>= fun md5s ->
Cache.connect sha512s >>= fun sha512s ->
Cache.connect git_dump >>= fun git_dump ->
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
Disk.init ~verify:(Key_gen.verify ()) kv md5s sha512s >>= fun disk ->
if Key_gen.check () then
Lwt.return_unit
else
begin
restore_git git_dump git_ctx >>= function
| Ok git_kv -> Lwt.return git_kv
| Error () ->
Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv ->
dump_git git_dump git_kv >|= fun () ->
git_kv
end >>= fun git_kv ->
Serve.commit_id git_kv >>= fun commit_id ->
Logs.info (fun m -> m "git: %s" commit_id);
Serve.create git_kv >>= fun serve ->
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
let update () =
Serve.update_git serve git_kv >>= function
| None | Some [] -> Lwt.return_unit
| Some _changes ->
dump_git git_dump git_kv >>= fun () ->
download_archives disk http_ctx git_kv
in
let service =
Paf.http_service
~error_handler:(fun _ ?request:_ _ _ -> ())
(Serve.dispatch serve disk (Key_gen.hook_url ()) update)
in
let `Initialized th = Paf.serve service t in
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
Lwt.async (fun () ->
let rec go () =
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
update () >>= fun () ->
go ()
in
go ());
download_archives disk http_ctx git_kv >>= fun () ->
(th >|= fun _v -> ())
let url = Key_gen.remote () ^ "/index.tar.gz" in
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
let update () =
Http_mirage_client.one_request
~alpn_protocol:HTTP.alpn_protocol
~authenticator:HTTP.authenticator
~ctx:http_ctx url >>= function
| Ok (resp, Some str) ->
if resp.status = `OK then
download_archives disk http_ctx str
else begin
Logs.warn (fun m -> m "%s: %a (reason %s)"
url H2.Status.pp_hum resp.status resp.reason);
Lwt.return_unit
end
| _ -> Lwt.return_unit
in
let service =
Paf.http_service
~error_handler:(fun _ ?request:_ _ _ -> ())
(Serve.dispatch disk (Key_gen.hook_url ()) update)
in
let `Initialized th = Paf.serve service t in
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
Lwt.async (fun () ->
let rec go () =
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
update () >>= fun () ->
go ()
in
go ());
update () >>= fun () ->
(th >|= fun _v -> ())
end
end