diff --git a/src/git_kv.ml b/src/git_kv.ml index a6be55b..6f43245 100644 --- a/src/git_kv.ml +++ b/src/git_kv.ml @@ -9,7 +9,9 @@ type t = ; branch : Git.Reference.t ; store : Store.t ; mutable committed : Digestif.SHA1.t option - ; mutable change_and_push_waiter : unit Lwt.t option + ; mutable change_and_push_running : bool + ; condition : unit Lwt_condition.t + ; mutex : Lwt_mutex.t ; mutable head : Store.hash option } let init_store () = @@ -102,7 +104,7 @@ let connect ctx endpoint = init_store () >>= fun store -> let store = to_invalid store in let edn, branch = split_url endpoint in - let t = { ctx ; edn ; branch ; store ; committed= None; change_and_push_waiter= None; head= None } in + let t = { ctx ; edn ; branch ; store ; committed= None; change_and_push_running= false; condition= Lwt_condition.create (); mutex= Lwt_mutex.create (); head= None } in pull t >>= fun r -> let _r = to_invalid r in Lwt.return t @@ -370,7 +372,7 @@ let of_octets ctx ~remote data = Result.fold ~ok:Fun.id ~error:(function `Msg msg -> failwith msg) >>= fun store -> analyze store data >>= fun head -> let edn, branch = split_url remote in - Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; change_and_push_waiter= None; head; }) + Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; change_and_push_running= false; condition= Lwt_condition.create (); mutex= Lwt_mutex.create (); head; }) (fun _exn -> Lwt.return_error (`Msg "Invalid PACK file")) @@ -684,12 +686,13 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct [change_and_push_waiter] before we wait on the existing [change_and_push_waiter] task without any yield point in between to ensure serializability. *) - let th, wk = Lwt.wait () in - let th' = t.change_and_push_waiter in - t.change_and_push_waiter <- Some th; - ( match th' with - | None -> Lwt.return_unit - | Some th -> th ) >>= fun () -> + let open Lwt.Syntax in + let* () = Lwt_mutex.with_lock t.mutex @@ fun () -> + let rec await () = + if t.change_and_push_running + then Lwt_condition.wait ~mutex:t.mutex t.condition >>= await + else begin t.change_and_push_running <- true; Lwt.return_unit end in + await () in ( let open Lwt_result.Infix in tree_root_hash_of_store t >>= fun tree_root_hash -> let t' = { t with committed = Some tree_root_hash } in @@ -722,12 +725,9 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct >|= Result.map_error (fun err -> `Msg (Fmt.str "error pushing %a" Store.pp_error err)) >>= fun res -> - Lwt.wakeup_later wk () ; - (* (hannes) since some other task may have mutated the - change_and_push_waiter, we only reset it to None if there's a physical - equality between its value and our created task above. *) - (match t.change_and_push_waiter with - | Some th' -> if th' == th then t.change_and_push_waiter <- None - | None -> ()); + let* () = Lwt_mutex.with_lock t.mutex @@ fun () -> + t.change_and_push_running <- false; + Lwt_condition.signal t.condition (); + Lwt.return_unit in Lwt.return res end