From 391ca131dc98a47550625844eb5d883431843335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Reynir=20Bj=C3=B6rnsson?= Date: Fri, 7 Oct 2022 16:54:17 +0200 Subject: [PATCH] Cache only version --- mirage/config.ml | 30 ++-- mirage/unikernel.ml | 398 +++++++++++++++----------------------------- 2 files changed, 141 insertions(+), 287 deletions(-) diff --git a/mirage/config.ml b/mirage/config.ml index aae9e0b..1530c78 100644 --- a/mirage/config.ml +++ b/mirage/config.ml @@ -18,11 +18,10 @@ let verify = let remote = let doc = Key.Arg.info - ~doc:"Remote repository url, use suffix #foo to specify a branch 'foo': \ - https://github.com/ocaml/opam-repository.git" + ~doc:"Remote repository url" ["remote"] in - Key.(create "remote" Arg.(opt string "https://github.com/ocaml/opam-repository.git#master" doc)) + Key.(create "remote" Arg.(opt string "https://opam.ocaml.org" doc)) let parallel_downloads = let doc = @@ -35,7 +34,7 @@ let parallel_downloads = let hook_url = let doc = Key.Arg.info - ~doc:"URL to conduct an update of the git repository" ["hook-url"] + ~doc:"URL to conduct an update of the opam repository" ["hook-url"] in Key.(create "hook-url" Arg.(opt string "update" doc)) @@ -55,29 +54,23 @@ let sectors_cache = let doc = Key.Arg.info ~doc ["sectors-cache"] in Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc)) -let sectors_git = - let doc = "Number of sectors reserved for git dump." in - let doc = Key.Arg.info ~doc ["sectors-git"] in - Key.(create "sectors-git" Arg.(opt int64 Int64.(mul 40L (mul 2L 1024L)) doc)) - let mirror = foreign "Unikernel.Make" ~keys:[ Key.v check ; Key.v verify ; Key.v remote ; Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ; - Key.v port ; Key.v sectors_cache ; Key.v sectors_git ; ] + Key.v port ; Key.v sectors_cache ; ] ~packages:[ package ~min:"0.2.0" ~sublibs:[ "mirage" ] "paf" ; package "h2" ; package "httpaf" ; - package ~pin:"git+https://git.robur.io/robur/git-kv.git#main" "git-kv" ; - package ~min:"3.7.0" "git-paf" ; package "opam-file-format" ; - package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ; + package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" ; package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ; package "mirage-block-partition" ; package "oneffs" ; + package "hex" ; ] - (block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job) + (block @-> time @-> pclock @-> stackv4v6 @-> http_client @-> job) let stack = generic_stackv4v6 default_network @@ -98,10 +91,9 @@ let http_client = 2 instances of happy-eyeballs will exists together which is not really good (it puts a pressure on the scheduler). *) -let git_client, http_client = - let happy_eyeballs = git_happy_eyeballs stack dns (generic_happy_eyeballs stack dns) in - merge_git_clients (git_tcp tcp happy_eyeballs) - (git_http ~authenticator:tls_authenticator tcp happy_eyeballs), +let http_client = + let happy_eyeballs = generic_happy_eyeballs stack dns in + let happy_eyeballs = git_happy_eyeballs stack dns happy_eyeballs in http_client $ default_time $ default_posix_clock $ tcp $ happy_eyeballs let program_block_size = @@ -111,4 +103,4 @@ let program_block_size = let block = block_of_file "tar" let () = register "mirror" - [ mirror $ block $ default_time $ default_posix_clock $ stack $ git_client $ http_client ] + [ mirror $ block $ default_time $ default_posix_clock $ stack $ http_client ] diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index f5e4d00..3bdf208 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -2,12 +2,13 @@ open Lwt.Infix let argument_error = 64 +external reraise : exn -> 'a = "%reraise" + module Make (BLOCK : Mirage_block.S) (Time : Mirage_time.S) (Pclock : Mirage_clock.PCLOCK) (Stack : Tcpip.Stack.V4V6) - (_ : sig end) (HTTP : Http_mirage_client.S) = struct module Part = Mirage_block_partition.Make(BLOCK) @@ -51,29 +52,6 @@ module Make hm "" module Git = struct - let find_contents store = - let rec go store path acc = - Git_kv.list store path >>= function - | Error e -> - Logs.err (fun m -> m "error %a while listing %a" - Git_kv.pp_error e Mirage_kv.Key.pp path); - Lwt.return acc - | Ok steps -> - Lwt_list.fold_left_s (fun acc (step, _) -> - let full_path = Mirage_kv.Key.add path step in - Git_kv.exists store full_path >>= function - | Error e -> - Logs.err (fun m -> m "error %a for exists %a" Git_kv.pp_error e - Mirage_kv.Key.pp full_path); - Lwt.return acc - | Ok None -> - Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp full_path); - Lwt.return acc - | Ok Some `Value -> Lwt.return (full_path :: acc) - | Ok Some `Dictionary -> go store full_path acc) acc steps - in - go store Mirage_kv.Key.empty [] - let decode_digest filename str = let hex h s = match hex_of_string s with @@ -162,40 +140,94 @@ module Make end | _ -> Logs.debug (fun m -> m "no url section for %s" filename); None - let find_urls store = - find_contents store >>= fun paths -> - let opam_paths = - List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths + module Null_write = struct + (* we have to provide a WRITER, but we don't actually want to write. *) + type out_channel = | + type 'a t = 'a Lwt.t + let really_write o _ = match o with (_ : out_channel) -> . + end + module String_read = struct + type in_channel = { + data : string; + mutable offset : int; + } + type 'a t = 'a Lwt.t + let really_read t buf = + let buf_len = Cstruct.length buf in + if String.length t.data - t.offset < buf_len then + raise Stdlib.End_of_file; + Cstruct.blit_from_string t.data t.offset buf 0 buf_len; + t.offset <- t.offset + buf_len; + Lwt.pause () + let read t buf = + let len = min (String.length t.data - t.offset) (Cstruct.length buf) in + Cstruct.blit_from_string t.data t.offset buf 0 len; + Lwt.pause () >>= fun () -> + t.offset <- t.offset + len; + Lwt.return len + let skip t n = + t.offset <- t.offset + n; Lwt.pause () + end + module Index = Tar_gz.Make(Lwt)(Null_write)(String_read) + + let find_urls index = + let ic = + Index.of_in_channel ~internal:(Cstruct.create 0x1000) + { String_read.data = index ; offset = 0 ; } in - Lwt_list.fold_left_s (fun acc path -> - Git_kv.get store path >|= function - | Ok data -> - (* TODO report parser errors *) - (try - let url_csums = extract_urls (Mirage_kv.Key.to_string path) data in - Option.fold ~none:acc ~some:(fun (url, csums) -> - if HM.cardinal csums = 0 then - (Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc) - else - SM.update url (function - | None -> Some csums - | Some csums' -> - if HM.for_all (fun h v -> - match HM.find_opt h csums with - | None -> true | Some v' -> String.equal v v') - csums' - then - Some (HM.union (fun _h v _v' -> Some v) csums csums') - else begin - Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s" - url (hm_to_s csums') (hm_to_s csums)); - None - end) acc) url_csums - with _ -> - Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path); - acc) - | Error e -> Logs.warn (fun m -> m "Git_kv.get: %a" Git_kv.pp_error e); acc) - SM.empty opam_paths + let rec do_it acc = + Lwt.try_bind + (fun () -> Index.get_next_header ic) + (fun hdr -> + (* XXX: Int64.to_int *) + let file_size = Int64.to_int hdr.file_size in + Logs.debug (fun m -> m "%s: %Ld" hdr.file_name hdr.file_size); + if String.starts_with hdr.Tar.Header.file_name ~prefix:"/packages/" && + String.ends_with hdr.Tar.Header.file_name ~suffix:"/opam" then begin + let buf = Cstruct.create file_size in + Index.really_read ic buf >>= fun () -> + Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () -> + let data = Cstruct.to_string buf in + (* TODO report parser errors *) + let acc = + try + let url_csums = extract_urls hdr.file_name data in + Option.fold ~none:acc ~some:(fun (url, csums) -> + if HM.cardinal csums = 0 then + (Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc) + else + SM.update url (function + | None -> Some csums + | Some csums' -> + if HM.for_all (fun h v -> + match HM.find_opt h csums with + | None -> true | Some v' -> String.equal v v') + csums' + then + Some (HM.union (fun _h v _v' -> Some v) csums csums') + else begin + Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s" + url (hm_to_s csums') (hm_to_s csums)); + None + end) acc) url_csums + with _ -> + Logs.warn (fun m -> m "some error in %s, ignoring" hdr.file_name); + acc + in + (do_it [@tailcall]) acc + end else begin + Logs.debug (fun m -> m "skipping %d bytes" file_size); + Index.skip ic file_size >>= fun () -> + Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () -> + (do_it [@tailcall]) acc + end + ) + (function + | Tar.Header.End_of_stream -> + Lwt.return acc + | e -> reraise e) + in + do_it SM.empty end module Disk = struct @@ -452,65 +484,6 @@ module Make Error `Not_found end - module Tarball = struct - module Async = struct - type 'a t = 'a - let ( >>= ) x f = f x - let return x = x - end - - module Writer = struct - type out_channel = Buffer.t - type 'a t = 'a - let really_write buf data = - Buffer.add_string buf (Cstruct.to_string data) - end - - (* That's not very interesting here, we just ignore everything*) - module Reader = struct - type in_channel = unit - type 'a t = 'a - let really_read _in _data = () - let skip _in _len = () - let read _in _data = 0 - end - - module Tar_Gz = Tar_gz.Make (Async)(Writer)(Reader) - - let of_git repo store = - let out_channel = Buffer.create 1024 in - let now = Ptime.v (Pclock.now_d_ps ()) in - let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in - let gz_out = - Tar_Gz.of_out_channel ~level:4 ~mtime:(Int32.of_int mtime) - Gz.Unix out_channel - in - Git.find_contents store >>= fun paths -> - Lwt_list.iter_s (fun path -> - Git_kv.get store path >|= function - | Ok data -> - let data = - if Mirage_kv.Key.(equal path (v "repo")) then repo else data - in - let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! *) - and mod_time = Int64.of_int mtime - and user_id = 0 - and group_id = 0 - and size = String.length data - in - let hdr = - Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id - (Mirage_kv.Key.to_string path) (Int64.of_int size) - in - let o = ref false in - let stream () = if !o then None else (o := true; Some data) in - Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream - | Error e -> Logs.warn (fun m -> m "Git_kv error: %a" Git_kv.pp_error e)) - paths >|= fun () -> - Tar_Gz.write_end gz_out; - Buffer.contents out_channel - end - module Serve = struct let ptime_to_http_date ptime = let (y, m, d), ((hh, mm, ss), _) = Ptime.to_date_time ptime @@ -524,62 +497,8 @@ module Make let m' = Array.get month (pred m) in Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss - let commit_id git_kv = - Git_kv.digest git_kv Mirage_kv.Key.empty >|= fun r -> - Result.get_ok r - - let repo commit = - let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in - Fmt.str - {|opam-version: "2.0" -upstream: "%s#%s" -archive-mirrors: "cache" -stamp: %S -|} upstream commit commit - - let modified git_kv = - Git_kv.last_modified git_kv Mirage_kv.Key.empty >|= fun r -> - let v = Result.fold ~ok:Fun.id ~error:(fun _ -> Pclock.now_d_ps ()) r in - ptime_to_http_date (Ptime.v v) - - type t = { - mutable commit_id : string ; - mutable modified : string ; - mutable repo : string ; - mutable index : string ; - } - - let create git_kv = - commit_id git_kv >>= fun commit_id -> - modified git_kv >>= fun modified -> - let repo = repo commit_id in - Tarball.of_git repo git_kv >|= fun index -> - { commit_id ; modified ; repo ; index } - let update_lock = Lwt_mutex.create () - let update_git t git_kv = - Lwt_mutex.with_lock update_lock (fun () -> - Logs.info (fun m -> m "pulling the git repository"); - Git_kv.pull git_kv >>= function - | Error `Msg msg -> - Logs.err (fun m -> m "error %s while updating git" msg); - Lwt.return None - | Ok [] -> - Logs.info (fun m -> m "git changes are empty"); - Lwt.return (Some []) - | Ok changes -> - commit_id git_kv >>= fun commit_id -> - modified git_kv >>= fun modified -> - Logs.info (fun m -> m "git: %s" commit_id); - let repo = repo commit_id in - Tarball.of_git repo git_kv >|= fun index -> - t.commit_id <- commit_id ; - t.modified <- modified ; - t.repo <- repo ; - t.index <- index; - Some changes) - let not_modified request (modified, etag) = match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with | Some ts -> String.equal ts modified @@ -615,7 +534,7 @@ stamp: %S let resp = Httpaf.Response.create `Not_modified in respond_with_empty reqd resp else *) - let dispatch t store hook_url update _flow _conn reqd = + let dispatch store hook_url update _flow _conn reqd = let request = Httpaf.Reqd.request reqd in Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target); match String.split_on_char '/' request.Httpaf.Request.target with @@ -625,46 +544,11 @@ stamp: %S let mime_type = "text/plain" in let headers = [ "content-type", mime_type ; - "etag", t.commit_id ; - "last-modified", t.modified ; "content-length", string_of_int (String.length data) ; ] in let headers = Httpaf.Headers.of_list headers in let resp = Httpaf.Response.create ~headers `OK in Httpaf.Reqd.respond_with_string reqd resp data - | [ ""; "repo" ] -> - if not_modified request (t.modified, t.commit_id) then - let resp = Httpaf.Response.create `Not_modified in - respond_with_empty reqd resp - else - let data = t.repo in - let mime_type = "text/plain" in - let headers = [ - "content-type", mime_type ; - "etag", t.commit_id ; - "last-modified", t.modified ; - "content-length", string_of_int (String.length data) ; - ] in - let headers = Httpaf.Headers.of_list headers in - let resp = Httpaf.Response.create ~headers `OK in - Httpaf.Reqd.respond_with_string reqd resp data - | [ ""; "index.tar.gz" ] -> - (* deliver prepared tarball *) - if not_modified request (t.modified, t.commit_id) then - let resp = Httpaf.Response.create `Not_modified in - respond_with_empty reqd resp - else - let data = t.index in - let mime_type = "application/octet-stream" in - let headers = [ - "content-type", mime_type ; - "etag", t.commit_id ; - "last-modified", t.modified ; - "content-length", string_of_int (String.length data) ; - ] in - let headers = Httpaf.Headers.of_list headers in - let resp = Httpaf.Response.create ~headers `OK in - Httpaf.Reqd.respond_with_string reqd resp data | "" :: "cache" :: hash_algo :: _ :: hash :: [] -> (* `//` *) begin @@ -677,7 +561,8 @@ stamp: %S (Disk.last_modified store h hash >|= function | Error _ -> Logs.warn (fun m -> m "error retrieving last modified"); - t.modified + (* XXX *) + ptime_to_http_date (Ptime.v (Pclock.now_d_ps ())) | Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified -> if not_modified request (last_modified, hash) then let resp = Httpaf.Response.create `Not_modified in @@ -754,82 +639,59 @@ stamp: %S Disk.update_caches disk >|= fun () -> Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls)) - let dump_git git_dump git_kv = - Git_kv.to_octets git_kv >>= fun data -> - Cache.write git_dump data >|= function - | Ok () -> - Logs.info (fun m -> m "dumped git %d bytes" (String.length data)) - | Error e -> - Logs.warn (fun m -> m "failed to dump git: %a" Cache.pp_write_error e) - - let restore_git git_dump git_ctx = - Cache.read git_dump >>= function - | Ok None -> Lwt.return (Error ()) - | Error e -> - Logs.warn (fun m -> m "failed to read git state: %a" Cache.pp_error e); - Lwt.return (Error ()) - | Ok Some data -> - Git_kv.of_octets git_ctx ~remote:(Key_gen.remote ()) data >|= function - | Ok git_kv -> Ok git_kv - | Error `Msg msg -> - Logs.err (fun m -> m "error restoring git state: %s" msg); - Error () module Paf = Paf_mirage.Make(Stack.TCP) - let start block _time _pclock stack git_ctx http_ctx = + let start block _time _pclock stack http_ctx = BLOCK.get_info block >>= fun info -> let sectors_cache = Key_gen.sectors_cache () in - let sectors_git = Key_gen.sectors_git () in - let git_start = + let sectors = let cache_size = Int64.(mul 2L sectors_cache) in - Int64.(sub info.size_sectors (add cache_size sectors_git)) + Int64.(sub info.size_sectors cache_size) in - Part.connect git_start block >>= fun (kv, rest) -> - let git_dump, rest = Part.subpartition sectors_git rest in + Part.connect sectors block >>= fun (kv, rest) -> let md5s, sha512s = Part.subpartition sectors_cache rest in KV.connect kv >>= fun kv -> Cache.connect md5s >>= fun md5s -> Cache.connect sha512s >>= fun sha512s -> - Cache.connect git_dump >>= fun git_dump -> Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv)); Disk.init ~verify:(Key_gen.verify ()) kv md5s sha512s >>= fun disk -> if Key_gen.check () then Lwt.return_unit else begin - restore_git git_dump git_ctx >>= function - | Ok git_kv -> Lwt.return git_kv - | Error () -> - Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv -> - dump_git git_dump git_kv >|= fun () -> - git_kv - end >>= fun git_kv -> - Serve.commit_id git_kv >>= fun commit_id -> - Logs.info (fun m -> m "git: %s" commit_id); - Serve.create git_kv >>= fun serve -> - Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t -> - let update () = - Serve.update_git serve git_kv >>= function - | None | Some [] -> Lwt.return_unit - | Some _changes -> - dump_git git_dump git_kv >>= fun () -> - download_archives disk http_ctx git_kv - in - let service = - Paf.http_service - ~error_handler:(fun _ ?request:_ _ _ -> ()) - (Serve.dispatch serve disk (Key_gen.hook_url ()) update) - in - let `Initialized th = Paf.serve service t in - Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ())); - Lwt.async (fun () -> - let rec go () = - Time.sleep_ns (Duration.of_hour 1) >>= fun () -> - update () >>= fun () -> - go () - in - go ()); - download_archives disk http_ctx git_kv >>= fun () -> - (th >|= fun _v -> ()) + let url = Key_gen.remote () ^ "/index.tar.gz" in + Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t -> + let update () = + Http_mirage_client.one_request + ~alpn_protocol:HTTP.alpn_protocol + ~authenticator:HTTP.authenticator + ~ctx:http_ctx url >>= function + | Ok (resp, Some str) -> + if resp.status = `OK then + download_archives disk http_ctx str + else begin + Logs.warn (fun m -> m "%s: %a (reason %s)" + url H2.Status.pp_hum resp.status resp.reason); + Lwt.return_unit + end + | _ -> Lwt.return_unit + in + let service = + Paf.http_service + ~error_handler:(fun _ ?request:_ _ _ -> ()) + (Serve.dispatch disk (Key_gen.hook_url ()) update) + in + let `Initialized th = Paf.serve service t in + Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ())); + Lwt.async (fun () -> + let rec go () = + Time.sleep_ns (Duration.of_hour 1) >>= fun () -> + update () >>= fun () -> + go () + in + go ()); + update () >>= fun () -> + (th >|= fun _v -> ()) + end end -- 2.47.0