From 934ed7d960cec5d6e862dbbf2742fa195c2bad44 Mon Sep 17 00:00:00 2001 From: Romain Calascibetta Date: Fri, 28 Oct 2022 10:49:59 +0200 Subject: [PATCH] Serialize sequence of batches operations --- src/git_kv.ml | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/git_kv.ml b/src/git_kv.ml index 1271368..89ba555 100644 --- a/src/git_kv.ml +++ b/src/git_kv.ml @@ -8,7 +8,7 @@ type t = ; edn : Smart_git.Endpoint.t ; branch : Git.Reference.t ; store : Store.t - ; mutable batch : bool + ; mutable batch : unit Lwt.t option ; mutable head : Store.hash option } let init_store () = @@ -98,7 +98,7 @@ let connect ctx endpoint = init_store () >>= fun store -> let store = to_invalid store in let edn, branch = split_url endpoint in - let t = { ctx ; edn ; branch ; store ; batch= false; head = None } in + let t = { ctx ; edn ; branch ; store ; batch= None; head= None } in pull t >>= fun r -> let _r = to_invalid r in Lwt.return t @@ -331,7 +331,7 @@ let of_octets ctx ~remote data = >|= Rresult.R.failwith_error_msg >>= fun store -> analyze store data >>= fun head -> let edn, branch = split_url remote in - Lwt.return_ok { ctx ; edn ; branch ; store ; batch= false; head; }) + Lwt.return_ok { ctx ; edn ; branch ; store ; batch= None; head; }) (fun exn -> Fmt.epr ">>> Got an exception: %s.\n%!" (Printexc.to_string exn) ; Fmt.epr ">>> %s.\n%!" @@ -480,6 +480,12 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct Store.write t.store (Git.Value.Tree tree) >>? fun (hash, _) -> unroll_tree t ?head (`Dir, name, hash) rest | _ -> assert false ) + + let no_batch = function + | None -> true + | Some th -> match Lwt.state th with + | Sleep -> true + | Return _ | Fail _ -> false let set ~and_push t key contents = let segs = Mirage_kv.Key.segments key in @@ -516,12 +522,12 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct let set ?(and_push= true) t key contents = let open Lwt.Infix in - let and_push = not t.batch && and_push in + let and_push = no_batch t.batch && and_push in set ~and_push t key contents >|= Rresult.R.reword_error to_write_error let set_partial ?(and_push= true) t key ~offset chunk = let open Lwt_result.Infix in - let and_push = not t.batch && and_push in + let and_push = no_batch t.batch && and_push in get t key >>= fun contents -> let len = String.length contents in let add = String.length chunk in @@ -584,7 +590,7 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct let remove ?(and_push= true) t key = let open Lwt.Infix in - let and_push = not t.batch && and_push in + let and_push = no_batch t.batch && and_push in remove ~and_push t key >|= Rresult.R.reword_error to_write_error let rename ?(and_push= true) t ~source ~dest = @@ -598,7 +604,11 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct let open Lwt.Infix in if retries < 0 then Fmt.invalid_arg "Git_kv.Make.batch: retries must be equal or greater than 0" ; - t.batch <- true ; + ( match t.batch with + | None -> Lwt.return_unit + | Some th -> th ) >>= fun () -> + let th, wk = Lwt.wait () in + t.batch <- Some th ; f t >>= fun res -> let rec force_push limit = Sync.push ~capabilities ~ctx:t.ctx t.edn t.store [ `Update (t.branch, t.branch) ] >>= function @@ -607,7 +617,8 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct | Error err -> Fmt.failwith "error pushing branch %a: %a" Git.Reference.pp t.branch Sync.pp_error err in force_push retries >>= fun () -> - t.batch <- false ; Lwt.return res + Lwt.wakeup_later wk () ; + Lwt.return res module Local = struct let set_partial t k ~offset v = set_partial ~and_push:false t k ~offset v