Add a way to unserialize a state and reconstruct the Git store
This commit is contained in:
parent
e6254f0439
commit
4dafa3a942
2 changed files with 196 additions and 19 deletions
212
src/git_kv.ml
212
src/git_kv.ml
|
@ -119,6 +119,12 @@ module SHA1 = struct
|
|||
include Digestif.SHA1
|
||||
|
||||
let hash x = Hashtbl.hash x
|
||||
let feed ctx ?off ?len ba = feed_bigstring ctx ?off ?len ba
|
||||
let null = digest_string ""
|
||||
let length = digest_size
|
||||
let compare a b =
|
||||
String.compare
|
||||
(to_raw_string a) (to_raw_string b)
|
||||
end
|
||||
|
||||
module Verbose = struct
|
||||
|
@ -128,7 +134,46 @@ module Verbose = struct
|
|||
let print _ = Lwt.return_unit
|
||||
end
|
||||
|
||||
module Lwt_scheduler = struct
|
||||
module Mutex = struct
|
||||
type 'a fiber = 'a Lwt.t
|
||||
type t = Lwt_mutex.t
|
||||
|
||||
let create () = Lwt_mutex.create ()
|
||||
let lock t = Lwt_mutex.lock t
|
||||
let unlock t = Lwt_mutex.unlock t
|
||||
end
|
||||
|
||||
module Condition = struct
|
||||
type 'a fiber = 'a Lwt.t
|
||||
type mutex = Mutex.t
|
||||
type t = unit Lwt_condition.t
|
||||
|
||||
let create () = Lwt_condition.create ()
|
||||
let wait t mutex = Lwt_condition.wait ~mutex t
|
||||
let signal t = Lwt_condition.signal t ()
|
||||
let broadcast t = Lwt_condition.broadcast t ()
|
||||
end
|
||||
|
||||
type 'a t = 'a Lwt.t
|
||||
|
||||
let bind x f = Lwt.bind x f
|
||||
let return x = Lwt.return x
|
||||
let parallel_map ~f lst = Lwt_list.map_p f lst
|
||||
let parallel_iter ~f lst = Lwt_list.iter_p f lst
|
||||
let detach f =
|
||||
let th, wk = Lwt.wait () in
|
||||
Lwt.async (fun () ->
|
||||
let res = f () in
|
||||
Lwt.wakeup_later wk res ;
|
||||
Lwt.return_unit) ;
|
||||
th
|
||||
end
|
||||
|
||||
module Scheduler = Carton.Make (Lwt)
|
||||
module Delta = Carton_lwt.Enc.Delta (SHA1) (Verbose)
|
||||
module First_pass = Carton.Dec.Fp (SHA1)
|
||||
module Verify = Carton.Dec.Verify (SHA1) (Scheduler) (Lwt_scheduler)
|
||||
|
||||
let pack t ~commit stream =
|
||||
let open Lwt.Infix in
|
||||
|
@ -201,29 +246,160 @@ let pack t ~commit stream =
|
|||
stream None ;
|
||||
Lwt.return_unit
|
||||
|
||||
let to_octets t =
|
||||
(* TODO maybe preserve edn and branch as well? *)
|
||||
let to_octets t = match t.head with
|
||||
| None -> assert false (* TODO(dinosaure): empty PACK file *)
|
||||
| Some commit ->
|
||||
let buf = Buffer.create 0x100 in
|
||||
let stream = Option.iter (Buffer.add_string buf) in
|
||||
let open Lwt.Infix in
|
||||
match t.head with
|
||||
| None -> Lwt.return ""
|
||||
| Some head ->
|
||||
Store.read_exn t.store head >|= function
|
||||
| Commit c ->
|
||||
let l = Encore.to_lavoisier Git_commit.format in
|
||||
Encore.Lavoisier.emit_string c l
|
||||
| _ -> assert false
|
||||
pack t.store ~commit stream >|= fun () ->
|
||||
Buffer.contents buf
|
||||
|
||||
let digest ~kind ?(off = 0) ?len buf =
|
||||
let len =
|
||||
match len with Some len -> len | None -> Bigstringaf.length buf - off in
|
||||
let ctx = SHA1.empty in
|
||||
let ctx =
|
||||
match kind with
|
||||
| `A -> SHA1.feed_string ctx (Fmt.str "commit %d\000" len)
|
||||
| `B -> SHA1.feed_string ctx (Fmt.str "tree %d\000" len)
|
||||
| `C -> SHA1.feed_string ctx (Fmt.str "blob %d\000" len)
|
||||
| `D -> SHA1.feed_string ctx (Fmt.str "tag %d\000" len) in
|
||||
let ctx = SHA1.feed_bigstring ctx ~off ~len buf in
|
||||
SHA1.get ctx
|
||||
|
||||
let analyze stream =
|
||||
let where = Hashtbl.create 0x100 in
|
||||
let child = Hashtbl.create 0x100 in
|
||||
let sizes = Hashtbl.create 0x100 in
|
||||
|
||||
let replace tbl k v = match Hashtbl.find_opt tbl k with
|
||||
| Some v' -> if v' < v then Hashtbl.replace tbl k v
|
||||
| _ -> Hashtbl.add tbl k v in
|
||||
|
||||
let rec go acc tmp decoder = let open Lwt.Infix in
|
||||
match First_pass.decode decoder with
|
||||
| `Await decoder ->
|
||||
( stream () >>= function
|
||||
| Some str ->
|
||||
let tmp = Bigstringaf.of_string str ~off:0 ~len:(String.length str) in
|
||||
go acc tmp (First_pass.src decoder tmp 0 (String.length str))
|
||||
| None -> failwith "Truncated PACK file" )
|
||||
| `Peek decoder ->
|
||||
let keep = First_pass.src_rem decoder in
|
||||
( stream () >>= function
|
||||
| Some str ->
|
||||
let tmp = Bigstringaf.create (keep + String.length str) in
|
||||
Bigstringaf.blit tmp ~src_off:0 tmp ~dst_off:0 ~len:keep ;
|
||||
Bigstringaf.blit_from_string str ~src_off:0 tmp ~dst_off:keep
|
||||
~len:(String.length str) ;
|
||||
go acc tmp (First_pass.src decoder tmp 0 (keep + String.length str))
|
||||
| None -> failwith "Truncated PACK file" )
|
||||
| `Entry ({ First_pass.kind= Base _; offset; size; _ }, decoder) ->
|
||||
Hashtbl.add where offset (First_pass.count decoder - 1) ;
|
||||
Hashtbl.add sizes offset size ;
|
||||
go (Verify.unresolved_base ~cursor:offset :: acc) tmp decoder
|
||||
| `Entry ({ First_pass.kind= Ofs { sub= v; source; target; }
|
||||
; offset; _ }, decoder) ->
|
||||
Hashtbl.add where offset (First_pass.count decoder - 1) ;
|
||||
replace sizes Int64.(sub offset (of_int v)) source ;
|
||||
replace sizes offset target ;
|
||||
( try let vs = Hashtbl.find child (`Ofs Int64.(sub offset (of_int v))) in
|
||||
Hashtbl.replace child (`Ofs Int64.(sub offset (of_int v))) (offset :: vs)
|
||||
with _ -> Hashtbl.add child (`Ofs Int64.(sub offset (of_int v))) [ offset ] ) ;
|
||||
go (Verify.unresolved_node :: acc) tmp decoder
|
||||
| `Entry ({ First_pass.kind= Ref { ptr; target; source; }
|
||||
; offset; _ }, decoder) ->
|
||||
Hashtbl.add where offset (First_pass.count decoder - 1) ;
|
||||
replace sizes offset (Stdlib.max target source) ;
|
||||
( try let vs = Hashtbl.find child (`Ref ptr) in
|
||||
Hashtbl.replace child (`Ref ptr) (offset :: vs)
|
||||
with _ -> Hashtbl.add child (`Ref ptr) [ offset ] ) ;
|
||||
go (Verify.unresolved_node :: acc) tmp decoder
|
||||
| `End _hash ->
|
||||
let where ~cursor = Hashtbl.find where cursor in
|
||||
let children ~cursor ~uid =
|
||||
match Hashtbl.find_opt child (`Ofs cursor),
|
||||
Hashtbl.find_opt child (`Ref uid) with
|
||||
| Some a, Some b -> List.sort_uniq compare (a @ b)
|
||||
| Some x, None | None, Some x -> x
|
||||
| None, None -> [] in
|
||||
let weight ~cursor = Hashtbl.find sizes cursor in
|
||||
let oracle = { Carton.Dec.where
|
||||
; Carton.Dec.children
|
||||
; Carton.Dec.digest
|
||||
; Carton.Dec.weight } in
|
||||
Lwt.return (List.rev acc, oracle)
|
||||
| `Malformed err -> failwith err in
|
||||
|
||||
let o = Bigstringaf.create De.io_buffer_size in
|
||||
let allocate _ = De.make_window ~bits:15 in
|
||||
let decoder = First_pass.decoder ~o ~allocate `Manual in
|
||||
let open Lwt.Infix in
|
||||
go [] De.bigstring_empty decoder >>= fun (matrix, oracle) ->
|
||||
Lwt.return (Array.of_list matrix, oracle)
|
||||
|
||||
let stream_of_string str =
|
||||
let closed = ref false in
|
||||
fun () -> match !closed with
|
||||
| true -> Lwt.return_none
|
||||
| false -> closed := true ; Lwt.return_some str
|
||||
|
||||
let map contents ~pos len =
|
||||
Bigstringaf.of_string ~off:(Int64.to_int pos) ~len contents
|
||||
|
||||
let unpack contents =
|
||||
let open Lwt.Infix in
|
||||
analyze (stream_of_string contents) >>= fun (matrix, oracle) ->
|
||||
let z = De.bigstring_create De.io_buffer_size in
|
||||
let allocate bits = De.make_window ~bits in
|
||||
let never _ = assert false in
|
||||
let pack = Carton.Dec.make contents ~allocate ~z ~uid_ln:SHA1.length
|
||||
~uid_rw:SHA1.of_raw_string never in
|
||||
Verify.verify ~threads:4 pack ~map ~oracle ~verbose:ignore ~matrix >>= fun () ->
|
||||
let index = Hashtbl.create (Array.length matrix) in
|
||||
let iter v =
|
||||
let offset = Verify.offset_of_status v in
|
||||
let hash = Verify.uid_of_status v in
|
||||
Hashtbl.add index hash offset in
|
||||
Array.iter iter matrix ;
|
||||
let pack =
|
||||
Carton.Dec.make contents ~allocate ~z ~uid_ln:SHA1.length
|
||||
~uid_rw:SHA1.of_raw_string (Hashtbl.find index) in
|
||||
init_store ()
|
||||
>|= Rresult.R.reword_error (Rresult.R.msgf "%a" Store.pp_error)
|
||||
>|= Rresult.R.failwith_error_msg >>= fun store ->
|
||||
let rec go commit idx =
|
||||
if idx < Array.length matrix
|
||||
then
|
||||
let cursor = Verify.offset_of_status matrix.(idx) in
|
||||
let weight = Carton.Dec.weight_of_offset ~map pack ~weight:Carton.Dec.null cursor in
|
||||
let raw = Carton.Dec.make_raw ~weight in
|
||||
let v = Carton.Dec.of_offset ~map pack raw ~cursor in
|
||||
let kind = match Carton.Dec.kind v with
|
||||
| `A -> `Commit
|
||||
| `B -> `Tree
|
||||
| `C -> `Blob
|
||||
| `D -> `Tag in
|
||||
Store.write_inflated store ~kind
|
||||
(Cstruct.of_bigarray ~off:0 ~len:(Carton.Dec.len v) (Carton.Dec.raw v)) >>= fun hash ->
|
||||
( if kind = `Commit
|
||||
then Store.shallow store hash
|
||||
else Lwt.return_unit ) >>= fun () ->
|
||||
go (if kind = `Commit then Some hash else None) (succ idx)
|
||||
else Lwt.return commit in
|
||||
go None 0 >>= fun head -> Lwt.return (store, head)
|
||||
|
||||
let of_octets ctx ~remote data =
|
||||
(* TODO maybe recover edn and branch from data as well? *)
|
||||
let open Lwt_result.Infix in
|
||||
let l = Encore.to_angstrom Git_commit.format in
|
||||
Lwt.return
|
||||
(Result.map_error (fun e -> `Msg e)
|
||||
(Angstrom.parse_string ~consume:All l data)) >>= fun head ->
|
||||
let head = Git_commit.tree head in
|
||||
init_store () >|= fun store ->
|
||||
let open Lwt.Infix in
|
||||
Lwt.catch
|
||||
(fun () ->
|
||||
unpack data >>= fun (store, head) ->
|
||||
let edn, branch = split_url remote in
|
||||
{ ctx ; edn ; branch ; store ; head = Some head }
|
||||
Lwt.return_ok { ctx ; edn ; branch ; store ; head; })
|
||||
(fun _exn ->
|
||||
Lwt.return_error (`Msg "Invalid PACK file"))
|
||||
|
||||
let exists t key =
|
||||
let open Lwt.Infix in
|
||||
|
|
1
src/pack.ml
Normal file
1
src/pack.ml
Normal file
|
@ -0,0 +1 @@
|
|||
|
Loading…
Reference in a new issue