Merge pull request 'Speed-up to way to unserialize a PACK file' (#13) from speed-up into main
Reviewed-on: https://git.robur.io/robur/git-kv/pulls/13
This commit is contained in:
commit
1a236e190b
2 changed files with 83 additions and 140 deletions
|
@ -18,3 +18,7 @@ build: [
|
|||
["dune" "subst"] {dev}
|
||||
["dune" "build" "-p" name "-j" jobs]
|
||||
]
|
||||
|
||||
pin-depends: [
|
||||
[ "carton.dev" "git+https://github.com/mirage/ocaml-git.git#8cb31ed46aa2600f645dba204b164c7fce2f7037" ]
|
||||
]
|
||||
|
|
183
src/git_kv.ml
183
src/git_kv.ml
|
@ -185,6 +185,11 @@ module Delta = Carton_lwt.Enc.Delta (SHA1) (Verbose)
|
|||
module First_pass = Carton.Dec.Fp (SHA1)
|
||||
module Verify = Carton.Dec.Verify (SHA1) (Scheduler) (Lwt_scheduler)
|
||||
|
||||
let ( <.> ) f g = fun x -> f (g x)
|
||||
let ( >>? ) x f = let open Lwt.Infix in match x with
|
||||
| Some x -> f x >>= fun v -> Lwt.return_some v
|
||||
| None -> Lwt.return_none
|
||||
|
||||
let pack t ~commit stream =
|
||||
let open Lwt.Infix in
|
||||
let load t hash =
|
||||
|
@ -223,6 +228,11 @@ let pack t ~commit stream =
|
|||
ctx := SHA1.feed_bigstring !ctx header ~off:0 ~len:12 ;
|
||||
cursor := Int64.add !cursor 12L ;
|
||||
let encode_target idx =
|
||||
Carton.Enc.target_patch targets.(idx)
|
||||
>>? (find <.> Carton.Enc.source_of_patch)
|
||||
>>= function
|
||||
| Some None -> failwith "Try to encode an OBJ_REF object" (* XXX(dinosaure): should never occur! *)
|
||||
| Some (Some (_ (* offset *) : int)) | None ->
|
||||
Hashtbl.add offsets (Carton.Enc.target_uid targets.(idx)) !cursor ;
|
||||
Carton_lwt.Enc.encode_target ~b ~find ~load:(load t) ~uid targets.(idx) ~cursor:(Int64.to_int !cursor)
|
||||
>>= fun (len, encoder) ->
|
||||
|
@ -260,129 +270,35 @@ let to_octets t = match t.head with
|
|||
pack t.store ~commit stream >|= fun () ->
|
||||
Buffer.contents buf
|
||||
|
||||
let digest ~kind ?(off = 0) ?len buf =
|
||||
let len =
|
||||
match len with Some len -> len | None -> Bigstringaf.length buf - off in
|
||||
let ctx = SHA1.empty in
|
||||
let ctx =
|
||||
match kind with
|
||||
| `A -> SHA1.feed_string ctx (Fmt.str "commit %d\000" len)
|
||||
| `B -> SHA1.feed_string ctx (Fmt.str "tree %d\000" len)
|
||||
| `C -> SHA1.feed_string ctx (Fmt.str "blob %d\000" len)
|
||||
| `D -> SHA1.feed_string ctx (Fmt.str "tag %d\000" len) in
|
||||
let ctx = SHA1.feed_bigstring ctx ~off ~len buf in
|
||||
SHA1.get ctx
|
||||
|
||||
let analyze stream =
|
||||
let where = Hashtbl.create 0x100 in
|
||||
let child = Hashtbl.create 0x100 in
|
||||
let sizes = Hashtbl.create 0x100 in
|
||||
|
||||
let replace tbl k v = match Hashtbl.find_opt tbl k with
|
||||
| Some v' -> if v' < v then Hashtbl.replace tbl k v
|
||||
| _ -> Hashtbl.add tbl k v in
|
||||
|
||||
let rec go acc tmp decoder = let open Lwt.Infix in
|
||||
match First_pass.decode decoder with
|
||||
| `Await decoder ->
|
||||
( stream () >>= function
|
||||
| Some str ->
|
||||
let tmp = Bigstringaf.of_string str ~off:0 ~len:(String.length str) in
|
||||
go acc tmp (First_pass.src decoder tmp 0 (String.length str))
|
||||
| None -> failwith "Truncated PACK file" )
|
||||
| `Peek decoder ->
|
||||
let keep = First_pass.src_rem decoder in
|
||||
( stream () >>= function
|
||||
| Some str ->
|
||||
let tmp = Bigstringaf.create (keep + String.length str) in
|
||||
Bigstringaf.blit tmp ~src_off:0 tmp ~dst_off:0 ~len:keep ;
|
||||
Bigstringaf.blit_from_string str ~src_off:0 tmp ~dst_off:keep
|
||||
~len:(String.length str) ;
|
||||
go acc tmp (First_pass.src decoder tmp 0 (keep + String.length str))
|
||||
| None -> failwith "Truncated PACK file" )
|
||||
| `Entry ({ First_pass.kind= Base _; offset; size; _ }, decoder) ->
|
||||
Hashtbl.add where offset (First_pass.count decoder - 1) ;
|
||||
Hashtbl.add sizes offset size ;
|
||||
go (Verify.unresolved_base ~cursor:offset :: acc) tmp decoder
|
||||
| `Entry ({ First_pass.kind= Ofs { sub= v; source; target; }
|
||||
; offset; _ }, decoder) ->
|
||||
Hashtbl.add where offset (First_pass.count decoder - 1) ;
|
||||
replace sizes Int64.(sub offset (of_int v)) source ;
|
||||
replace sizes offset target ;
|
||||
( try let vs = Hashtbl.find child (`Ofs Int64.(sub offset (of_int v))) in
|
||||
Hashtbl.replace child (`Ofs Int64.(sub offset (of_int v))) (offset :: vs)
|
||||
with _ -> Hashtbl.add child (`Ofs Int64.(sub offset (of_int v))) [ offset ] ) ;
|
||||
go (Verify.unresolved_node :: acc) tmp decoder
|
||||
| `Entry ({ First_pass.kind= Ref { ptr; target; source; }
|
||||
; offset; _ }, decoder) ->
|
||||
Hashtbl.add where offset (First_pass.count decoder - 1) ;
|
||||
replace sizes offset (Stdlib.max target source) ;
|
||||
( try let vs = Hashtbl.find child (`Ref ptr) in
|
||||
Hashtbl.replace child (`Ref ptr) (offset :: vs)
|
||||
with _ -> Hashtbl.add child (`Ref ptr) [ offset ] ) ;
|
||||
go (Verify.unresolved_node :: acc) tmp decoder
|
||||
| `End _hash ->
|
||||
let where ~cursor = Hashtbl.find where cursor in
|
||||
let children ~cursor ~uid =
|
||||
match Hashtbl.find_opt child (`Ofs cursor),
|
||||
Hashtbl.find_opt child (`Ref uid) with
|
||||
| Some a, Some b -> List.sort_uniq compare (a @ b)
|
||||
| Some x, None | None, Some x -> x
|
||||
| None, None -> [] in
|
||||
let weight ~cursor = Hashtbl.find sizes cursor in
|
||||
let oracle = { Carton.Dec.where
|
||||
; Carton.Dec.children
|
||||
; Carton.Dec.digest
|
||||
; Carton.Dec.weight } in
|
||||
Lwt.return (List.rev acc, oracle)
|
||||
| `Malformed err -> failwith err in
|
||||
|
||||
let o = Bigstringaf.create De.io_buffer_size in
|
||||
let allocate _ = De.make_window ~bits:15 in
|
||||
let decoder = First_pass.decoder ~o ~allocate `Manual in
|
||||
let open Lwt.Infix in
|
||||
go [] De.bigstring_empty decoder >>= fun (matrix, oracle) ->
|
||||
Lwt.return (Array.of_list matrix, oracle)
|
||||
|
||||
let stream_of_string str =
|
||||
let closed = ref false in
|
||||
fun () -> match !closed with
|
||||
| true -> Lwt.return_none
|
||||
| false -> closed := true ; Lwt.return_some str
|
||||
(* XXX(dinosaure): we have the full-control between [to_octets]/[of_octets]
|
||||
and we are currently not able to generate a PACK file with OBJ_REF objects.
|
||||
That mostly means that only one pass is enough to extract all objects!
|
||||
OBJ_OFS objects need only already consumed objects. *)
|
||||
|
||||
let map contents ~pos len =
|
||||
let off = Int64.to_int pos in
|
||||
let len = min (String.length contents - off) len in
|
||||
Bigstringaf.of_string ~off:(Int64.to_int pos) ~len contents
|
||||
let len = min (Bigstringaf.length contents - off) len in
|
||||
Bigstringaf.sub ~off:(Int64.to_int pos) ~len contents
|
||||
|
||||
let unpack contents =
|
||||
let open Lwt.Infix in
|
||||
analyze (stream_of_string contents) >>= fun (matrix, oracle) ->
|
||||
let z = De.bigstring_create De.io_buffer_size in
|
||||
let analyze store contents =
|
||||
let len = String.length contents in
|
||||
let contents = Bigstringaf.of_string contents ~off:0 ~len in
|
||||
let allocate bits = De.make_window ~bits in
|
||||
let never _ = assert false in
|
||||
let pack = Carton.Dec.make contents ~allocate ~z ~uid_ln:SHA1.length
|
||||
~uid_rw:SHA1.of_raw_string never in
|
||||
Verify.verify ~threads:4 pack ~map ~oracle ~verbose:ignore ~matrix >>= fun () ->
|
||||
let index = Hashtbl.create (Array.length matrix) in
|
||||
let iter v =
|
||||
let offset = Verify.offset_of_status v in
|
||||
let hash = Verify.uid_of_status v in
|
||||
Hashtbl.add index hash offset in
|
||||
Array.iter iter matrix ;
|
||||
let pack =
|
||||
Carton.Dec.make contents ~allocate ~z ~uid_ln:SHA1.length
|
||||
~uid_rw:SHA1.of_raw_string (Hashtbl.find index) in
|
||||
init_store ()
|
||||
>|= Rresult.R.reword_error (Rresult.R.msgf "%a" Store.pp_error)
|
||||
>|= Rresult.R.failwith_error_msg >>= fun store ->
|
||||
let rec go commit idx =
|
||||
if idx < Array.length matrix
|
||||
then
|
||||
let cursor = Verify.offset_of_status matrix.(idx) in
|
||||
let pack = Carton.Dec.make contents ~allocate
|
||||
~z:(De.bigstring_create De.io_buffer_size)
|
||||
~uid_ln:SHA1.length ~uid_rw:SHA1.of_raw_string never in
|
||||
let objects = Hashtbl.create 0x100 in
|
||||
|
||||
let rec go head decoder = let open Lwt.Infix in
|
||||
match First_pass.decode decoder with
|
||||
| `Await _decoder
|
||||
| `Peek _decoder -> failwith "Truncated PACK file"
|
||||
| `Entry ({ First_pass.kind= Base _; offset= cursor; _ }, decoder) ->
|
||||
let weight = Carton.Dec.weight_of_offset ~map pack ~weight:Carton.Dec.null cursor in
|
||||
let raw = Carton.Dec.make_raw ~weight in
|
||||
let v = Carton.Dec.of_offset ~map pack raw ~cursor in
|
||||
Hashtbl.add objects cursor v ;
|
||||
let kind = match Carton.Dec.kind v with
|
||||
| `A -> `Commit
|
||||
| `B -> `Tree
|
||||
|
@ -390,19 +306,42 @@ let unpack contents =
|
|||
| `D -> `Tag in
|
||||
Store.write_inflated store ~kind
|
||||
(Cstruct.of_bigarray ~off:0 ~len:(Carton.Dec.len v) (Carton.Dec.raw v)) >>= fun hash ->
|
||||
( if kind = `Commit
|
||||
then Store.shallow store hash
|
||||
else Lwt.return_unit ) >>= fun () ->
|
||||
go (if kind = `Commit then Some hash else None) (succ idx)
|
||||
else Lwt.return commit in
|
||||
go None 0 >>= fun head -> Lwt.return (store, head)
|
||||
( match kind with
|
||||
| `Commit -> go (Some hash) decoder
|
||||
| _ -> go head decoder )
|
||||
| `Entry ({ First_pass.kind= Ofs { sub= s; _ } ; offset= cursor; _ }, decoder) ->
|
||||
let weight = Carton.Dec.weight_of_offset ~map pack ~weight:Carton.Dec.null cursor in
|
||||
let source = Int64.sub cursor (Int64.of_int s) in
|
||||
let v = Carton.Dec.copy ~flip:true ~weight (Hashtbl.find objects source) (* XXX(dinosaure): should never fail *) in
|
||||
let v = Carton.Dec.of_offset_with_source ~map pack v ~cursor in
|
||||
Hashtbl.add objects cursor v ;
|
||||
let kind = match Carton.Dec.kind v with
|
||||
| `A -> `Commit
|
||||
| `B -> `Tree
|
||||
| `C -> `Blob
|
||||
| `D -> `Tag in
|
||||
Store.write_inflated store ~kind
|
||||
(Cstruct.of_bigarray ~off:0 ~len:(Carton.Dec.len v) (Carton.Dec.raw v)) >>= fun hash ->
|
||||
( match kind with
|
||||
| `Commit -> go (Some hash) decoder
|
||||
| _ -> go head decoder )
|
||||
| `Entry ({ First_pass.kind= Ref _; _ }, _decoder) ->
|
||||
failwith "Invalid PACK file (OBJ_REF)"
|
||||
| `End _hash -> Lwt.return head
|
||||
| `Malformed err -> failwith err in
|
||||
let decoder = First_pass.decoder ~o:(Bigstringaf.create De.io_buffer_size) ~allocate `Manual in
|
||||
let decoder = First_pass.src decoder contents 0 len in
|
||||
go None decoder
|
||||
|
||||
let of_octets ctx ~remote data =
|
||||
(* TODO maybe recover edn and branch from data as well? *)
|
||||
let open Lwt.Infix in
|
||||
(* TODO maybe recover edn and branch from data as well? *)
|
||||
Lwt.catch
|
||||
(fun () ->
|
||||
unpack data >>= fun (store, head) ->
|
||||
init_store ()
|
||||
>|= Rresult.R.reword_error (Rresult.R.msgf "%a" Store.pp_error)
|
||||
>|= Rresult.R.failwith_error_msg >>= fun store ->
|
||||
analyze store data >>= fun head ->
|
||||
let edn, branch = split_url remote in
|
||||
Lwt.return_ok { ctx ; edn ; branch ; store ; head; })
|
||||
(fun exn ->
|
||||
|
|
Loading…
Reference in a new issue