Use Lwt_{condition,mutex} instead of lwt tasks to serialize change_and_push
This commit is contained in:
parent
1327cc4f94
commit
3cae0f7765
1 changed files with 16 additions and 16 deletions
|
@ -9,7 +9,9 @@ type t =
|
|||
; branch : Git.Reference.t
|
||||
; store : Store.t
|
||||
; mutable committed : Digestif.SHA1.t option
|
||||
; mutable change_and_push_waiter : unit Lwt.t option
|
||||
; mutable change_and_push_running : bool
|
||||
; condition : unit Lwt_condition.t
|
||||
; mutex : Lwt_mutex.t
|
||||
; mutable head : Store.hash option }
|
||||
|
||||
let init_store () =
|
||||
|
@ -102,7 +104,7 @@ let connect ctx endpoint =
|
|||
init_store () >>= fun store ->
|
||||
let store = to_invalid store in
|
||||
let edn, branch = split_url endpoint in
|
||||
let t = { ctx ; edn ; branch ; store ; committed= None; change_and_push_waiter= None; head= None } in
|
||||
let t = { ctx ; edn ; branch ; store ; committed= None; change_and_push_running= false; condition= Lwt_condition.create (); mutex= Lwt_mutex.create (); head= None } in
|
||||
pull t >>= fun r ->
|
||||
let _r = to_invalid r in
|
||||
Lwt.return t
|
||||
|
@ -370,7 +372,7 @@ let of_octets ctx ~remote data =
|
|||
Result.fold ~ok:Fun.id ~error:(function `Msg msg -> failwith msg) >>= fun store ->
|
||||
analyze store data >>= fun head ->
|
||||
let edn, branch = split_url remote in
|
||||
Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; change_and_push_waiter= None; head; })
|
||||
Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; change_and_push_running= false; condition= Lwt_condition.create (); mutex= Lwt_mutex.create (); head; })
|
||||
(fun _exn ->
|
||||
Lwt.return_error (`Msg "Invalid PACK file"))
|
||||
|
||||
|
@ -684,12 +686,13 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
|
|||
[change_and_push_waiter] before we wait on the existing
|
||||
[change_and_push_waiter] task without any yield point in between to
|
||||
ensure serializability. *)
|
||||
let th, wk = Lwt.wait () in
|
||||
let th' = t.change_and_push_waiter in
|
||||
t.change_and_push_waiter <- Some th;
|
||||
( match th' with
|
||||
| None -> Lwt.return_unit
|
||||
| Some th -> th ) >>= fun () ->
|
||||
let open Lwt.Syntax in
|
||||
let* () = Lwt_mutex.with_lock t.mutex @@ fun () ->
|
||||
let rec await () =
|
||||
if t.change_and_push_running
|
||||
then Lwt_condition.wait ~mutex:t.mutex t.condition >>= await
|
||||
else begin t.change_and_push_running <- true; Lwt.return_unit end in
|
||||
await () in
|
||||
( let open Lwt_result.Infix in
|
||||
tree_root_hash_of_store t >>= fun tree_root_hash ->
|
||||
let t' = { t with committed = Some tree_root_hash } in
|
||||
|
@ -722,12 +725,9 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
|
|||
>|= Result.map_error
|
||||
(fun err -> `Msg (Fmt.str "error pushing %a" Store.pp_error err))
|
||||
>>= fun res ->
|
||||
Lwt.wakeup_later wk () ;
|
||||
(* (hannes) since some other task may have mutated the
|
||||
change_and_push_waiter, we only reset it to None if there's a physical
|
||||
equality between its value and our created task above. *)
|
||||
(match t.change_and_push_waiter with
|
||||
| Some th' -> if th' == th then t.change_and_push_waiter <- None
|
||||
| None -> ());
|
||||
let* () = Lwt_mutex.with_lock t.mutex @@ fun () ->
|
||||
t.change_and_push_running <- false;
|
||||
Lwt_condition.signal t.condition ();
|
||||
Lwt.return_unit in
|
||||
Lwt.return res
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue