From f62f2a09ce919a766d5bac99df6e80ec0ffbbd42 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Sun, 4 Sep 2022 10:01:45 +0200 Subject: [PATCH] in addition to retrieve stuff, also serve a repository: - archive cache - repository (index.tar.gz) dynamically created from tarball - repo file (generated including the commit id and repository URL) --- mirage/config.ml | 9 +- mirage/http_mirage_client.ml | 10 +- mirage/unikernel.ml | 348 +++++++++++++++++++++++++++++------ 3 files changed, 311 insertions(+), 56 deletions(-) diff --git a/mirage/config.ml b/mirage/config.ml index 7739d89..e39e4a2 100644 --- a/mirage/config.ml +++ b/mirage/config.ml @@ -28,6 +28,10 @@ let remote = in Key.(create "remote" Arg.(opt string "https://github.com/ocaml/opam-repository.git#master" doc)) +let port = + let doc = Key.Arg.info ~doc:"HTTP listen port." ["port"] in + Key.(create "port" Arg.(opt int 80 doc)) + let tls_authenticator = (* this will not look the same in the help printout *) let doc = "TLS host authenticator. See git_http in lib/mirage/mirage.mli for a description of the format." 
@@ -37,14 +41,15 @@ let tls_authenticator = let mirror = foreign "Unikernel.Make" - ~keys:[ Key.v key_hex ; Key.v check ; Key.v remote ; Key.v tls_authenticator ] + ~keys:[ Key.v key_hex ; Key.v check ; Key.v remote ; Key.v tls_authenticator ; Key.v port ] ~packages:[ - package "paf" ; + package ~min:"0.1.0" ~sublibs:[ "mirage" ] "paf" ; package "h2" ; package "httpaf" ; package ~min:"3.0.0" "irmin-mirage-git" ; package ~min:"3.7.0" "git-paf" ; package "opam-file-format" ; + package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ; ] (kv_rw @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job) diff --git a/mirage/http_mirage_client.ml b/mirage/http_mirage_client.ml index bb76d10..07c6472 100644 --- a/mirage/http_mirage_client.ml +++ b/mirage/http_mirage_client.ml @@ -129,6 +129,7 @@ let prepare_http_1_1_headers headers host user_pass body_length = add_authentication ~add headers user_pass let single_http_1_1_request ~sleep ?config flow user_pass host meth path headers body = + Logs.info (fun m -> m "http 1.1 request %s path %s" host path); let body_length = Option.map String.length body in let headers = prepare_http_1_1_headers headers host user_pass body_length in let req = Httpaf.Request.create ~headers meth path in @@ -166,7 +167,9 @@ let single_http_1_1_request ~sleep ?config flow user_pass host meth path headers Lwt.async (fun () -> Paf.run (module HTTP_1_1) ~sleep conn flow) ; Option.iter (Httpaf.Body.write_string request_body) body ; Httpaf.Body.close_writer request_body ; - finished + finished >|= fun r -> + Logs.info (fun m -> m "http 1.1 request %s path %s finished" host path); + r let prepare_h2_headers headers host user_pass body_length = let headers = H2.Headers.of_list headers in @@ -176,6 +179,7 @@ let prepare_h2_headers headers host user_pass body_length = add_authentication ~add headers user_pass let single_h2_request ~sleep ?config ~scheme flow user_pass host meth path headers body = + Logs.info (fun m -> m "http2 request %s path %s" host 
path); let body_length = Option.map String.length body in let headers = prepare_h2_headers headers host user_pass body_length in let req = H2.Request.create ~scheme ~headers meth path in @@ -220,7 +224,9 @@ let single_h2_request ~sleep ?config ~scheme flow user_pass host meth path heade Option.iter (H2.Body.Writer.write_string request_body) body ; H2.Body.Writer.close request_body ; finished >|= fun v -> - H2.Client_connection.shutdown conn ; v + H2.Client_connection.shutdown conn ; + Logs.info (fun m -> m "http2 request %s path %s finished" host path); + v let decode_uri ~ctx uri = let ( >>= ) = Result.bind in diff --git a/mirage/unikernel.ml b/mirage/unikernel.ml index c85c56b..f757935 100644 --- a/mirage/unikernel.ml +++ b/mirage/unikernel.ml @@ -35,6 +35,12 @@ module Make | `SHA384 -> "sha384" | `SHA512 -> "sha512" + let hash_of_string = function + | "md5" -> Ok `MD5 + | "sha256" -> Ok `SHA256 + | "sha512" -> Ok `SHA512 + | h -> Error (`Msg ("unknown hash algorithm: " ^ h)) + let hex_to_string h = let `Hex h = Hex.of_string h in h @@ -73,7 +79,7 @@ module Make Logs.info (fun m -> m "pulling from remote!"); Sync.pull ~depth:1 store upstream `Set >|= fun r -> match r with - | Ok (`Head _ as s) -> Ok (Fmt.str "pulled %a" Sync.pp_status s) + | Ok (`Head c as s) -> Ok (c, Fmt.str "pulled %a" Sync.pp_status s) | Ok `Empty -> Error (`Msg "pulled empty repository") | Error (`Msg e) -> Error (`Msg ("pull error " ^ e)) | Error (`Conflict msg) -> Error (`Msg ("pull conflict " ^ msg)) @@ -91,9 +97,7 @@ module Make | Some `Contents -> Lwt.return (full_path :: acc) | Some `Node -> go store full_path acc) acc steps in - go store [] [] >|= fun contents -> - Logs.info (fun m -> m "%d contents" (List.length contents)); - contents + go store [] [] let decode_digest filename str = let hex h s = @@ -218,9 +222,7 @@ module Make Logs.warn (fun m -> m "some error in %s, ignoring" (String.concat "/" path)); acc) | None -> acc) - SM.empty opam_paths >|= fun urls -> - Logs.info (fun m -> 
m "map contains %d urls" (SM.cardinal urls)); - urls + SM.empty opam_paths end module Disk = struct @@ -276,9 +278,7 @@ module Make and sha512s = SM.add sha512 name t.sha512s in t.md5s <- md5s ; t.sha512s <- sha512s; - Logs.info (fun m -> m "added %s (md5 %s sha512 %s)" - (key_to_string t name) (key_to_string t md5) - (key_to_string t sha512)); + Logs.debug (fun m -> m "added %s" (key_to_string t name)); Lwt.return_unit end else begin Logs.err (fun m -> m "corrupt data, expected %s, read %s" @@ -297,7 +297,7 @@ module Make entries >|= fun () -> t - let write t data hm = + let write t ~url data hm = let cs = Cstruct.of_string data in let sha256 = Mirage_crypto.Hash.digest `SHA256 cs |> key t and md5 = Mirage_crypto.Hash.digest `MD5 cs |> key t @@ -312,7 +312,7 @@ module Make if String.equal v v' then true else begin - Logs.err (fun m -> m "hash mismatch %s: expected %s, got %s" + Logs.err (fun m -> m "%s hash mismatch %s: expected %s, got %s" url (hash_to_string h) (key_to_string t v) (key_to_string t v')); false end) hm @@ -324,8 +324,8 @@ module Make Logs.debug (fun m -> m "wrote %s (%d bytes)" (key_to_string t sha256) (String.length data)) | Error e -> - Logs.err (fun m -> m "error %a while writing %s" - KV.pp_write_error e (key_to_string t sha256)) + Logs.err (fun m -> m "error %a while writing %s (key %s)" + KV.pp_write_error e url (key_to_string t sha256)) end else Lwt.return_unit @@ -340,7 +340,7 @@ module Make | _ -> None with | None -> - Logs.err (fun m -> m "couldn't find %s" v); + (* Logs.err (fun m -> m "couldn't find %s" v); *) Error `Not_found | Some x -> Ok x @@ -371,50 +371,294 @@ module Make Logs.err (fun m -> m "error %a while reading %s %s" KV.pp_error e (hash_to_string h) v); Error `Not_found + + let last_modified t h v = + match find_key t h v with + | Error _ as e -> Lwt.return e + | Ok x -> + KV.last_modified t.dev (Mirage_kv.Key.v x) >|= function + | Ok data -> Ok data + | Error e -> + Logs.err (fun m -> m "error %a while reading %s %s" + 
KV.pp_error e (hash_to_string h) v); + Error `Not_found end - let one_request = Http_mirage_client.one_request ~alpn_protocol:HTTP.alpn_protocol - ~authenticator:HTTP.authenticator + module Tarball = struct + module Async = struct + type 'a t = 'a + let ( >>= ) x f = f x + let return x = x + end - let start kv _time _pclock _stack git_ctx http_ctx = + module Writer = struct + type out_channel = Buffer.t + type 'a t = 'a + let really_write buf data = + Buffer.add_string buf (Cstruct.to_string data) + end + + (* That's not very interesting here, we just ignore everything*) + module Reader = struct + type in_channel = unit + type 'a t = 'a + let really_read _in _data = () + let skip _in _len = () + let read _in _data = 0 + end + + module Tar_Gz = Tar_gz.Make (Async)(Writer)(Reader) + + let of_git repo store = + let out_channel = Buffer.create 1024 in + let now = Ptime.v (Pclock.now_d_ps ()) in + let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in + let gz_out = + Tar_Gz.of_out_channel ~level:4 ~mtime:(Int32.of_int mtime) + Gz.Unix out_channel + in + Git.find_contents store >>= fun paths -> + Lwt_list.iter_s (fun path -> + Store.find store path >|= function + | Some data -> + let data = + if path = [ "repo" ] then repo else data + in + let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! 
*) + and mod_time = Int64.of_int mtime + and user_id = 0 + and group_id = 0 + and size = String.length data + in + let hdr = + Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id + (String.concat "/" path) (Int64.of_int size) + in + let o = ref false in + let stream () = if !o then None else (o := true; Some data) in + Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream + | None -> ()) + paths >|= fun () -> + Tar_Gz.write_end gz_out; + Buffer.contents out_channel + end + + module Serve = struct + let ptime_to_http_date ptime = + let (y, m, d), ((hh, mm, ss), _) = Ptime.to_date_time ptime + and weekday = match Ptime.weekday ptime with + | `Mon -> "Mon" | `Tue -> "Tue" | `Wed -> "Wed" | `Thu -> "Thu" + | `Fri -> "Fri" | `Sat -> "Sat" | `Sun -> "Sun" + and month = + [| "Jan" ; "Feb" ; "Mar" ; "Apr" ; "May" ; "Jun" ; + "Jul" ; "Aug" ; "Sep" ; "Oct" ; "Nov" ; "Dec" |] + in + let m' = Array.get month (pred m) in + Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss + + let commit_id commit = + Fmt.to_to_string (Irmin.Type.pp Store.Hash.t) (Store.Commit.hash commit) + + let repo commit = + let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) + and commit = commit_id commit + in + Fmt.str + {|opam-version: "2.0" +upstream: "%s#%s" +archive-mirrors: "cache" +stamp: %S +|} upstream commit commit + + let modified commit = + let info = Store.Commit.info commit in + let ptime = + Option.value ~default:(Ptime.v (Pclock.now_d_ps ())) + (Ptime.of_float_s (Int64.to_float (Store.Info.date info))) + in + ptime_to_http_date ptime + + type t = { + commit_id : string ; + modified : string ; + repo : string ; + index : string ; + } + + let create commit repo index = + let commit_id = commit_id commit + and modified = modified commit + in + { commit_id ; modified ; repo ; index } + + let not_modified request (modified, etag) = + match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with + | Some ts -> 
String.equal ts modified + | None -> match Httpaf.Headers.get request.Httpaf.Request.headers "if-none-match" with + | Some etags -> List.mem etag (String.split_on_char ',' etags) + | None -> false + + let not_found reqd path = + let data = "Resource not found " ^ path in + let headers = Httpaf.Headers.of_list + [ "content-length", string_of_int (String.length data) ] in + let resp = Httpaf.Response.create ~headers `Not_found in + Httpaf.Reqd.respond_with_string reqd resp data + + let respond_with_empty reqd resp = + let hdr = + Httpaf.Headers.add_unless_exists resp.Httpaf.Response.headers + "connection" "close" + in + let resp = { resp with Httpaf.Response.headers = hdr } in + Httpaf.Reqd.respond_with_string reqd resp "" + + (* From the OPAM manual, all we need: + /repo -- repository configuration file + /cache -- cached archives + /index.tar.gz -- archive containing the whole repository contents + *) + (* may include "announce: [ string { filter } ... ]" *) + (* use Key_gen.remote for browse & upstream *) + + (* for repo and index.tar.gz: + if Last_modified.not_modified request then + let resp = Httpaf.Response.create `Not_modified in + respond_with_empty reqd resp + else *) + let dispatch t store _flow _conn reqd = + let request = Httpaf.Reqd.request reqd in + Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target); + match String.split_on_char '/' request.Httpaf.Request.target with + | [ ""; "repo" ] -> + if not_modified request (t.modified, t.commit_id) then + let resp = Httpaf.Response.create `Not_modified in + respond_with_empty reqd resp + else + let data = t.repo in + let mime_type = "text/plain" in + let headers = [ + "content-type", mime_type ; + "etag", t.commit_id ; + "last-modified", t.modified ; + "content-length", string_of_int (String.length data) ; + ] in + let headers = Httpaf.Headers.of_list headers in + let resp = Httpaf.Response.create ~headers `OK in + Httpaf.Reqd.respond_with_string reqd resp data + | [ ""; "index.tar.gz" ] -> + 
(* deliver prepared tarball *) + if not_modified request (t.modified, t.commit_id) then + let resp = Httpaf.Response.create `Not_modified in + respond_with_empty reqd resp + else + let data = t.index in + let mime_type = "application/octet-stream" in + let headers = [ + "content-type", mime_type ; + "etag", t.commit_id ; + "last-modified", t.modified ; + "content-length", string_of_int (String.length data) ; + ] in + let headers = Httpaf.Headers.of_list headers in + let resp = Httpaf.Response.create ~headers `OK in + Httpaf.Reqd.respond_with_string reqd resp data + | "" :: "cache" :: hash_algo :: _ :: hash :: [] -> + (* `<hash-algo>/<ignored segment>/<hash>` *) + begin + match hash_of_string hash_algo with + | Error `Msg msg -> + Logs.warn (fun m -> m "error decoding hash algo: %s" msg); + not_found reqd request.Httpaf.Request.target + | Ok h -> + Lwt.async (fun () -> + (Disk.last_modified store h hash >|= function + | Error _ -> + Logs.warn (fun m -> m "error retrieving last modified"); + t.modified + | Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified -> + if not_modified request (last_modified, hash) then + let resp = Httpaf.Response.create `Not_modified in + respond_with_empty reqd resp; + Lwt.return_unit + else + Disk.read store h hash >>= function + | Error _ -> + not_found reqd request.Httpaf.Request.target; + Lwt.return_unit + | Ok data -> + let mime_type = "application/octet-stream" in + let headers = [ + "content-type", mime_type ; + "etag", hash ; + "last-modified", last_modified ; + "content-length", string_of_int (String.length data) ; + ] in + let headers = Httpaf.Headers.of_list headers in + let resp = Httpaf.Response.create ~headers `OK in + Httpaf.Reqd.respond_with_string reqd resp data ; + Lwt.return_unit) + end + | _ -> + Logs.warn (fun m -> m "unknown request %s" request.Httpaf.Request.target); + not_found reqd request.Httpaf.Request.target + + end + + let download_archives disk http_ctx store = + Git.find_urls store >>= fun urls -> + let pool = Lwt_pool.create 20 
(Fun.const Lwt.return_unit) in + Lwt_list.iter_p (fun (url, csums) -> + Lwt_pool.use pool @@ fun () -> + HM.fold (fun h v r -> + r >>= function + | true -> Disk.exists disk h (hex_to_string v) + | false -> Lwt.return false) + csums (Lwt.return true) >>= function + | true -> + Logs.debug (fun m -> m "ignoring %s (already present)" url); + Lwt.return_unit + | false -> + Logs.debug (fun m -> m "downloading %s" url); + Http_mirage_client.one_request + ~alpn_protocol:HTTP.alpn_protocol + ~authenticator:HTTP.authenticator + ~ctx:http_ctx url >>= function + | Ok (resp, Some str) -> + if resp.status = `OK then begin + Logs.info (fun m -> m "downloaded %s" url); + Disk.write disk ~url str csums + end else begin + Logs.warn (fun m -> m "%s: %a (reason %s)" + url H2.Status.pp_hum resp.status resp.reason); + Lwt.return_unit + end + | _ -> Lwt.return_unit) + (SM.bindings urls) + + module Paf = Paf_mirage.Make(Time)(Stack.TCP) + + let start kv _time _pclock stack git_ctx http_ctx = let key_hex = Key_gen.key_hex () in Disk.init ~key_hex kv >>= fun disk -> - if Key_gen.check () then begin - Logs.info (fun m -> m "done"); - Lwt.return_unit - end else + if Key_gen.check () then Lwt.return_unit + else Git.connect git_ctx >>= fun (store, upstream) -> Git.pull store upstream >>= function | Error `Msg msg -> Lwt.fail_with msg - | Ok msg -> + | Ok (commit, msg) -> Logs.info (fun m -> m "git: %s" msg); - Git.find_urls store >>= fun urls -> - let pool = Lwt_pool.create 20 (Fun.const Lwt.return_unit) in - Lwt_list.iter_p (fun (url, csums) -> - Lwt_pool.use pool @@ fun () -> - HM.fold (fun h v r -> - r >>= function - | true -> Disk.exists disk h (hex_to_string v) - | false -> Lwt.return false) - csums (Lwt.return true) >>= function - | true -> - Logs.debug (fun m -> m "ignoring %s (already present)" url); - Lwt.return_unit - | false -> - Logs.info (fun m -> m "downloading %s" url); - one_request ~ctx:http_ctx url >>= function - | Ok (resp, Some str) -> - Logs.info (fun m -> m "downloaded %s" 
url); - if resp.status = `OK then - Disk.write disk str csums - else begin - Logs.warn (fun m -> m "received for %s: %a (reason %s) (headers %a)" - url H2.Status.pp_hum resp.status resp.reason - H2.Headers.pp_hum resp.headers - ); - Lwt.return_unit - end - | _ -> Lwt.return_unit) - (SM.bindings urls) >|= fun () -> - Logs.info (fun m -> m "done") + let repo = Serve.repo commit in + Tarball.of_git repo store >>= fun index -> + let serve = Serve.create commit repo index in + Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t -> + let service = + Paf.http_service + ~error_handler:(fun _ ?request:_ _ _ -> ()) + (Serve.dispatch serve disk) + in + let `Initialized th = Paf.serve service t in + Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ())); + download_archives disk http_ctx store >>= fun () -> + (th >|= fun _v -> ()) end