use git_kv
Co-Authored-By: Hannes Mehnert <hannes@mehnert.org>
This commit is contained in:
parent
9b7e9e5485
commit
44d737887a
2 changed files with 69 additions and 102 deletions
|
@ -53,7 +53,7 @@ let mirror =
|
||||||
package ~min:"0.1.0" ~sublibs:[ "mirage" ] "paf" ;
|
package ~min:"0.1.0" ~sublibs:[ "mirage" ] "paf" ;
|
||||||
package "h2" ;
|
package "h2" ;
|
||||||
package "httpaf" ;
|
package "httpaf" ;
|
||||||
package ~min:"3.0.0" "irmin-mirage-git" ;
|
package ~pin:"git+https://git.robur.io/robur/git-kv.git#main" "git-kv" ;
|
||||||
package ~min:"3.7.0" "git-paf" ;
|
package ~min:"3.7.0" "git-paf" ;
|
||||||
package "opam-file-format" ;
|
package "opam-file-format" ;
|
||||||
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
package ~min:"2.1.0" ~sublibs:[ "gz" ] "tar" ;
|
||||||
|
|
|
@ -12,9 +12,6 @@ module Make
|
||||||
|
|
||||||
module KV = Tar_mirage.Make_KV_RW(BLOCK)
|
module KV = Tar_mirage.Make_KV_RW(BLOCK)
|
||||||
|
|
||||||
module Store = Irmin_mirage_git.Mem.KV.Make(Irmin.Contents.String)
|
|
||||||
module Sync = Irmin.Sync.Make(Store)
|
|
||||||
|
|
||||||
module SM = Map.Make(String)
|
module SM = Map.Make(String)
|
||||||
|
|
||||||
module HM = Map.Make(struct
|
module HM = Map.Make(struct
|
||||||
|
@ -22,8 +19,6 @@ module Make
|
||||||
let compare = compare (* TODO remove polymorphic compare *)
|
let compare = compare (* TODO remove polymorphic compare *)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
module Git_commit = Git.Commit.Make(Store.Git.Hash)
|
|
||||||
|
|
||||||
let hash_to_string = function
|
let hash_to_string = function
|
||||||
| `MD5 -> "md5"
|
| `MD5 -> "md5"
|
||||||
| `SHA1 -> "sha1"
|
| `SHA1 -> "sha1"
|
||||||
|
@ -53,48 +48,28 @@ module Make
|
||||||
hm ""
|
hm ""
|
||||||
|
|
||||||
module Git = struct
|
module Git = struct
|
||||||
let decompose_git_url () =
|
|
||||||
match String.split_on_char '#' (Key_gen.remote ()) with
|
|
||||||
| [ url ] -> url, None
|
|
||||||
| [ url ; branch ] -> url, Some branch
|
|
||||||
| _ ->
|
|
||||||
Logs.err (fun m -> m "expected at most a single # in remote");
|
|
||||||
exit argument_error
|
|
||||||
|
|
||||||
let connect ctx =
|
|
||||||
let uri, branch = decompose_git_url () in
|
|
||||||
let config = Irmin_mem.config () in
|
|
||||||
Store.Repo.v config >>= fun r ->
|
|
||||||
(match branch with
|
|
||||||
| None -> Store.main r
|
|
||||||
| Some branch -> Store.of_branch r branch) >|= fun repo ->
|
|
||||||
Logs.info (fun m -> m "connected to %s (branch %s)"
|
|
||||||
uri (Option.value ~default:"main" branch));
|
|
||||||
repo, Store.remote ~ctx uri
|
|
||||||
|
|
||||||
let pull store upstream =
|
|
||||||
Logs.info (fun m -> m "pulling from remote!");
|
|
||||||
Sync.pull ~depth:1 store upstream `Set >|= fun r ->
|
|
||||||
match r with
|
|
||||||
| Ok (`Head c as s) -> Ok (c, Fmt.str "pulled %a" Sync.pp_status s)
|
|
||||||
| Ok `Empty -> Error (`Msg "pulled empty repository")
|
|
||||||
| Error (`Msg e) -> Error (`Msg ("pull error " ^ e))
|
|
||||||
| Error (`Conflict msg) -> Error (`Msg ("pull conflict " ^ msg))
|
|
||||||
|
|
||||||
let find_contents store =
|
let find_contents store =
|
||||||
let rec go store path acc =
|
let rec go store path acc =
|
||||||
Store.list store path >>= fun steps ->
|
Git_kv.list store path >>= function
|
||||||
Lwt_list.fold_left_s (fun acc (step, _) ->
|
| Error e ->
|
||||||
let full_path = path @ [ step ] in
|
Logs.err (fun m -> m "error %a while listing %a"
|
||||||
let str = String.concat "/" full_path in
|
Git_kv.pp_error e Mirage_kv.Key.pp path);
|
||||||
Store.kind store full_path >>= function
|
Lwt.return acc
|
||||||
| None ->
|
| Ok steps ->
|
||||||
Logs.warn (fun m -> m "no kind for %s" str);
|
Lwt_list.fold_left_s (fun acc (step, _) ->
|
||||||
Lwt.return acc
|
let full_path = Mirage_kv.Key.add path step in
|
||||||
| Some `Contents -> Lwt.return (full_path :: acc)
|
Git_kv.exists store full_path >>= function
|
||||||
| Some `Node -> go store full_path acc) acc steps
|
| Error e ->
|
||||||
|
Logs.err (fun m -> m "error %a for exists %a" Git_kv.pp_error e
|
||||||
|
Mirage_kv.Key.pp full_path);
|
||||||
|
Lwt.return acc
|
||||||
|
| Ok None ->
|
||||||
|
Logs.warn (fun m -> m "no typ for %a" Mirage_kv.Key.pp full_path);
|
||||||
|
Lwt.return acc
|
||||||
|
| Ok Some `Value -> Lwt.return (full_path :: acc)
|
||||||
|
| Ok Some `Dictionary -> go store full_path acc) acc steps
|
||||||
in
|
in
|
||||||
go store [] []
|
go store Mirage_kv.Key.empty []
|
||||||
|
|
||||||
let decode_digest filename str =
|
let decode_digest filename str =
|
||||||
let hex h s =
|
let hex h s =
|
||||||
|
@ -187,16 +162,14 @@ module Make
|
||||||
let find_urls store =
|
let find_urls store =
|
||||||
find_contents store >>= fun paths ->
|
find_contents store >>= fun paths ->
|
||||||
let opam_paths =
|
let opam_paths =
|
||||||
List.filter (fun p -> match List.rev p with
|
List.filter (fun p -> Mirage_kv.Key.basename p = "opam") paths
|
||||||
| "opam" :: _ -> true | _ -> false)
|
|
||||||
paths
|
|
||||||
in
|
in
|
||||||
Lwt_list.fold_left_s (fun acc path ->
|
Lwt_list.fold_left_s (fun acc path ->
|
||||||
Store.find store path >|= function
|
Git_kv.get store path >|= function
|
||||||
| Some data ->
|
| Ok data ->
|
||||||
(* TODO report parser errors *)
|
(* TODO report parser errors *)
|
||||||
(try
|
(try
|
||||||
let url_csums = extract_urls (String.concat "/" path) data in
|
let url_csums = extract_urls (Mirage_kv.Key.to_string path) data in
|
||||||
Option.fold ~none:acc ~some:(fun (url, csums) ->
|
Option.fold ~none:acc ~some:(fun (url, csums) ->
|
||||||
if HM.cardinal csums = 0 then
|
if HM.cardinal csums = 0 then
|
||||||
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
|
(Logs.warn (fun m -> m "no checksums for %s, ignoring" url); acc)
|
||||||
|
@ -216,9 +189,9 @@ module Make
|
||||||
None
|
None
|
||||||
end) acc) url_csums
|
end) acc) url_csums
|
||||||
with _ ->
|
with _ ->
|
||||||
Logs.warn (fun m -> m "some error in %s, ignoring" (String.concat "/" path));
|
Logs.warn (fun m -> m "some error in %a, ignoring" Mirage_kv.Key.pp path);
|
||||||
acc)
|
acc)
|
||||||
| None -> acc)
|
| Error e -> Logs.warn (fun m -> m "Git_kv.get: %a" Git_kv.pp_error e); acc)
|
||||||
SM.empty opam_paths
|
SM.empty opam_paths
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -231,7 +204,7 @@ module Make
|
||||||
|
|
||||||
let empty dev = { md5s = SM.empty ; sha512s = SM.empty ; dev }
|
let empty dev = { md5s = SM.empty ; sha512s = SM.empty ; dev }
|
||||||
|
|
||||||
let key t d =
|
let key _t d =
|
||||||
let d = Cstruct.to_string d in
|
let d = Cstruct.to_string d in
|
||||||
hex_to_string d
|
hex_to_string d
|
||||||
|
|
||||||
|
@ -399,10 +372,10 @@ module Make
|
||||||
in
|
in
|
||||||
Git.find_contents store >>= fun paths ->
|
Git.find_contents store >>= fun paths ->
|
||||||
Lwt_list.iter_s (fun path ->
|
Lwt_list.iter_s (fun path ->
|
||||||
Store.find store path >|= function
|
Git_kv.get store path >|= function
|
||||||
| Some data ->
|
| Ok data ->
|
||||||
let data =
|
let data =
|
||||||
if path = [ "repo" ] then repo else data
|
if Mirage_kv.Key.(equal path (v "repo")) then repo else data
|
||||||
in
|
in
|
||||||
let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! *)
|
let file_mode = 0o644 (* would be great to retrieve the actual one - but not needed (since opam-repository doesn't use it anyways)! *)
|
||||||
and mod_time = Int64.of_int mtime
|
and mod_time = Int64.of_int mtime
|
||||||
|
@ -412,12 +385,12 @@ module Make
|
||||||
in
|
in
|
||||||
let hdr =
|
let hdr =
|
||||||
Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
|
Tar.Header.make ~file_mode ~mod_time ~user_id ~group_id
|
||||||
(String.concat "/" path) (Int64.of_int size)
|
(Mirage_kv.Key.to_string path) (Int64.of_int size)
|
||||||
in
|
in
|
||||||
let o = ref false in
|
let o = ref false in
|
||||||
let stream () = if !o then None else (o := true; Some data) in
|
let stream () = if !o then None else (o := true; Some data) in
|
||||||
Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream
|
Tar_Gz.write_block ~level:Tar.Header.Ustar hdr gz_out stream
|
||||||
| None -> ())
|
| Error e -> Logs.warn (fun m -> m "Git_kv error: %a" Git_kv.pp_error e))
|
||||||
paths >|= fun () ->
|
paths >|= fun () ->
|
||||||
Tar_Gz.write_end gz_out;
|
Tar_Gz.write_end gz_out;
|
||||||
Buffer.contents out_channel
|
Buffer.contents out_channel
|
||||||
|
@ -436,8 +409,9 @@ module Make
|
||||||
let m' = Array.get month (pred m) in
|
let m' = Array.get month (pred m) in
|
||||||
Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss
|
Printf.sprintf "%s, %02d %s %04d %02d:%02d:%02d GMT" weekday d m' y hh mm ss
|
||||||
|
|
||||||
let commit_id commit =
|
let commit_id git_kv =
|
||||||
Fmt.to_to_string (Irmin.Type.pp Store.Hash.t) (Store.Commit.hash commit)
|
Git_kv.digest git_kv Mirage_kv.Key.empty >|= fun r ->
|
||||||
|
Result.get_ok r
|
||||||
|
|
||||||
let repo commit =
|
let repo commit =
|
||||||
let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in
|
let upstream = List.hd (String.split_on_char '#' (Key_gen.remote ())) in
|
||||||
|
@ -448,13 +422,10 @@ archive-mirrors: "cache"
|
||||||
stamp: %S
|
stamp: %S
|
||||||
|} upstream commit commit
|
|} upstream commit commit
|
||||||
|
|
||||||
let modified commit =
|
let modified git_kv =
|
||||||
let info = Store.Commit.info commit in
|
Git_kv.last_modified git_kv Mirage_kv.Key.empty >|= fun r ->
|
||||||
let ptime =
|
let v = Result.fold ~ok:Fun.id ~error:(fun _ -> Pclock.now_d_ps ()) r in
|
||||||
Option.value ~default:(Ptime.v (Pclock.now_d_ps ()))
|
ptime_to_http_date (Ptime.v v)
|
||||||
(Ptime.of_float_s (Int64.to_float (Store.Info.date info)))
|
|
||||||
in
|
|
||||||
ptime_to_http_date ptime
|
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
mutable commit_id : string ;
|
mutable commit_id : string ;
|
||||||
|
@ -463,35 +434,32 @@ stamp: %S
|
||||||
mutable index : string ;
|
mutable index : string ;
|
||||||
}
|
}
|
||||||
|
|
||||||
let create commit store =
|
let create git_kv =
|
||||||
let commit_id = commit_id commit
|
commit_id git_kv >>= fun commit_id ->
|
||||||
and modified = modified commit
|
modified git_kv >>= fun modified ->
|
||||||
in
|
|
||||||
let repo = repo commit_id in
|
let repo = repo commit_id in
|
||||||
Tarball.of_git repo store >|= fun index ->
|
Tarball.of_git repo git_kv >|= fun index ->
|
||||||
{ commit_id ; modified ; repo ; index }
|
{ commit_id ; modified ; repo ; index }
|
||||||
|
|
||||||
let update_lock = Lwt_mutex.create ()
|
let update_lock = Lwt_mutex.create ()
|
||||||
|
|
||||||
let update_git t git_ctx =
|
let update_git t git_kv =
|
||||||
Lwt_mutex.with_lock update_lock (fun () ->
|
Lwt_mutex.with_lock update_lock (fun () ->
|
||||||
Git.connect git_ctx >>= fun (store, upstream) ->
|
Git_kv.pull git_kv >>= function
|
||||||
Git.pull store upstream >>= function
|
|
||||||
| Error `Msg msg ->
|
| Error `Msg msg ->
|
||||||
Logs.err (fun m -> m "error %s while updating git" msg);
|
Logs.err (fun m -> m "error %s while updating git" msg);
|
||||||
Lwt.return None
|
Lwt.return None
|
||||||
| Ok (commit, msg) ->
|
| Ok changes ->
|
||||||
Logs.info (fun m -> m "git: %s" msg);
|
commit_id git_kv >>= fun commit_id ->
|
||||||
let commit_id = commit_id commit
|
modified git_kv >>= fun modified ->
|
||||||
and modified = modified commit
|
Logs.info (fun m -> m "git: %s" commit_id);
|
||||||
in
|
|
||||||
let repo = repo commit_id in
|
let repo = repo commit_id in
|
||||||
Tarball.of_git repo store >|= fun index ->
|
Tarball.of_git repo git_kv >|= fun index ->
|
||||||
t.commit_id <- commit_id ;
|
t.commit_id <- commit_id ;
|
||||||
t.modified <- modified ;
|
t.modified <- modified ;
|
||||||
t.repo <- repo ;
|
t.repo <- repo ;
|
||||||
t.index <- index;
|
t.index <- index;
|
||||||
Some store)
|
Some changes)
|
||||||
|
|
||||||
let not_modified request (modified, etag) =
|
let not_modified request (modified, etag) =
|
||||||
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
match Httpaf.Headers.get request.Httpaf.Request.headers "if-modified-since" with
|
||||||
|
@ -528,13 +496,13 @@ stamp: %S
|
||||||
let resp = Httpaf.Response.create `Not_modified in
|
let resp = Httpaf.Response.create `Not_modified in
|
||||||
respond_with_empty reqd resp
|
respond_with_empty reqd resp
|
||||||
else *)
|
else *)
|
||||||
let dispatch t store hook_url git_ctx update _flow _conn reqd =
|
let dispatch t store hook_url git_kv update _flow _conn reqd =
|
||||||
let request = Httpaf.Reqd.request reqd in
|
let request = Httpaf.Reqd.request reqd in
|
||||||
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
|
Logs.info (fun f -> f "requested %s" request.Httpaf.Request.target);
|
||||||
match String.split_on_char '/' request.Httpaf.Request.target with
|
match String.split_on_char '/' request.Httpaf.Request.target with
|
||||||
| [ ""; x ] when String.equal x hook_url ->
|
| [ ""; x ] when String.equal x hook_url ->
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
update_git t git_ctx >>= function
|
update_git t git_kv >>= function
|
||||||
| None -> Lwt.return_unit
|
| None -> Lwt.return_unit
|
||||||
| Some store -> update store);
|
| Some store -> update store);
|
||||||
let data = "Update in progress" in
|
let data = "Update in progress" in
|
||||||
|
@ -667,24 +635,23 @@ stamp: %S
|
||||||
|
|
||||||
let start block _time _pclock stack git_ctx http_ctx =
|
let start block _time _pclock stack git_ctx http_ctx =
|
||||||
KV.connect block >>= fun kv ->
|
KV.connect block >>= fun kv ->
|
||||||
|
Logs.info (fun m -> m "Available bytes in tar storage: %Ld" (KV.free kv));
|
||||||
Disk.init kv >>= fun disk ->
|
Disk.init kv >>= fun disk ->
|
||||||
if Key_gen.check () then Lwt.return_unit
|
if Key_gen.check () then Lwt.return_unit
|
||||||
else
|
else
|
||||||
Git.connect git_ctx >>= fun (store, upstream) ->
|
Git_kv.connect git_ctx (Key_gen.remote ()) >>= fun git_kv ->
|
||||||
Git.pull store upstream >>= function
|
Serve.commit_id git_kv >>= fun commit_id ->
|
||||||
| Error `Msg msg -> Lwt.fail_with msg
|
Logs.info (fun m -> m "git: %s" commit_id);
|
||||||
| Ok (commit, msg) ->
|
Serve.create git_kv >>= fun serve ->
|
||||||
Logs.info (fun m -> m "git: %s" msg);
|
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
|
||||||
Serve.create commit store >>= fun serve ->
|
let update _changes = download_archives disk http_ctx git_kv in
|
||||||
Paf.init ~port:(Key_gen.port ()) (Stack.tcp stack) >>= fun t ->
|
let service =
|
||||||
let update store = download_archives disk http_ctx store in
|
Paf.http_service
|
||||||
let service =
|
~error_handler:(fun _ ?request:_ _ _ -> ())
|
||||||
Paf.http_service
|
(Serve.dispatch serve disk (Key_gen.hook_url ()) git_kv update)
|
||||||
~error_handler:(fun _ ?request:_ _ _ -> ())
|
in
|
||||||
(Serve.dispatch serve disk (Key_gen.hook_url ()) git_ctx update)
|
let `Initialized th = Paf.serve service t in
|
||||||
in
|
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
||||||
let `Initialized th = Paf.serve service t in
|
download_archives disk http_ctx git_kv >>= fun () ->
|
||||||
Logs.info (fun f -> f "listening on %d/HTTP" (Key_gen.port ()));
|
(th >|= fun _v -> ())
|
||||||
download_archives disk http_ctx store >>= fun () ->
|
|
||||||
(th >|= fun _v -> ())
|
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue