Serialize sequence of batches operations

This commit is contained in:
Romain Calascibetta 2022-10-28 10:49:59 +02:00
parent 98bd2bfe3c
commit 934ed7d960

View file

@ -8,7 +8,7 @@ type t =
; edn : Smart_git.Endpoint.t ; edn : Smart_git.Endpoint.t
; branch : Git.Reference.t ; branch : Git.Reference.t
; store : Store.t ; store : Store.t
; mutable batch : bool ; mutable batch : unit Lwt.t option
; mutable head : Store.hash option } ; mutable head : Store.hash option }
let init_store () = let init_store () =
@ -98,7 +98,7 @@ let connect ctx endpoint =
init_store () >>= fun store -> init_store () >>= fun store ->
let store = to_invalid store in let store = to_invalid store in
let edn, branch = split_url endpoint in let edn, branch = split_url endpoint in
let t = { ctx ; edn ; branch ; store ; batch= false; head = None } in let t = { ctx ; edn ; branch ; store ; batch= None; head= None } in
pull t >>= fun r -> pull t >>= fun r ->
let _r = to_invalid r in let _r = to_invalid r in
Lwt.return t Lwt.return t
@ -331,7 +331,7 @@ let of_octets ctx ~remote data =
>|= Rresult.R.failwith_error_msg >>= fun store -> >|= Rresult.R.failwith_error_msg >>= fun store ->
analyze store data >>= fun head -> analyze store data >>= fun head ->
let edn, branch = split_url remote in let edn, branch = split_url remote in
Lwt.return_ok { ctx ; edn ; branch ; store ; batch= false; head; }) Lwt.return_ok { ctx ; edn ; branch ; store ; batch= None; head; })
(fun exn -> (fun exn ->
Fmt.epr ">>> Got an exception: %s.\n%!" (Printexc.to_string exn) ; Fmt.epr ">>> Got an exception: %s.\n%!" (Printexc.to_string exn) ;
Fmt.epr ">>> %s.\n%!" Fmt.epr ">>> %s.\n%!"
@ -480,6 +480,12 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
Store.write t.store (Git.Value.Tree tree) >>? fun (hash, _) -> Store.write t.store (Git.Value.Tree tree) >>? fun (hash, _) ->
unroll_tree t ?head (`Dir, name, hash) rest unroll_tree t ?head (`Dir, name, hash) rest
| _ -> assert false ) | _ -> assert false )
let no_batch = function
| None -> true
| Some th -> match Lwt.state th with
| Sleep -> true
| Return _ | Fail _ -> false
let set ~and_push t key contents = let set ~and_push t key contents =
let segs = Mirage_kv.Key.segments key in let segs = Mirage_kv.Key.segments key in
@ -516,12 +522,12 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
let set ?(and_push= true) t key contents = let set ?(and_push= true) t key contents =
let open Lwt.Infix in let open Lwt.Infix in
let and_push = not t.batch && and_push in let and_push = no_batch t.batch && and_push in
set ~and_push t key contents >|= Rresult.R.reword_error to_write_error set ~and_push t key contents >|= Rresult.R.reword_error to_write_error
let set_partial ?(and_push= true) t key ~offset chunk = let set_partial ?(and_push= true) t key ~offset chunk =
let open Lwt_result.Infix in let open Lwt_result.Infix in
let and_push = not t.batch && and_push in let and_push = no_batch t.batch && and_push in
get t key >>= fun contents -> get t key >>= fun contents ->
let len = String.length contents in let len = String.length contents in
let add = String.length chunk in let add = String.length chunk in
@ -584,7 +590,7 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
let remove ?(and_push= true) t key = let remove ?(and_push= true) t key =
let open Lwt.Infix in let open Lwt.Infix in
let and_push = not t.batch && and_push in let and_push = no_batch t.batch && and_push in
remove ~and_push t key >|= Rresult.R.reword_error to_write_error remove ~and_push t key >|= Rresult.R.reword_error to_write_error
let rename ?(and_push= true) t ~source ~dest = let rename ?(and_push= true) t ~source ~dest =
@ -598,7 +604,11 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
let open Lwt.Infix in let open Lwt.Infix in
if retries < 0 if retries < 0
then Fmt.invalid_arg "Git_kv.Make.batch: retries must be equal or greater than 0" ; then Fmt.invalid_arg "Git_kv.Make.batch: retries must be equal or greater than 0" ;
t.batch <- true ; ( match t.batch with
| None -> Lwt.return_unit
| Some th -> th ) >>= fun () ->
let th, wk = Lwt.wait () in
t.batch <- Some th ;
f t >>= fun res -> f t >>= fun res ->
let rec force_push limit = let rec force_push limit =
Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ] >>= function Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ] >>= function
@ -607,7 +617,8 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
| Error err -> | Error err ->
Fmt.failwith "error pushing branch %a: %a" Git.Reference.pp t.branch Sync.pp_error err in Fmt.failwith "error pushing branch %a: %a" Git.Reference.pp t.branch Sync.pp_error err in
force_push retries >>= fun () -> force_push retries >>= fun () ->
t.batch <- false ; Lwt.return res Lwt.wakeup_later wk () ;
Lwt.return res
module Local = struct module Local = struct
let set_partial t k ~offset v = set_partial ~and_push:false t k ~offset v let set_partial t k ~offset v = set_partial ~and_push:false t k ~offset v