WIP: Cache only version #25
2 changed files with 141 additions and 287 deletions
|
@ -18,11 +18,10 @@ let verify =
|
||||||
let remote =
|
let remote =
|
||||||
let doc =
|
let doc =
|
||||||
Key.Arg.info
|
Key.Arg.info
|
||||||
~doc:"Remote repository url, use suffix #foo to specify a branch 'foo': \
|
~doc:"Remote repository url"
|
||||||
https://github.com/ocaml/opam-repository.git"
|
|
||||||
["remote"]
|
["remote"]
|
||||||
in
|
in
|
||||||
Key.(create "remote" Arg.(opt string "https://github.com/ocaml/opam-repository.git#master" doc))
|
Key.(create "remote" Arg.(opt string "https://opam.ocaml.org" doc))
|
||||||
|
|
||||||
let parallel_downloads =
|
let parallel_downloads =
|
||||||
let doc =
|
let doc =
|
||||||
|
@ -35,7 +34,7 @@ let parallel_downloads =
|
||||||
let hook_url =
|
let hook_url =
|
||||||
let doc =
|
let doc =
|
||||||
Key.Arg.info
|
Key.Arg.info
|
||||||
~doc:"URL to conduct an update of the git repository" ["hook-url"]
|
~doc:"URL to conduct an update of the opam repository" ["hook-url"]
|
||||||
in
|
in
|
||||||
Key.(create "hook-url" Arg.(opt string "update" doc))
|
Key.(create "hook-url" Arg.(opt string "update" doc))
|
||||||
|
|
||||||
|
@ -55,29 +54,23 @@ let sectors_cache =
|
||||||
let doc = Key.Arg.info ~doc ["sectors-cache"] in
|
let doc = Key.Arg.info ~doc ["sectors-cache"] in
|
||||||
Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc))
|
Key.(create "sectors-cache" Arg.(opt int64 Int64.(mul 4L 2048L) doc))
|
||||||
|
|
||||||
let sectors_git =
|
|
||||||
let doc = "Number of sectors reserved for git dump." in
|
|
||||||
let doc = Key.Arg.info ~doc ["sectors-git"] in
|
|
||||||
Key.(create "sectors-git" Arg.(opt int64 Int64.(mul 40L (mul 2L 1024L)) doc))
|
|
||||||
|
|
||||||
let mirror =
|
let mirror =
|
||||||
foreign "Unikernel.Make"
|
foreign "Unikernel.Make"
|
||||||
~keys:[ Key.v check ; Key.v verify ; Key.v remote ;
|
~keys:[ Key.v check ; Key.v verify ; Key.v remote ;
|
||||||
Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ;
|
Key.v parallel_downloads ; Key.v hook_url ; Key.v tls_authenticator ;
|
||||||
Key.v port ; Key.v sectors_cache ; Key.v sectors_git ; ]
|
Key.v port ; Key.v sectors_cache ; ]
|
||||||
~packages:[
|
~packages:[
|
||||||
package ~min:"0.2.0" ~sublibs:[ "mirage" ] "paf" ;
|
package ~min:"0.2.0" ~sublibs:[ "mirage" ] "paf" ;
|
||||||
package "h2" ;
|
package "h2" ;
|
||||||
package "httpaf" ;
|
package "httpaf" ;
|
||||||
package ~pin:"git+https://git.robur.io/robur/git-kv.git#main" "git-kv" ;
|
|
||||||
package ~min:"3.7.0" "git-paf" ;
|
|
||||||
package "opam-file-format" ;
|
package "opam-file-format" ;
|
||||||
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" ;
|
||||||
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ;
|
package ~pin:"git+https://github.com/hannesm/ocaml-tar.git#kv-rw-kv-5" "tar-mirage" ;
|
||||||
package "mirage-block-partition" ;
|
package "mirage-block-partition" ;
|
||||||
package "oneffs" ;
|
package "oneffs" ;
|
||||||
|
package "hex" ;
|
||||||
]
|
]
|
||||||
(block @-> time @-> pclock @-> stackv4v6 @-> git_client @-> http_client @-> job)
|
(block @-> time @-> pclock @-> stackv4v6 @-> http_client @-> job)
|
||||||
|
|
||||||
let stack = generic_stackv4v6 default_network
|
let stack = generic_stackv4v6 default_network
|
||||||
|
|
||||||
|
@ -98,10 +91,9 @@ let http_client =
|
||||||
2 instances of happy-eyeballs will exists together which is not really good
|
2 instances of happy-eyeballs will exists together which is not really good
|
||||||
(it puts a pressure on the scheduler). *)
|
(it puts a pressure on the scheduler). *)
|
||||||
|
|
||||||
let git_client, http_client =
|
let http_client =
|
||||||
let happy_eyeballs = git_happy_eyeballs stack dns (generic_happy_eyeballs stack dns) in
|
let happy_eyeballs = generic_happy_eyeballs stack dns in
|
||||||
merge_git_clients (git_tcp tcp happy_eyeballs)
|
let happy_eyeballs = git_happy_eyeballs stack dns happy_eyeballs in
|
||||||
(git_http ~authenticator:tls_authenticator tcp happy_eyeballs),
|
|
||||||
http_client $ default_time $ default_posix_clock $ tcp $ happy_eyeballs
|
http_client $ default_time $ default_posix_clock $ tcp $ happy_eyeballs
|
||||||
|
|
||||||
let program_block_size =
|
let program_block_size =
|
||||||
|
@ -111,4 +103,4 @@ let program_block_size =
|
||||||
let block = block_of_file "tar"
|
let block = block_of_file "tar"
|
||||||
|
|
||||||
let () = register "mirror"
|
let () = register "mirror"
|
||||||
[ mirror $ block $ default_time $ default_posix_clock $ stack $ git_client $ http_client ]
|
[ mirror $ block $ default_time $ default_posix_clock $ stack $ http_client ]
|
||||||
|
|
|
@ -2,12 +2,13 @@ open Lwt.Infix
|
||||||
|
|
||||||
let argument_error = 64
|
let argument_error = 64
|
||||||
|
|
||||||
|
external reraise : exn -> 'a = "%reraise"
|
||||||
|
|
||||||
module Make
|
module Make
|
||||||
(BLOCK : Mirage_block.S)
|
(BLOCK : Mirage_block.S)
|
||||||
(Time : Mirage_time.S)
|
(Time : Mirage_time.S)
|
||||||
(Pclock : Mirage_clock.PCLOCK)
|
(Pclock : Mirage_clock.PCLOCK)
|
||||||
(Stack : Tcpip.Stack.V4V6)
|
(Stack : Tcpip.Stack.V4V6)
|
||||||
(_ : sig end)
|
|
||||||
(HTTP : Http_mirage_client.S) = struct
|
(HTTP : Http_mirage_client.S) = struct
|
||||||
|
|
||||||
module Part = Mirage_block_partition.Make(BLOCK)
|
module Part = Mirage_block_partition.Make(BLOCK)
|
||||||
|
@ -51,29 +52,6 @@ module Make
|
||||||
hm ""
|
hm ""
|
||||||
|
|
||||||
module Git = struct
|
module Git = struct
|
||||||
let find_contents store =
|
|
||||||
let rec go store path acc =
|
|
||||||
Git_kv.list store path >>= function
|
|
||||||
| Error e ->
|
|
||||||
Logs.err (fun m -> m "error %a while listing %a"
|
|
||||||
Git_kv.pp_error e Mirage_kv.Key.pp path);
|
|
||||||
Lwt.return acc
|
|
||||||
| Ok steps ->
|
|
||||||
Lwt_list.fold_left_s (fun acc (step, _) ->
|
|
||||||
let full_path = Mirage_kv.Key.add path step in
|
|
||||||
Git_kv.exists store full_path >>= function
|
|
||||||
| Error e ->
|
|
||||||
Logs.err (fun m -> m "error %a for exists %a" Git_kv.pp_error e
|
|
||||||
Mirage_kv.Key.pp full_path);
|
|
||||||
Lwt.return acc
|
|
||||||
| Ok None ->
|
|
||||||
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp full_path);
|
|
||||||
Lwt.return acc
|
|
||||||
| Ok Some `Value -> Lwt.return (full_path :: acc)
|
|
||||||
| Ok Some `Dictionary -> go store full_path acc) acc steps
|
|
||||||
in
|
|
||||||
go store Mirage_kv.Key.empty []
|
|
||||||
|
|
||||||
let decode_digest filename str =
|
let decode_digest filename str =
|
||||||
let hex h s =
|
let hex h s =
|
||||||
match hex_of_string s with
|
match hex_of_string s with
|
||||||
|
@ -162,40 +140,94 @@ module Make
|
||||||
end
|
end
|
||||||
| _ -> Logs.debug (fun m -> m "no url section for %s" filename); None
|
| _ -> Logs.debug (fun m -> m "no url section for %s" filename); None
|
||||||
|
|
||||||
let find_urls store =
|
module Null_write = struct
|
||||||
find_contents store >>= fun paths ->
|
(* we have to provide a WRITER, but we don't actually want to write. *)
|
||||||
let opam_paths =
|
type out_channel = |
|
||||||
List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths
|
type 'a t = 'a Lwt.t
|
||||||
|
let really_write o _ = match o with (_ : out_channel) -> .
|
||||||
|
end
|
||||||
|
module String_read = struct
|
||||||
|
type in_channel = {
|
||||||
|
data : string;
|
||||||
|
mutable offset : int;
|
||||||
|
}
|
||||||
|
type 'a t = 'a Lwt.t
|
||||||
|
let really_read t buf =
|
||||||
|
let buf_len = Cstruct.length buf in
|
||||||
|
if String.length t.data - t.offset < buf_len then
|
||||||
|
raise Stdlib.End_of_file;
|
||||||
|
Cstruct.blit_from_string t.data t.offset buf 0 buf_len;
|
||||||
|
t.offset <- t.offset + buf_len;
|
||||||
|
Lwt.pause ()
|
||||||
|
let read t buf =
|
||||||
|
let len = min (String.length t.data - t.offset) (Cstruct.length buf) in
|
||||||
|
Cstruct.blit_from_string t.data t.offset buf 0 len;
|
||||||
|
Lwt.pause () >>= fun () ->
|
||||||
|
t.offset <- t.offset + len;
|
||||||
|
Lwt.return len
|
||||||
|
let skip t n =
|
||||||
|
t.offset <- t.offset + n; Lwt.pause ()
|
||||||
|
end
|
||||||
|
module Index = Tar_gz.Make(Lwt)(Null_write)(String_read)
|
||||||
|
|
||||||
|
let find_urls index =
|
||||||
|
let ic =
|
||||||
|
Index.of_in_channel ~internal:(Cstruct.create 0x1000)
|
||||||
|
{ String_read.data = index ; offset = 0 ; }
|
||||||
in
|
in
|
||||||
Lwt_list.fold_left_s (fun acc path ->
|
let rec do_it acc =
|
||||||
Git_kv.get store path >|= function
|
Lwt.try_bind
|
||||||
| Ok data ->
|
(fun () -> Index.get_next_header ic)
|
||||||
(* TODO report parser errors *)
|
(fun hdr ->
|
||||||
(try
|
(* XXX: Int64.to_int *)
|
||||||
let url_csums = extract_urls (Mirage_kv.Key.to_string path) data in
|
let file_size = Int64.to_int hdr.file_size in
|
||||||
Option.fold ~none:acc ~some:(fun (url, csums) ->
|
Logs.debug (fun m -> m "%s: %Ld" hdr.file_name hdr.file_size);
|
||||||
if HM.cardinal csums = 0 then
|
if String.starts_with hdr.Tar.Header.file_name ~prefix:"/packages/" &&
|
||||||
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
|
String.ends_with hdr.Tar.Header.file_name ~suffix:"/opam" then begin
|
||||||
else
|
let buf = Cstruct.create file_size in
|
||||||
SM.update url (function
|
Index.really_read ic buf >>= fun () ->
|
||||||
| None -> Some csums
|
Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () ->
|
||||||
| Some csums' ->
|
let data = Cstruct.to_string buf in
|
||||||
if HM.for_all (fun h v ->
|
(* TODO report parser errors *)
|
||||||
match HM.find_opt h csums with
|
let acc =
|
||||||
| None -> true | Some v' -> String.equal v v')
|
try
|
||||||
csums'
|
let url_csums = extract_urls hdr.file_name data in
|
||||||
then
|
Option.fold ~none:acc ~some:(fun (url, csums) ->
|
||||||
Some (HM.union (fun _h v _v' -> Some v) csums csums')
|
if HM.cardinal csums = 0 then
|
||||||
else begin
|
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
|
||||||
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
|
else
|
||||||
url (hm_to_s csums') (hm_to_s csums));
|
SM.update url (function
|
||||||
None
|
| None -> Some csums
|
||||||
end) acc) url_csums
|
| Some csums' ->
|
||||||
with _ ->
|
if HM.for_all (fun h v ->
|
||||||
Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path);
|
match HM.find_opt h csums with
|
||||||
acc)
|
| None -> true | Some v' -> String.equal v v')
|
||||||
| Error e -> Logs.warn (fun m -> m "Git_kv.get: %a" Git_kv.pp_error e); acc)
|
csums'
|
||||||
SM.empty opam_paths
|
then
|
||||||
|
Some (HM.union (fun _h v _v' -> Some v) csums csums')
|
||||||
|
else begin
|
||||||
|
Logs.warn (fun m -> m "mismatching hashes for %s: %s vs %s"
|
||||||
|
url (hm_to_s csums') (hm_to_s csums));
|
||||||
|
None
|
||||||
|
end) acc) url_csums
|
||||||
|
with _ ->
|
||||||
|
Logs.warn (fun m -> m "some error in %s, ignoring" hdr.file_name);
|
||||||
|
acc
|
||||||
|
in
|
||||||
|
(do_it [@tailcall]) acc
|
||||||
|
end else begin
|
||||||
|
Logs.debug (fun m -> m "skipping %d bytes" file_size);
|
||||||
|
Index.skip ic file_size >>= fun () ->
|
||||||
|
Index.skip ic (Tar.Header.compute_zero_padding_length hdr) >>= fun () ->
|
||||||
|
(do_it [@tailcall]) acc
|
||||||
|
end
|
||||||
|
)
|
||||||
|
(function
|
||||||
|
| Tar.Header.End_of_stream ->
|
||||||
|
Lwt.return acc
|
||||||
|
| e -> reraise e)
|
||||||
|
in
|
||||||
|
do_it SM.empty
|
||||||
end
|
end
|
||||||
|
|
||||||
module Disk = struct
|
module Disk = struct
|
||||||
|
@ -452,65 +484,6 @@ module Make
|
||||||
Error `Not_found
|
Error `Not_found
|
||||||
end
|
end
|
||||||
|
|
||||||
module Tarball = struct
|
|
||||||
module Async = struct
|
|
||||||
type 'a t = 'a
|
|
||||||
let ( >>= ) x f = f x
|
|
||||||
let return x = x
|
|
||||||
end
|
|
||||||
|
|
||||||
module Writer = struct
|
|
||||||
type out_channel = Buffer.t
|
|
||||||
type 'a t = 'a
|
|
||||||
let really_write buf data =
|
|
||||||
Buffer.add_string buf (Cstruct.to_string data)
|
|
||||||
end
|
|
||||||
|
|
||||||
(* That's not very interesting here, we just ignore everything*)
|
|
||||||
module Reader = struct
|
|
||||||
type in_channel = unit
|
|
||||||
type 'a t = 'a
|
|
||||||
let really_read _in _data = ()
|
|
||||||
let skip _in _len = ()
|
|
||||||
let read _in _data = 0
|
|
||||||
end
|
|
||||||
|
|
||||||
module Tar_Gz = Tar_gz.Make (Async)(Writer)(Reader)
|
|
||||||
|
|
||||||
let of_git repo store =
|
|
||||||
let out_channel = Buffer.create 1024 in
|
|
||||||
let now = Ptime.v (Pclock.now_d_ps ()) in
|
|
||||||
let mtime = Option.value ~default:0 Ptime.(Span.to_int_s (to_span now)) in
|
|
||||||
let gz_out =
|
|
||||||
Tar_Gz.of_out_channel ~level:4 ~mtime:(Int32.of_int mtime)
|
|
||||||
Gz.Unix out_channel
|
|
||||||
in
|
|
||||||
Git.find_contents store >>= fun paths ->
|
|
||||||
Lwt_list.iter_s (fun path ->
|
|
||||||
Git_kv.get store path >|= function
|
|
||||||
| Ok data ->
|
|
||||||
let data =
|
|
||||||
if Mirage_kv.Key.(equal path (v "repo")) then repo else data
|
|
||||||
in
|
|
||||||
let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! *)
|
|
||||||
and mod_time = Int64.of_int mtime
|
|
||||||
and user_id = 0
|
|
||||||
and group_id = 0
|
|
||||||
and size = String.length data
|
|
||||||
in
|
|
||||||
let hdr =
|
|
||||||
Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
|
|
||||||
(Mirage_kv.Key.to_string path) (Int64.of_int size)
|
|
||||||
in
|
|
||||||
let o = ref false in
|
|
||||||
let stream () = if !o then None else (o := true; Some data) in
|
|
||||||
Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream
|
|
||||||
| Error e -> Logs.warn (fun m -> m "Git_kv error: %a" Git_kv.pp_error e))
|
|
||||||
paths >|= fun () ->
|
|
||||||
Tar_Gz.write_end gz_out;
|
|
||||||
Buffer.contents out_channel
|
|
||||||
end
|
|
||||||
|
|
||||||
module Serve = struct
|
module Serve = struct
|
||||||
let ptime_to_http_date ptime =
|
let ptime_to_http_date ptime =
|
||||||
let (y, m, d), ((hh, mm, ss), _) = Ptime.to_date_time ptime
|
let (y, m, d), ((hh, mm, ss), _) = Ptime.to_date_time ptime
|
||||||
|
@ -524,62 +497,8 @@ module Make
|
||||||
let m' = Array.get month (pred m) in
|
let m' = Array.get month (pred m) in
|
||||||
Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss
|
Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss
|
||||||
|
|
||||||
let commit_id git_kv =
|
|
||||||
Git_kv.digest git_kv Mirage_kv.Key.empty >|= fun r ->
|
|
||||||
Result.get_ok r
|
|
||||||
|
|
||||||
let repo commit =
|
|
||||||
let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in
|
|
||||||
Fmt.str
|
|
||||||
{|opam-version: "2.0"
|
|
||||||
upstream: "%s#%s"
|
|
||||||
archive-mirrors: "cache"
|
|
||||||
stamp: %S
|
|
||||||
|} upstream commit commit
|
|
||||||
|
|
||||||
let modified git_kv =
|
|
||||||
Git_kv.last_modified git_kv Mirage_kv.Key.empty >|= fun r ->
|
|
||||||
let v = Result.fold ~ok:Fun.id ~error:(fun _ -> Pclock.now_d_ps ()) r in
|
|
||||||
ptime_to_http_date (Ptime.v v)
|
|
||||||
|
|
||||||
type t = {
|
|
||||||
mutable commit_id : string ;
|
|
||||||
mutable modified : string ;
|
|
||||||
mutable repo : string ;
|
|
||||||
mutable index : string ;
|
|
||||||
}
|
|
||||||
|
|
||||||
let create git_kv =
|
|
||||||
commit_id git_kv >>= fun commit_id ->
|
|
||||||
modified git_kv >>= fun modified ->
|
|
||||||
let repo = repo commit_id in
|
|
||||||
Tarball.of_git repo git_kv >|= fun index ->
|
|
||||||
{ commit_id ; modified ; repo ; index }
|
|
||||||
|
|
||||||
let update_lock = Lwt_mutex.create ()
|
let update_lock = Lwt_mutex.create ()
|
||||||
|
|
||||||
let update_git t git_kv =
|
|
||||||
Lwt_mutex.with_lock update_lock (fun () ->
|
|
||||||
Logs.info (fun m -> m "pulling the git repository");
|
|
||||||
Git_kv.pull git_kv >>= function
|
|
||||||
| Error `Msg msg ->
|
|
||||||
Logs.err (fun m -> m "error %s while updating git" msg);
|
|
||||||
Lwt.return None
|
|
||||||
| Ok [] ->
|
|
||||||
Logs.info (fun m -> m "git changes are empty");
|
|
||||||
Lwt.return (Some [])
|
|
||||||
| Ok changes ->
|
|
||||||
commit_id git_kv >>= fun commit_id ->
|
|
||||||
modified git_kv >>= fun modified ->
|
|
||||||
Logs.info (fun m -> m "git: %s" commit_id);
|
|
||||||
let repo = repo commit_id in
|
|
||||||
Tarball.of_git repo git_kv >|= fun index ->
|
|
||||||
t.commit_id <- commit_id ;
|
|
||||||
t.modified <- modified ;
|
|
||||||
t.repo <- repo ;
|
|
||||||
t.index <- index;
|
|
||||||
Some changes)
|
|
||||||
|
|
||||||
let not_modified request (modified, etag) =
|
let not_modified request (modified, etag) =
|
||||||
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
||||||
| Some ts -> String.equal ts modified
|
| Some ts -> String.equal ts modified
|
||||||
|
@ -615,7 +534,7 @@ stamp: %S
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
respond_with_empty reqd resp
|
respond_with_empty reqd resp
|
||||||
else *)
|
else *)
|
||||||
let dispatch t store hook_url update _flow _conn reqd =
|
let dispatch store hook_url update _flow _conn reqd =
|
||||||
let request = Httpaf.Reqd.request reqd in
|
let request = Httpaf.Reqd.request reqd in
|
||||||
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
|
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
|
||||||
match String.split_on_char '/' request.Httpaf.Request.target with
|
match String.split_on_char '/' request.Httpaf.Request.target with
|
||||||
|
@ -625,46 +544,11 @@ stamp: %S
|
||||||
let mime_type = "text/plain" in
|
let mime_type = "text/plain" in
|
||||||
let headers = [
|
let headers = [
|
||||||
"content-type", mime_type ;
|
"content-type", mime_type ;
|
||||||
"etag", t.commit_id ;
|
|
||||||
"last-modified", t.modified ;
|
|
||||||
"content-length", string_of_int (String.length data) ;
|
"content-length", string_of_int (String.length data) ;
|
||||||
] in
|
] in
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
let headers = Httpaf.Headers.of_list headers in
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
let resp = Httpaf.Response.create ~headers `OK in
|
||||||
Httpaf.Reqd.respond_with_string reqd resp data
|
Httpaf.Reqd.respond_with_string reqd resp data
|
||||||
| [ ""; "repo" ] ->
|
|
||||||
if not_modified request (t.modified, t.commit_id) then
|
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
|
||||||
respond_with_empty reqd resp
|
|
||||||
else
|
|
||||||
let data = t.repo in
|
|
||||||
let mime_type = "text/plain" in
|
|
||||||
let headers = [
|
|
||||||
"content-type", mime_type ;
|
|
||||||
"etag", t.commit_id ;
|
|
||||||
"last-modified", t.modified ;
|
|
||||||
"content-length", string_of_int (String.length data) ;
|
|
||||||
] in
|
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
|
||||||
Httpaf.Reqd.respond_with_string reqd resp data
|
|
||||||
| [ ""; "index.tar.gz" ] ->
|
|
||||||
(* deliver prepared tarball *)
|
|
||||||
if not_modified request (t.modified, t.commit_id) then
|
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
|
||||||
respond_with_empty reqd resp
|
|
||||||
else
|
|
||||||
let data = t.index in
|
|
||||||
let mime_type = "application/octet-stream" in
|
|
||||||
let headers = [
|
|
||||||
"content-type", mime_type ;
|
|
||||||
"etag", t.commit_id ;
|
|
||||||
"last-modified", t.modified ;
|
|
||||||
"content-length", string_of_int (String.length data) ;
|
|
||||||
] in
|
|
||||||
let headers = Httpaf.Headers.of_list headers in
|
|
||||||
let resp = Httpaf.Response.create ~headers `OK in
|
|
||||||
Httpaf.Reqd.respond_with_string reqd resp data
|
|
||||||
| "" :: "cache" :: hash_algo :: _ :: hash :: [] ->
|
| "" :: "cache" :: hash_algo :: _ :: hash :: [] ->
|
||||||
(* `<hash-algo>/<first-2-hash-characters>/<hash>` *)
|
(* `<hash-algo>/<first-2-hash-characters>/<hash>` *)
|
||||||
begin
|
begin
|
||||||
|
@ -677,7 +561,8 @@ stamp: %S
|
||||||
(Disk.last_modified store h hash >|= function
|
(Disk.last_modified store h hash >|= function
|
||||||
| Error _ ->
|
| Error _ ->
|
||||||
Logs.warn (fun m -> m "error retrieving last modified");
|
Logs.warn (fun m -> m "error retrieving last modified");
|
||||||
t.modified
|
(* XXX *)
|
||||||
|
ptime_to_http_date (Ptime.v (Pclock.now_d_ps ()))
|
||||||
| Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified ->
|
| Ok v -> ptime_to_http_date (Ptime.v v)) >>= fun last_modified ->
|
||||||
if not_modified request (last_modified, hash) then
|
if not_modified request (last_modified, hash) then
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
|
@ -754,82 +639,59 @@ stamp: %S
|
||||||
Disk.update_caches disk >|= fun () ->
|
Disk.update_caches disk >|= fun () ->
|
||||||
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
Logs.info (fun m -> m "downloading of %d urls done" (SM.cardinal urls))
|
||||||
|
|
||||||
let dump_git git_dump git_kv =
|
|
||||||
Git_kv.to_octets git_kv >>= fun data ->
|
|
||||||
Cache.write git_dump data >|= function
|
|
||||||
| Ok () ->
|
|
||||||
Logs.info (fun m -> m "dumped git %d bytes" (String.length data))
|
|
||||||
| Error e ->
|
|
||||||
Logs.warn (fun m -> m "failed to dump git: %a" Cache.pp_write_error e)
|
|
||||||
|
|
||||||
let restore_git git_dump git_ctx =
|
|
||||||
Cache.read git_dump >>= function
|
|
||||||
| Ok None -> Lwt.return (Error ())
|
|
||||||
| Error e ->
|
|
||||||
Logs.warn (fun m -> m "failed to read git state: %a" Cache.pp_error e);
|
|
||||||
Lwt.return (Error ())
|
|
||||||
| Ok Some data ->
|
|
||||||
Git_kv.of_octets git_ctx ~remote:(Key_gen.remote ()) data >|= function
|
|
||||||
| Ok git_kv -> Ok git_kv
|
|
||||||
| Error `Msg msg ->
|
|
||||||
Logs.err (fun m -> m "error restoring git state: %s" msg);
|
|
||||||
Error ()
|
|
||||||
|
|
||||||
module Paf = Paf_mirage.Make(Stack.TCP)
|
module Paf = Paf_mirage.Make(Stack.TCP)
|
||||||
|
|
||||||
let start block _time _pclock stack git_ctx http_ctx =
|
let start block _time _pclock stack http_ctx =
|
||||||
BLOCK.get_info block >>= fun info ->
|
BLOCK.get_info block >>= fun info ->
|
||||||
let sectors_cache = Key_gen.sectors_cache () in
|
let sectors_cache = Key_gen.sectors_cache () in
|
||||||
let sectors_git = Key_gen.sectors_git () in
|
let sectors =
|
||||||
let git_start =
|
|
||||||
let cache_size = Int64.(mul 2L sectors_cache) in
|
let cache_size = Int64.(mul 2L sectors_cache) in
|
||||||
Int64.(sub info.size_sectors (add cache_size sectors_git))
|
Int64.(sub info.size_sectors cache_size)
|
||||||
in
|
in
|
||||||
Part.connect git_start block >>= fun (kv, rest) ->
|
Part.connect sectors block >>= fun (kv, rest) ->
|
||||||
let git_dump, rest = Part.subpartition sectors_git rest in
|
|
||||||
let md5s, sha512s = Part.subpartition sectors_cache rest in
|
let md5s, sha512s = Part.subpartition sectors_cache rest in
|
||||||
KV.connect kv >>= fun kv ->
|
KV.connect kv >>= fun kv ->
|
||||||
Cache.connect md5s >>= fun md5s ->
|
Cache.connect md5s >>= fun md5s ->
|
||||||
Cache.connect sha512s >>= fun sha512s ->
|
Cache.connect sha512s >>= fun sha512s ->
|
||||||
Cache.connect git_dump >>= fun git_dump ->
|
|
||||||
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
Disk.init ~verify:(Key_gen.verify ()) kv md5s sha512s >>= fun disk ->
|
Disk.init ~verify:(Key_gen.verify ()) kv md5s sha512s >>= fun disk ->
|
||||||
if Key_gen.check () then
|
if Key_gen.check () then
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
else
|
else
|
||||||
begin
|
begin
|
||||||
restore_git git_dump git_ctx >>= function
|
let url = Key_gen.remote () ^ "/index.tar.gz" in
|
||||||
| Ok git_kv -> Lwt.return git_kv
|
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
|
||||||
| Error () ->
|
let update () =
|
||||||
Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv ->
|
Http_mirage_client.one_request
|
||||||
dump_git git_dump git_kv >|= fun () ->
|
~alpn_protocol:HTTP.alpn_protocol
|
||||||
git_kv
|
~authenticator:HTTP.authenticator
|
||||||
end >>= fun git_kv ->
|
~ctx:http_ctx url >>= function
|
||||||
Serve.commit_id git_kv >>= fun commit_id ->
|
| Ok (resp, Some str) ->
|
||||||
Logs.info (fun m -> m "git: %s" commit_id);
|
if resp.status = `OK then
|
||||||
Serve.create git_kv >>= fun serve ->
|
download_archives disk http_ctx str
|
||||||
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
|
else begin
|
||||||
let update () =
|
Logs.warn (fun m -> m "%s: %a (reason %s)"
|
||||||
Serve.update_git serve git_kv >>= function
|
url H2.Status.pp_hum resp.status resp.reason);
|
||||||
| None | Some [] -> Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Some _changes ->
|
end
|
||||||
dump_git git_dump git_kv >>= fun () ->
|
| _ -> Lwt.return_unit
|
||||||
download_archives disk http_ctx git_kv
|
in
|
||||||
in
|
let service =
|
||||||
let service =
|
Paf.http_service
|
||||||
Paf.http_service
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
(Serve.dispatch disk (Key_gen.hook_url ()) update)
|
||||||
(Serve.dispatch serve disk (Key_gen.hook_url ()) update)
|
in
|
||||||
in
|
let `Initialized th = Paf.serve service t in
|
||||||
let `Initialized th = Paf.serve service t in
|
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
||||||
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
Lwt.async (fun () ->
|
||||||
Lwt.async (fun () ->
|
let rec go () =
|
||||||
let rec go () =
|
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
||||||
Time.sleep_ns (Duration.of_hour 1) >>= fun () ->
|
update () >>= fun () ->
|
||||||
update () >>= fun () ->
|
go ()
|
||||||
go ()
|
in
|
||||||
in
|
go ());
|
||||||
go ());
|
update () >>= fun () ->
|
||||||
download_archives disk http_ctx git_kv >>= fun () ->
|
(th >|= fun _v -> ())
|
||||||
(th >|= fun _v -> ())
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue