as suggested by @reynir @dinosaure, use a single Lwt_mutex.t

This commit is contained in:
Hannes Mehnert 2024-10-29 11:51:55 +01:00
parent 681e4f2367
commit 0b330178e1

View file

@ -9,8 +9,6 @@ type t =
; branch : Git.Reference.t ; branch : Git.Reference.t
; store : Store.t ; store : Store.t
; mutable committed : Digestif.SHA1.t option ; mutable committed : Digestif.SHA1.t option
; mutable change_and_push_running : bool
; condition : unit Lwt_condition.t
; mutex : Lwt_mutex.t ; mutex : Lwt_mutex.t
; mutable head : Store.hash option } ; mutable head : Store.hash option }
@ -104,7 +102,7 @@ let connect ctx endpoint =
init_store () >>= fun store -> init_store () >>= fun store ->
let store = to_invalid store in let store = to_invalid store in
let edn, branch = split_url endpoint in let edn, branch = split_url endpoint in
let t = { ctx ; edn ; branch ; store ; committed= None; change_and_push_running= false; condition= Lwt_condition.create (); mutex= Lwt_mutex.create (); head= None } in let t = { ctx ; edn ; branch ; store ; committed= None; mutex= Lwt_mutex.create (); head= None } in
pull t >>= fun r -> pull t >>= fun r ->
let _r = to_invalid r in let _r = to_invalid r in
Lwt.return t Lwt.return t
@ -372,7 +370,7 @@ let of_octets ctx ~remote data =
Result.fold ~ok:Fun.id ~error:(function `Msg msg -> failwith msg) >>= fun store -> Result.fold ~ok:Fun.id ~error:(function `Msg msg -> failwith msg) >>= fun store ->
analyze store data >>= fun head -> analyze store data >>= fun head ->
let edn, branch = split_url remote in let edn, branch = split_url remote in
Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; change_and_push_running= false; condition= Lwt_condition.create (); mutex= Lwt_mutex.create (); head; }) Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; mutex= Lwt_mutex.create (); head; })
(fun _exn -> (fun _exn ->
Lwt.return_error (`Msg "Invalid PACK file")) Lwt.return_error (`Msg "Invalid PACK file"))
@ -679,20 +677,7 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
match t.committed with match t.committed with
| Some _ -> Lwt.return_error (`Msg "Nested change_and_push") | Some _ -> Lwt.return_error (`Msg "Nested change_and_push")
| None -> | None ->
(* XXX(dinosaure): serialize [change_and_push]. If we do Lwt_mutex.with_lock t.mutex (fun () ->
[Lwt.both (change_and_push ..) (change_and_push ..)], they can not run
concurrently! The second will waiting the first to finish.
(reynir): Furthermore, we need to create a new task and update
[change_and_push_waiter] before we wait on the existing
[change_and_push_waiter] task without any yield point in between to
ensure serializability. *)
let open Lwt.Syntax in
let* () = Lwt_mutex.with_lock t.mutex @@ fun () ->
let rec await () =
if t.change_and_push_running
then Lwt_condition.wait ~mutex:t.mutex t.condition >>= await
else begin t.change_and_push_running <- true; Lwt.return_unit end in
await () in
(let open Lwt_result.Infix in (let open Lwt_result.Infix in
tree_root_hash_of_store t >>= fun tree_root_hash -> tree_root_hash_of_store t >>= fun tree_root_hash ->
let t' = { t with committed = Some tree_root_hash } in let t' = { t with committed = Some tree_root_hash } in
@ -723,11 +708,5 @@ module Make (Pclock : Mirage_clock.PCLOCK) = struct
t.head <- Some hash ; t.head <- Some hash ;
Lwt.return_ok res) Lwt.return_ok res)
>|= Result.map_error >|= Result.map_error
(fun err -> `Msg (Fmt.str "error pushing %a" Store.pp_error err)) (fun err -> `Msg (Fmt.str "error pushing %a" Store.pp_error err)))
>>= fun res ->
let* () = Lwt_mutex.with_lock t.mutex @@ fun () ->
t.change_and_push_running <- false;
Lwt_condition.signal t.condition ();
Lwt.return_unit in
Lwt.return res
end end