WIP: Cache only version #25
2 changed files with 141 additions and 287 deletions
|
@ -18,11 +18,10 @@ let verify =
|
|||
let remote =
|
||||
let doc =
|
||||
Key.Arg.info
|
||||
~doc:"Remote repository url, use suffix #foo to specify a branch 'foo': \
|
||||
https://github.com/ocaml/opam-repository.git"
|
||||
~doc:"Remote repository url"
|
||||
["remote"]
|
||||
in
|
||||
Key.(create "remote" Arg.(opt string "https://github.com/ocaml/opam-repository.git#master" doc))
|
||||
Key.(create "remote" Arg.(opt string "https://opam.ocaml.org" doc))
|
||||
|
||||
let parallel_downloads =
|
||||
let doc =
|
||||
|
@ -35,7 +34,7 @@ let parallel_downloads =
|
|||
let hook_url =
|
||||
let doc =
|
||||
Key.Arg.info
|
||||
~doc:"URL to conduct an update of the git repository" ["hook-url"]
|
||||
~doc:"URL to conduct an update of the opam repository" ["hook-url"]
|
||||
in
|
||||
Key.(create "hook-url" Arg.(opt string "update" doc))
|
||||
|
||||
|
@ -55,29 +54,23 @@ let sectors_cache =
|
|||
let doc = Key.Arg.info ~doc ["sectors-cache"] in
|
||||
Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc))
|
||||
|
||||
let sectors_git =
|
||||
let doc = "Number of sectors reserved for git dump." in
|
||||
let doc = Key.Arg.info ~doc ["sectors-git"] in
|
||||
Key.(create "sectors-git" Arg.(opt int64 Int64.(mul 40L (mul 2L 1024L)) doc))
|
||||
|
||||
let mirror =
|
||||
foreign "Unikernel.Make"
|
||||
~keys:[ Key.v check ; Key.v verify ; Key.v remote ;
|
||||
Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ;
|
||||
Key.v port ; Key.v sectors_cache ; Key.v sectors_git ; ]
|
||||
Key.v port ; Key.v sectors_cache ; ]
|
||||
~packages:[
|
||||
package ~min:"0.2.0" ~sublibs:[ "mirage" ] "paf" ;
|
||||
package "h2" ;
|
||||
package "httpaf" ;
|
||||
package ~pin:"git+https://git.robur.io/robur/git-kv.git#main" "git-kv" ;
|
||||
package ~min:"3.7.0" "git-paf" ;
|
||||
package "opam-file-format" ;
|
||||
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
||||
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" ;
|
||||
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ;
|
||||
package "mirage-block-partition" ;
|
||||
package "oneffs" ;
|
||||
package "hex" ;
|
||||
]
|
||||
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job)
|
||||
(block @-> time @-> pclock @-> stackv4v6 @-> http_client @-> job)
|
||||
|
||||
let stack = generic_stackv4v6 default_network
|
||||
|
||||
|
@ -98,10 +91,9 @@ let http_client =
|
|||
2 instances of happy-eyeballs will exists together which is not really good
|
||||
(it puts a pressure on the scheduler). *)
|
||||
|
||||
let git_client, http_client =
|
||||
let happy_eyeballs = git_happy_eyeballs stack dns (generic_happy_eyeballs stack dns) in
|
||||
merge_git_clients (git_tcp tcp happy_eyeballs)
|
||||
(git_http ~authenticator:tls_authenticator tcp happy_eyeballs),
|
||||
let http_client =
|
||||
let happy_eyeballs = generic_happy_eyeballs stack dns in
|
||||
let happy_eyeballs = git_happy_eyeballs stack dns happy_eyeballs in
|
||||
http_client $ default_time $ default_posix_clock $ tcp $ happy_eyeballs
|
||||
|
||||
let program_block_size =
|
||||
|
@ -111,4 +103,4 @@ let program_block_size =
|
|||
let block = block_of_file "tar"
|
||||
|
||||
let () = register "mirror"
|
||||
[ mirror $ block $ default_time $ default_posix_clock $ stack $ git_client $ http_client ]
|
||||
[ mirror $ block $ default_time $ default_posix_clock $ stack $ http_client ]
|
||||
|
|
|
@ -2,12 +2,13 @@ open Lwt.Infix
|
|||
|
||||
let argument_error = 64
|
||||
|
||||
external reraise : exn -> 'a = "%reraise"
|
||||
|
||||
module Make
|
||||
(BLOCK : Mirage_block.S)
|
||||
(Time : Mirage_time.S)
|
||||
(Pclock : Mirage_clock.PCLOCK)
|
||||
(Stack : Tcpip.Stack.V4V6)
|
||||
(_ : sig end)
|
||||
(HTTP : Http_mirage_client.S) = struct
|
||||
|
||||
module Part = Mirage_block_partition.Make(BLOCK)
|
||||
|
@ -51,29 +52,6 @@ module Make
|
|||
hm ""
|
||||
|
||||
module Git = struct
|
||||
let find_contents store =
|
||||
let rec go store path acc =
|
||||
Git_kv.list store path >>= function
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a while listing %a"
|
||||
Git_kv.pp_error e Mirage_kv.Key.pp path);
|
||||
Lwt.return acc
|
||||
| Ok steps ->
|
||||
Lwt_list.fold_left_s (fun acc (step, _) ->
|
||||
let full_path = Mirage_kv.Key.add path step in
|
||||
Git_kv.exists store full_path >>= function
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %a for exists %a" Git_kv.pp_error e
|
||||
Mirage_kv.Key.pp full_path);
|
||||
Lwt.return acc
|
||||
| Ok None ->
|
||||
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp full_path);
|
||||
Lwt.return acc
|
||||
| Ok Some `Value -> Lwt.return (full_path :: acc)
|
||||
| Ok Some `Dictionary -> go store full_path acc) acc steps
|
||||
in
|
||||
go store Mirage_kv.Key.empty []
|
||||
|
||||
let decode_digest filename str =
|
||||
let hex h s =
|
||||
match hex_of_string s with
|
||||
|
@ -162,40 +140,94 @@ module Make
|
|||
end
|
||||
| _ -> Logs.debug (fun m -> m "no url section for %s" filename); None
|
||||
|
||||
let find_urls store =
|
||||
find_contents store >>= fun paths ->
|
||||
let opam_paths =
|
||||
List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths
|
||||
module Null_write = struct
|
||||
(* we have to provide a WRITER, but we don't actually want to write. *)
|
||||
type out_channel = |
|
||||
type 'a t = 'a Lwt.t
|
||||
let really_write o _ = match o with (_ : out_channel) -> .
|
||||
end
|
||||
module String_read = struct
|
||||
type in_channel = {
|
||||
data : string;
|
||||
mutable offset : int;
|
||||
}
|
||||
type 'a t = 'a Lwt.t
|
||||
let really_read t buf =
|
||||
let buf_len = Cstruct.length buf in
|
||||
if String.length t.data - t.offset < buf_len then
|
||||
raise Stdlib.End_of_file;
|
||||
Cstruct.blit_from_string t.data t.offset buf 0 buf_len;
|
||||
t.offset <- t.offset + buf_len;
|
||||
Lwt.pause ()
|
||||
let read t buf =
|
||||
let len = min (String.length t.data - t.offset) (Cstruct.length buf) in
|
||||
Cstruct.blit_from_string t.data t.offset buf 0 len;
|
||||
Lwt.pause () >>= fun () ->
|
||||
t.offset <- t.offset + len;
|
||||
Lwt.return len
|
||||
let skip t n =
|
||||
t.offset <- t.offset + n; Lwt.pause ()
|
||||
end
|
||||
module Index = Tar_gz.Make(Lwt)(Null_write)(String_read)
|
||||
|
||||
let find_urls index =
|
||||
let ic =
|
||||
Index.of_in_channel ~internal:(Cstruct.create 0x1000)
|
||||
{ String_read.data = index ; offset = 0 ; }
|
||||
in
|
||||
Lwt_list.fold_left_s (fun acc path ->
|
||||
Git_kv.get store path >|= function
|
||||
| Ok data ->
|
||||
(* TODO report parser errors *)
|
||||
(try
|
||||
let url_csums = extract_urls (Mirage_kv.Key.to_string path) data in
|
||||
Option.fold ~none:acc ~some:(fun (url, csums) ->
|
||||
if HM.cardinal csums = 0 then
|
||||
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
|
||||
else
|
||||
SM.update url (function
|
||||
| None -> Some csums
|
||||
| Some csums' ->
|
||||
if HM.for_all (fun h v ->
|
||||
match HM.find_opt h csums with
|
||||
| None -> true | Some v' -> String.equal v v')
|
||||
csums'
|
||||
then
|
||||
Some (HM.union (fun _h v _v' -> Some v) csums csums')
|
||||
else begin
|
||||
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
|
||||
url (hm_to_s csums') (hm_to_s csums));
|
||||
None
|
||||
end) acc) url_csums
|
||||
with _ ->
|
||||
Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path);
|
||||
acc)
|
||||
| Error e -> Logs.warn (fun m -> m "Git_kv.get: %a" Git_kv.pp_error e); acc)
|
||||
SM.empty opam_paths
|
||||
let rec do_it acc =
|
||||
Lwt.try_bind
|
||||
(fun () -> Index.get_next_header ic)
|
||||
(fun hdr ->
|
||||
(* XXX: Int64.to_int *)
|
||||
let file_size = Int64.to_int hdr.file_size in
|
||||
Logs.debug (fun m -> m "%s: %Ld" hdr.file_name hdr.file_size);
|
||||
if String.starts_with hdr.Tar.Header.file_name ~prefix:"/packages/" &&
|
||||
String.ends_with hdr.Tar.Header.file_name ~suffix:"/opam" then begin
|
||||
let buf = Cstruct.create file_size in
|
||||
Index.really_read ic buf >>= fun () ->
|
||||
Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () ->
|
||||
let data = Cstruct.to_string buf in
|
||||
(* TODO report parser errors *)
|
||||
let acc =
|
||||
try
|
||||
let url_csums = extract_urls hdr.file_name data in
|
||||
Option.fold ~none:acc ~some:(fun (url, csums) ->
|
||||
if HM.cardinal csums = 0 then
|
||||
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
|
||||
else
|
||||
SM.update url (function
|
||||
| None -> Some csums
|
||||
| Some csums' ->
|
||||
if HM.for_all (fun h v ->
|
||||
match HM.find_opt h csums with
|
||||
| None -> true | Some v' -> String.equal v v')
|
||||
csums'
|
||||
then
|
||||
Some (HM.union (fun _h v _v' -> Some v) csums csums')
|
||||
else begin
|
||||
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
|
||||
url (hm_to_s csums') (hm_to_s csums));
|
||||
None
|
||||
end) acc) url_csums
|
||||
with _ ->
|
||||
Logs.warn (fun m -> m "some error in %s, ignoring" hdr.file_name);
|
||||
acc
|
||||
in
|
||||
(do_it [@tailcall]) acc
|
||||
end else begin
|
||||
Logs.debug (fun m -> m "skipping %d bytes" file_size);
|
||||
Index.skip ic file_size >>= fun () ->
|
||||
Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () ->
|
||||
(do_it [@tailcall]) acc
|
||||
end
|
||||
)
|
||||
(function
|
||||
| Tar.Header.End_of_stream ->
|
||||
Lwt.return acc
|
||||
| e -> reraise e)
|
||||
in
|
||||
do_it SM.empty
|
||||
end
|
||||
|
||||
module Disk = struct
|
||||
|
@ -452,65 +484,6 @@ module Make
|
|||
Error `Not_found
|
||||
end
|
||||
|
||||
module Tarball = struct
|
||||
module Async = struct
|
||||
type 'a t = 'a
|
||||
let ( >>= ) x f = f x
|
||||
let return x = x
|
||||
end
|
||||
|
||||
module Writer = struct
|
||||
type out_channel = Buffer.t
|
||||
type 'a t = 'a
|
||||
let really_write buf data =
|
||||
Buffer.add_string buf (Cstruct.to_string data)
|
||||
end
|
||||
|
||||
(* That's not very interesting here, we just ignore everything*)
|
||||
module Reader = struct
|
||||
type in_channel = unit
|
||||
type 'a t = 'a
|
||||
let really_read _in _data = ()
|
||||
let skip _in _len = ()
|
||||
let read _in _data = 0
|
||||
end
|
||||
|
||||
module Tar_Gz = Tar_gz.Make (Async)(Writer)(Reader)
|
||||
|
||||
let of_git repo store =
|
||||
let out_channel = Buffer.create 1024 in
|
||||
let now = Ptime.v (Pclock.now_d_ps ()) in
|
||||
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
|
||||
let gz_out =
|
||||
Tar_Gz.of_out_channel ~level:4 ~mtime:(Int32.of_int mtime)
|
||||
Gz.Unix out_channel
|
||||
in
|
||||
Git.find_contents store >>= fun paths ->
|
||||
Lwt_list.iter_s (fun path ->
|
||||
Git_kv.get store path >|= function
|
||||
| Ok data ->
|
||||
let data =
|
||||
if Mirage_kv.Key.(equal path (v "repo")) then repo else data
|
||||
in
|
||||
let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! *)
|
||||
and mod_time = Int64.of_int mtime
|
||||
and user_id = 0
|
||||
and group_id = 0
|
||||
and size = String.length data
|
||||
in
|
||||
let hdr =
|
||||
Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
|
||||
(Mirage_kv.Key.to_string path) (Int64.of_int size)
|
||||
in
|
||||
let o = ref false in
|
||||
let stream () = if !o then None else (o := true; Some data) in
|
||||
Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream
|
||||
| Error e -> Logs.warn (fun m -> m "Git_kv error: %a" Git_kv.pp_error e))
|
||||
paths >|= fun () ->
|
||||
Tar_Gz.write_end gz_out;
|
||||
Buffer.contents out_channel
|
||||
end
|
||||
|
||||
module Serve = struct
|
||||
let ptime_to_http_date ptime =
|
||||
let (y, m, d), ((hh, mm, ss), _) = Ptime.to_date_time ptime
|
||||
|
@ -524,62 +497,8 @@ module Make
|
|||
let m' = Array.get month (pred m) in
|
||||
Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss
|
||||
|
||||
let commit_id git_kv =
|
||||
Git_kv.digest git_kv Mirage_kv.Key.empty >|= fun r ->
|
||||
Result.get_ok r
|
||||
|
||||
let repo commit =
|
||||
let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in
|
||||
Fmt.str
|
||||
{|opam-version: "2.0"
|
||||
upstream: "%s#%s"
|
||||
archive-mirrors: "cache"
|
||||
stamp: %S
|
||||
|} upstream commit commit
|
||||
|
||||
let modified git_kv =
|
||||
Git_kv.last_modified git_kv Mirage_kv.Key.empty >|= fun r ->
|
||||
let v = Result.fold ~ok:Fun.id ~error:(fun _ -> Pclock.now_d_ps ()) r in
|
||||
ptime_to_http_date (Ptime.v v)
|
||||
|
||||
type t = {
|
||||
mutable commit_id : string ;
|
||||
mutable modified : string ;
|
||||
mutable repo : string ;
|
||||
mutable index : string ;
|
||||
}
|
||||
|
||||
let create git_kv =
|
||||
commit_id git_kv >>= fun commit_id ->
|
||||
modified git_kv >>= fun modified ->
|
||||
let repo = repo commit_id in
|
||||
Tarball.of_git repo git_kv >|= fun index ->
|
||||
{ commit_id ; modified ; repo ; index }
|
||||
|
||||
let update_lock = Lwt_mutex.create ()
|
||||
|
||||
let update_git t git_kv =
|
||||
Lwt_mutex.with_lock update_lock (fun () ->
|
||||
Logs.info (fun m -> m "pulling the git repository");
|
||||
Git_kv.pull git_kv >>= function
|
||||
| Error `Msg msg ->
|
||||
Logs.err (fun m -> m "error %s while updating git" msg);
|
||||
Lwt.return None
|
||||
| Ok [] ->
|
||||
Logs.info (fun m -> m "git changes are empty");
|
||||
Lwt.return (Some [])
|
||||
| Ok changes ->
|
||||
commit_id git_kv >>= fun commit_id ->
|
||||
modified git_kv >>= fun modified ->
|
||||
Logs.info (fun m -> m "git: %s" commit_id);
|
||||
let repo = repo commit_id in
|
||||
Tarball.of_git repo git_kv >|= fun index ->
|
||||
t.commit_id <- commit_id ;
|
||||
t.modified <- modified ;
|
||||
t.repo <- repo ;
|
||||
t.index <- index;
|
||||
Some changes)
|
||||
|
||||
let not_modified request (modified, etag) =
|
||||
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
||||
| Some ts -> String.equal ts modified
|
||||
|
@ -615,7 +534,7 @@ stamp: %S
|
|||
let resp = Httpaf.Response.create `Not_modified in
|
||||
respond_with_empty reqd resp
|
||||
else *)
|
||||
let dispatch t store hook_url update _flow _conn reqd =
|
||||
let dispatch store hook_url update _flow _conn reqd =
|
||||
let request = Httpaf.Reqd.request reqd in
|
||||
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
|
||||
match String.split_on_char '/' request.Httpaf.Request.target with
|
||||
|
@ -625,46 +544,11 @@ stamp: %S
|
|||
let mime_type = "text/plain" in
|
||||
let headers = [
|
||||
"content-type", mime_type ;
|
||||
"etag", t.commit_id ;
|
||||
"last-modified", t.modified ;
|
||||
"content-length", string_of_int (String.length data) ;
|
||||
] in
|
||||
let headers = Httpaf.Headers.of_list headers in
|
||||
let resp = Httpaf.Response.create ~headers `OK in
|
||||
Httpaf.Reqd.respond_with_string reqd resp data
|
||||
| [ ""; "repo" ] ->
|
||||
if not_modified request (t.modified, t.commit_id) then
|
||||
let resp = Httpaf.Response.create `Not_modified in
|
||||
respond_with_empty reqd resp
|
||||
else
|
||||
let data = t.repo in
|
||||
let mime_type = "text/plain" in
|
||||
let headers = [
|
||||
"content-type", mime_type ;
|
||||
"etag", t.commit_id ;
|
||||
"last-modified", t.modified ;
|
||||
"content-length", string_of_int (String.length data) ;
|
||||
] in
|
||||
let headers = Httpaf.Headers.of_list headers in
|
||||
let resp = Httpaf.Response.create ~headers `OK in
|
||||
Httpaf.Reqd.respond_with_string reqd resp data
|
||||
| [ ""; "index.tar.gz" ] ->
|
||||
(* deliver prepared tarball *)
|
||||
if not_modified request (t.modified, t.commit_id) then
|
||||
let resp = Httpaf.Response.create `Not_modified in
|
||||
respond_with_empty reqd resp
|
||||
else
|
||||
let data = t.index in
|
||||
let mime_type = "application/octet-stream" in
|
||||
let headers = [
|
||||
"content-type", mime_type ;
|
||||
"etag", t.commit_id ;
|
||||
"last-modified", t.modified ;
|
||||
"content-length", string_of_int (String.length data) ;
|
||||
] in
|
||||
let headers = Httpaf.Headers.of_list headers in
|
||||
let resp = Httpaf.Response.create ~headers `OK in
|
||||
Httpaf.Reqd.respond_with_string reqd resp data
|
||||
| "" :: "cache" :: hash_algo :: _ :: hash :: [] ->
|
||||
(* `<hash-algo>/<first-2-hash-characters>/<hash>` *)
|
||||
begin
|
||||
|
@ -677,7 +561,8 @@ stamp: %S
|
|||
(Disk.last_modified store h hash >|= function
|
||||
| Error _ ->
|
||||
Logs.warn (fun m -> m "error retrieving last modified");
|
||||
t.modified
|
||||
(* XXX *)
|
||||
ptime_to_http_date (Ptime.v (Pclock.now_d_ps ()))
|
||||
| Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified ->
|
||||
if not_modified request (last_modified, hash) then
|
||||
let resp = Httpaf.Response.create `Not_modified in
|
||||
|
@ -754,82 +639,59 @@ stamp: %S
|
|||
Disk.update_caches disk >|= fun () ->
|
||||
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
||||
|
||||
let dump_git git_dump git_kv =
|
||||
Git_kv.to_octets git_kv >>= fun data ->
|
||||
Cache.write git_dump data >|= function
|
||||
| Ok () ->
|
||||
Logs.info (fun m -> m "dumped git %d bytes" (String.length data))
|
||||
| Error e ->
|
||||
Logs.warn (fun m -> m "failed to dump git: %a" Cache.pp_write_error e)
|
||||
|
||||
let restore_git git_dump git_ctx =
|
||||
Cache.read git_dump >>= function
|
||||
| Ok None -> Lwt.return (Error ())
|
||||
| Error e ->
|
||||
Logs.warn (fun m -> m "failed to read git state: %a" Cache.pp_error e);
|
||||
Lwt.return (Error ())
|
||||
| Ok Some data ->
|
||||
Git_kv.of_octets git_ctx ~remote:(Key_gen.remote ()) data >|= function
|
||||
| Ok git_kv -> Ok git_kv
|
||||
| Error `Msg msg ->
|
||||
Logs.err (fun m -> m "error restoring git state: %s" msg);
|
||||
Error ()
|
||||
|
||||
module Paf = Paf_mirage.Make(Stack.TCP)
|
||||
|
||||
let start block _time _pclock stack git_ctx http_ctx =
|
||||
let start block _time _pclock stack http_ctx =
|
||||
BLOCK.get_info block >>= fun info ->
|
||||
let sectors_cache = Key_gen.sectors_cache () in
|
||||
let sectors_git = Key_gen.sectors_git () in
|
||||
let git_start =
|
||||
let sectors =
|
||||
let cache_size = Int64.(mul 2L sectors_cache) in
|
||||
Int64.(sub info.size_sectors (add cache_size sectors_git))
|
||||
Int64.(sub info.size_sectors cache_size)
|
||||
in
|
||||
Part.connect git_start block >>= fun (kv, rest) ->
|
||||
let git_dump, rest = Part.subpartition sectors_git rest in
|
||||
Part.connect sectors block >>= fun (kv, rest) ->
|
||||
let md5s, sha512s = Part.subpartition sectors_cache rest in
|
||||
KV.connect kv >>= fun kv ->
|
||||
Cache.connect md5s >>= fun md5s ->
|
||||
Cache.connect sha512s >>= fun sha512s ->
|
||||
Cache.connect git_dump >>= fun git_dump ->
|
||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||
Disk.init ~verify:(Key_gen.verify ()) kv md5s sha512s >>= fun disk ->
|
||||
if Key_gen.check () then
|
||||
Lwt.return_unit
|
||||
else
|
||||
begin
|
||||
restore_git git_dump git_ctx >>= function
|
||||
| Ok git_kv -> Lwt.return git_kv
|
||||
| Error () ->
|
||||
Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv ->
|
||||
dump_git git_dump git_kv >|= fun () ->
|
||||
git_kv
|
||||
end >>= fun git_kv ->
|
||||
Serve.commit_id git_kv >>= fun commit_id ->
|
||||
Logs.info (fun m -> m "git: %s" commit_id);
|
||||
Serve.create git_kv >>= fun serve ->
|
||||
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
|
||||
let update () =
|
||||
Serve.update_git serve git_kv >>= function
|
||||
| None | Some [] -> Lwt.return_unit
|
||||
| Some _changes ->
|
||||
dump_git git_dump git_kv >>= fun () ->
|
||||
download_archives disk http_ctx git_kv
|
||||
in
|
||||
let service =
|
||||
Paf.http_service
|
||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||
(Serve.dispatch serve disk (Key_gen.hook_url ()) update)
|
||||
in
|
||||
let `Initialized th = Paf.serve service t in
|
||||
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
||||
Lwt.async (fun () ->
|
||||
let rec go () =
|
||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||
update () >>= fun () ->
|
||||
go ()
|
||||
in
|
||||
go ());
|
||||
download_archives disk http_ctx git_kv >>= fun () ->
|
||||
(th >|= fun _v -> ())
|
||||
let url = Key_gen.remote () ^ "/index.tar.gz" in
|
||||
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
|
||||
let update () =
|
||||
Http_mirage_client.one_request
|
||||
~alpn_protocol:HTTP.alpn_protocol
|
||||
~authenticator:HTTP.authenticator
|
||||
~ctx:http_ctx url >>= function
|
||||
| Ok (resp, Some str) ->
|
||||
if resp.status = `OK then
|
||||
download_archives disk http_ctx str
|
||||
else begin
|
||||
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
||||
url H2.Status.pp_hum resp.status resp.reason);
|
||||
Lwt.return_unit
|
||||
end
|
||||
| _ -> Lwt.return_unit
|
||||
in
|
||||
let service =
|
||||
Paf.http_service
|
||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||
(Serve.dispatch disk (Key_gen.hook_url ()) update)
|
||||
in
|
||||
let `Initialized th = Paf.serve service t in
|
||||
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
||||
Lwt.async (fun () ->
|
||||
let rec go () =
|
||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||
update () >>= fun () ->
|
||||
go ()
|
||||
in
|
||||
go ());
|
||||
update () >>= fun () ->
|
||||
(th >|= fun _v -> ())
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue