Implement correctly a batch operation (which will push at the end of the given closure and provide a Local sub-module to be able to manipulate the Git repository without connections

This commit is contained in:
Romain Calascibetta 2022-10-27 16:29:12 +02:00
parent 6d6b0bd9c7
commit 98bd2bfe3c
2 changed files with 52 additions and 46 deletions

View file

@ -8,6 +8,7 @@ type t =
; edn : Smart_git.Endpoint.t ; edn : Smart_git.Endpoint.t
; branch : Git.Reference.t ; branch : Git.Reference.t
; store : Store.t ; store : Store.t
; mutable batch : bool
; mutable head : Store.hash option } ; mutable head : Store.hash option }
let init_store () = let init_store () =
@ -97,7 +98,7 @@ let connect ctx endpoint =
init_store () >>= fun store -> init_store () >>= fun store ->
let store = to_invalid store in let store = to_invalid store in
let edn, branch = split_url endpoint in let edn, branch = split_url endpoint in
let t = { ctx ; edn ; branch ; store ; head = None } in let t = { ctx ; edn ; branch ; store ; batch= false; head = None } in
pull t >>= fun r -> pull t >>= fun r ->
let _r = to_invalid r in let _r = to_invalid r in
Lwt.return t Lwt.return t
@ -330,7 +331,7 @@ let of_octets ctx ~remote data =
>|= Rresult.R.failwith_error_msg >>= fun store -> >|= Rresult.R.failwith_error_msg >>= fun store ->
analyze store data >>= fun head -> analyze store data >>= fun head ->
let edn, branch = split_url remote in let edn, branch = split_url remote in
Lwt.return_ok { ctx ; edn ; branch ; store ; head; }) Lwt.return_ok { ctx ; edn ; branch ; store ; batch= false; head; })
(fun exn -> (fun exn ->
Fmt.epr ">>> Got an exception: %s.\n%!" (Printexc.to_string exn) ; Fmt.epr ">>> Got an exception: %s.\n%!" (Printexc.to_string exn) ;
Fmt.epr ">>> %s.\n%!" Fmt.epr ">>> %s.\n%!"
@ -480,7 +481,7 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
unroll_tree t ?head (`Dir, name, hash) rest unroll_tree t ?head (`Dir, name, hash) rest
| _ -> assert false ) | _ -> assert false )
let set ?(push= false) t key contents = let set ~and_push t key contents =
let segs = Mirage_kv.Key.segments key in let segs = Mirage_kv.Key.segments key in
let now () = Int64.of_float (Ptime.to_float_s (Ptime.v (Pclock.now_d_ps ()))) in let now () = Int64.of_float (Ptime.to_float_s (Ptime.v (Pclock.now_d_ps ()))) in
match segs with match segs with
@ -499,7 +500,7 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
~parents (Some "Committed by git-kv") in ~parents (Some "Committed by git-kv") in
Store.write t.store (Git.Value.Commit commit) >>= fun (hash, _) -> Store.write t.store (Git.Value.Commit commit) >>= fun (hash, _) ->
Store.Ref.write t.store t.branch (Git.Reference.uid hash) >>= fun () -> Store.Ref.write t.store t.branch (Git.Reference.uid hash) >>= fun () ->
Lwt.Infix.(if push then Lwt.Infix.(if and_push then
Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ] Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ]
>|= Result.map_error (fun err -> `Msg (Fmt.str "error pushing branch %a: %a" >|= Result.map_error (fun err -> `Msg (Fmt.str "error pushing branch %a: %a"
Git.Reference.pp t.branch Sync.pp_error err)) Git.Reference.pp t.branch Sync.pp_error err))
@ -513,23 +514,23 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
| `Msg err -> `Msg err | `Msg err -> `Msg err
| err -> Rresult.R.msgf "%a" Store.pp_error err | err -> Rresult.R.msgf "%a" Store.pp_error err
let set ?push t key contents = let set ?(and_push= true) t key contents =
let open Lwt.Infix in let open Lwt.Infix in
set ?push t key contents >|= Rresult.R.reword_error to_write_error let and_push = not t.batch && and_push in
set ~and_push t key contents >|= Rresult.R.reword_error to_write_error
let set_partial ?push t key ~offset chunk = let set_partial ?(and_push= true) t key ~offset chunk =
let open Lwt_result.Infix in let open Lwt_result.Infix in
let and_push = not t.batch && and_push in
get t key >>= fun contents -> get t key >>= fun contents ->
let len = String.length contents in let len = String.length contents in
let add = String.length chunk in let add = String.length chunk in
let res = Bytes.make (max len (offset + add)) '\000' in let res = Bytes.make (max len (offset + add)) '\000' in
Bytes.blit_string contents 0 res 0 len ; Bytes.blit_string contents 0 res 0 len ;
Bytes.blit_string chunk 0 res offset add ; Bytes.blit_string chunk 0 res offset add ;
set ?push t key (Bytes.unsafe_to_string res) set ~and_push t key (Bytes.unsafe_to_string res)
let batch t ?retries:_ f = f t let remove ~and_push t key =
let remove ?(push= false) t key =
let segs = Mirage_kv.Key.segments key in let segs = Mirage_kv.Key.segments key in
let now () = Int64.of_float (Ptime.to_float_s (Ptime.v (Pclock.now_d_ps ()))) in let now () = Int64.of_float (Ptime.to_float_s (Ptime.v (Pclock.now_d_ps ()))) in
match List.rev segs, t.head with match List.rev segs, t.head with
@ -549,7 +550,7 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
~parents:[ head ] (Some "Committed by git-kv") in ~parents:[ head ] (Some "Committed by git-kv") in
Store.write t.store (Git.Value.Commit commit) >>= fun (hash, _) -> Store.write t.store (Git.Value.Commit commit) >>= fun (hash, _) ->
Store.Ref.write t.store t.branch (Git.Reference.uid hash) >>= fun () -> Store.Ref.write t.store t.branch (Git.Reference.uid hash) >>= fun () ->
Lwt.Infix.(if push then Lwt.Infix.(if and_push then
Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ] Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ]
>|= Result.map_error (fun err -> `Msg (Fmt.str "error pushing branch %a: %a" >|= Result.map_error (fun err -> `Msg (Fmt.str "error pushing branch %a: %a"
Git.Reference.pp t.branch Sync.pp_error err)) Git.Reference.pp t.branch Sync.pp_error err))
@ -572,7 +573,7 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
~parents:[ head ] (Some "Committed by git-kv") in ~parents:[ head ] (Some "Committed by git-kv") in
Store.write t.store (Git.Value.Commit commit) >>= fun (hash, _) -> Store.write t.store (Git.Value.Commit commit) >>= fun (hash, _) ->
Store.Ref.write t.store t.branch (Git.Reference.uid hash) >>= fun () -> Store.Ref.write t.store t.branch (Git.Reference.uid hash) >>= fun () ->
Lwt.Infix.(if push then Lwt.Infix.(if and_push then
Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ] Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ]
>|= Result.map_error (fun err -> `Msg (Fmt.str "error pushing branch %a: %a" >|= Result.map_error (fun err -> `Msg (Fmt.str "error pushing branch %a: %a"
Git.Reference.pp t.branch Sync.pp_error err)) Git.Reference.pp t.branch Sync.pp_error err))
@ -581,23 +582,42 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
t.head <- Some hash ; Lwt.return_ok () t.head <- Some hash ; Lwt.return_ok ()
| _ -> Lwt.return_ok () | _ -> Lwt.return_ok ()
let remove ?push t key = let remove ?(and_push= true) t key =
let open Lwt.Infix in let open Lwt.Infix in
remove ?push t key >|= Rresult.R.reword_error to_write_error let and_push = not t.batch && and_push in
remove ~and_push t key >|= Rresult.R.reword_error to_write_error
let rename ?(push= false) t ~source ~dest = let rename ?(and_push= true) t ~source ~dest =
(* TODO(dinosaure): optimize it! It was done on the naive way. *) (* TODO(dinosaure): optimize it! It was done on the naive way. *)
let open Lwt_result.Infix in let open Lwt_result.Infix in
get t source >>= fun contents -> get t source >>= fun contents ->
remove ~push t source >>= fun () -> remove ~and_push t source >>= fun () ->
set ~push t dest contents set ~and_push t dest contents
let set_partial_and_push t k ~offset v = set_partial ~push:true t k ~offset v let batch t ?(retries= 3) f =
let set_partial t k ~offset v = set_partial ~push:false t k ~offset v let open Lwt.Infix in
let set_and_push t k v = set ~push:true t k v if retries < 0
let set t k v = set ~push:false t k v then Fmt.invalid_arg "Git_kv.Make.batch: retries must be equal or greater than 0" ;
let remove_and_push t k = remove ~push:true t k t.batch <- true ;
let remove t k = remove ~push:false t k f t >>= fun res ->
let rename_and_push t ~source ~dest = rename ~push:true t ~source ~dest let rec force_push limit =
let rename t ~source ~dest = rename ~push:false t ~source ~dest Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ] >>= function
| Ok () -> Lwt.return_unit
| Error _ when limit > 0 -> force_push (pred limit)
| Error err ->
Fmt.failwith "error pushing branch %a: %a" Git.Reference.pp t.branch Sync.pp_error err in
force_push retries >>= fun () ->
t.batch <- false ; Lwt.return res
module Local = struct
let set_partial t k ~offset v = set_partial ~and_push:false t k ~offset v
let set t k v = set ~and_push:false t k v
let remove t k = remove ~and_push:false t k
let rename t ~source ~dest = rename ~and_push:false t ~source ~dest
end
let set_partial t k ~offset v = set_partial ~and_push:true t k ~offset v
let set t k v = set ~and_push:true t k v
let remove t k = remove ~and_push:true t k
let rename t ~source ~dest = rename ~and_push:true t ~source ~dest
end end

View file

@ -60,24 +60,10 @@ module Make (Pclock : Mirage_clock.PCLOCK) : sig
| `Reference_not_found of Git.Reference.t | `Reference_not_found of Git.Reference.t
| Mirage_kv.write_error ] | Mirage_kv.write_error ]
val set_and_push : t -> key -> string -> (unit, write_error) result Lwt.t module Local : sig
(** [set_and_push t k v] replaces the binding [k -> v] in [t] and pushes this val set : t -> key -> string -> (unit, write_error) result Lwt.t
modification to the remote repository. This function can fail on the val remove : t -> key -> (unit, write_error) result Lwt.t
synchronisation mechanism if the remote Git repository is {i in advance} val rename : t -> source:key -> dest:key -> (unit, write_error) result Lwt.t
of your local repository. It is advisable to use {!val:pull} before. *) val set_partial : t -> key -> offset:int -> string -> (unit, write_error) result Lwt.t
end
val remove_and_push : t -> key -> (unit, write_error) result Lwt.t
(** Same as {!val:remove} with the synchronisation mechanism, see
{!val:set_and_push} for more details about possible failures and how to
use it. *)
val rename_and_push : t -> source:key -> dest:key -> (unit, write_error) result Lwt.t
(** Same as {!val:rename} with the synchronisation mechanism, see
{!val:rename_and_push} for more details about possible failures and how
to use it. *)
val set_partial_and_push : t -> key -> offset:int -> string -> (unit, write_error) result Lwt.t
(** Same as {!val:set_partial} with the synchronisation mechanism, see
{!val:set_and_push} for more details about possible failures and how to
use it. *)
end end