Merge pull request 'Stream in/out the PACK file' (#33) from lwt-pause-and-stream into main

Reviewed-on: #33
This commit is contained in:
dinosaure 2024-05-17 07:37:56 +00:00
commit 8c941e2431
3 changed files with 56 additions and 25 deletions

View file

@ -95,8 +95,8 @@ let pull ~quiet store =
let save store filename = let save store filename =
let oc = open_out filename in let oc = open_out filename in
Git_kv.to_octets store >>= fun contents -> let stream = Git_kv.to_octets store in
output_string oc contents ; Lwt_stream.iter_p (fun str -> output_string oc str; Lwt.return_unit) stream >>= fun () ->
close_out oc ; close_out oc ;
Lwt.return (Ok 0) Lwt.return (Ok 0)
@ -164,7 +164,8 @@ let run remote = function
Bytes.unsafe_to_string bs in Bytes.unsafe_to_string bs in
Lwt_main.run Lwt_main.run
( Git_unix.ctx (Happy_eyeballs_lwt.create ()) >>= fun ctx -> ( Git_unix.ctx (Happy_eyeballs_lwt.create ()) >>= fun ctx ->
Git_kv.of_octets ctx ~remote contents >>= function let stream = Lwt_stream.of_list [ contents ] in
Git_kv.of_octets ctx ~remote stream >>= function
| Ok t -> repl t Unix.stdin | Ok t -> repl t Unix.stdin
| Error (`Msg err) -> Fmt.failwith "%s." err ) | Error (`Msg err) -> Fmt.failwith "%s." err )

View file

@ -256,39 +256,73 @@ let pack t ?(level= 4) ~commit stream =
let to_octets ?level t = match t.head with let to_octets ?level t = match t.head with
| None -> | None ->
Lwt.return "PACK\000\000\000\002\000\000\000\000\ let str = "PACK\000\000\000\002\000\000\000\000\
\x02\x9d\x08\x82\x3b\xd8\xa8\xea\xb5\x10\xad\x6a\xc7\x5c\x82\x3c\xfd\x3e\xd3\x1e" \x02\x9d\x08\x82\x3b\xd8\xa8\xea\xb5\x10\xad\x6a\xc7\x5c\x82\x3c\xfd\x3e\xd3\x1e" in
Lwt_stream.of_list [ str ]
| Some commit -> | Some commit ->
let buf = Buffer.create 0x100 in let stream, push = Lwt_stream.create () in
let stream = Option.iter (Buffer.add_string buf) in Lwt.async (fun () -> pack ?level t.store ~commit push);
let open Lwt.Infix in stream
pack ?level t.store ~commit stream >|= fun () ->
Buffer.contents buf
(* XXX(dinosaure): we have the full-control between [to_octets]/[of_octets] (* XXX(dinosaure): we have the full-control between [to_octets]/[of_octets]
and we are currently not able to generate a PACK file with OBJ_REF objects. and we are currently not able to generate a PACK file with OBJ_REF objects.
That mostly means that only one pass is enough to extract all objects! That mostly means that only one pass is enough to extract all objects!
OBJ_OFS objects need only already consumed objects. *) OBJ_OFS objects need only already consumed objects. *)
let map contents ~pos len = let map buf ~pos len =
let str = Buffer.contents buf in
let off = Int64.to_int pos in let off = Int64.to_int pos in
let len = min (Bigstringaf.length contents - off) len in let len = min (String.length str - off) len in
Bigstringaf.sub ~off:(Int64.to_int pos) ~len contents Bigstringaf.of_string str ~off:(Int64.to_int pos) ~len
let analyze store contents = let blit_from_string src src_off dst dst_off len =
let len = String.length contents in Bigstringaf.blit_from_string src ~src_off dst ~dst_off ~len
let contents = Bigstringaf.of_string contents ~off:0 ~len in
let read stream =
let ke = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
let rec go filled input =
match Ke.Rke.N.peek ke with
| [] -> begin
let open Lwt.Infix in
Lwt_stream.get stream >>= function
| Some str ->
Ke.Rke.N.push ke ~blit:blit_from_string ~length:String.length str;
go filled input
| None -> Lwt.return filled end
| src :: _ ->
let src = Cstruct.of_bigarray src in
let len = min (Cstruct.length input) (Cstruct.length src) in
Cstruct.blit src 0 input 0 len;
Ke.Rke.N.shift_exn ke len;
if len < Cstruct.length input
then go (filled + len) (Cstruct.shift input len)
else Lwt.return (filled + len) in
fun input -> go 0 input
let analyze store stream =
let tmp = Cstruct.create 0x1000 in
let buf = Buffer.create 0x1000 in
let read_cstruct tmp =
let open Lwt.Infix in
read stream tmp >>= fun len ->
Buffer.add_string buf (Cstruct.to_string ~off:0 ~len tmp);
Lwt.return len in
let allocate bits = De.make_window ~bits in let allocate bits = De.make_window ~bits in
let never _ = assert false in let never _ = assert false in
let pack = Carton.Dec.make contents ~allocate let pack = Carton.Dec.make buf ~allocate
~z:(De.bigstring_create De.io_buffer_size) ~z:(De.bigstring_create De.io_buffer_size)
~uid_ln:SHA1.length ~uid_rw:SHA1.of_raw_string never in ~uid_ln:SHA1.length ~uid_rw:SHA1.of_raw_string never in
let objects = Hashtbl.create 0x100 in let objects = Hashtbl.create 0x100 in
let rec go head decoder = let open Lwt.Infix in let rec go head decoder = let open Lwt.Infix in
match First_pass.decode decoder with match First_pass.decode decoder with
| `Await _decoder | `Await decoder ->
| `Peek _decoder -> failwith "Truncated PACK file" read_cstruct tmp >>= fun len ->
go head (First_pass.src decoder (Cstruct.to_bigarray tmp) 0 len)
| `Peek decoder ->
let keep = First_pass.src_rem decoder in
read_cstruct (Cstruct.shift tmp keep) >>= fun len ->
go head (First_pass.src decoder (Cstruct.to_bigarray tmp) 0 (keep + len))
| `Entry ({ First_pass.kind= Base _; offset= cursor; _ }, decoder) -> | `Entry ({ First_pass.kind= Base _; offset= cursor; _ }, decoder) ->
let weight = Carton.Dec.weight_of_offset ~map pack ~weight:Carton.Dec.null cursor in let weight = Carton.Dec.weight_of_offset ~map pack ~weight:Carton.Dec.null cursor in
let raw = Carton.Dec.make_raw ~weight in let raw = Carton.Dec.make_raw ~weight in
@ -325,7 +359,6 @@ let analyze store contents =
| `End _hash -> Lwt.return head | `End _hash -> Lwt.return head
| `Malformed err -> failwith err in | `Malformed err -> failwith err in
let decoder = First_pass.decoder ~o:(Bigstringaf.create De.io_buffer_size) ~allocate `Manual in let decoder = First_pass.decoder ~o:(Bigstringaf.create De.io_buffer_size) ~allocate `Manual in
let decoder = First_pass.src decoder contents 0 len in
go None decoder go None decoder
let of_octets ctx ~remote data = let of_octets ctx ~remote data =
@ -339,9 +372,6 @@ let of_octets ctx ~remote data =
let edn, branch = split_url remote in let edn, branch = split_url remote in
Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; in_closure= false; head; }) Lwt.return_ok { ctx ; edn ; branch ; store ; committed= None; in_closure= false; head; })
(fun exn -> (fun exn ->
Fmt.epr ">>> Got an exception: %s.\n%!" (Printexc.to_string exn) ;
Fmt.epr ">>> %s.\n%!"
(Printexc.raw_backtrace_to_string (Printexc.get_raw_backtrace ())) ;
Lwt.return_error (`Msg "Invalid PACK file")) Lwt.return_error (`Msg "Invalid PACK file"))
module Make (Pclock : Mirage_clock.PCLOCK) = struct module Make (Pclock : Mirage_clock.PCLOCK) = struct

View file

@ -43,12 +43,12 @@ val connect : Mimic.ctx -> string -> t Lwt.t
val branch : t -> Git.Reference.t val branch : t -> Git.Reference.t
(** [branch t] returns the branch used by the given [t]. *) (** [branch t] returns the branch used by the given [t]. *)
val to_octets : ?level:int -> t -> string Lwt.t val to_octets : ?level:int -> t -> string Lwt_stream.t
(** [to_octets ?level store] returns a serialized version of the given [store]. (** [to_octets ?level store] returns a serialized version of the given [store].
[level] is the {i zlib} level compression used for Git object (between [0] [level] is the {i zlib} level compression used for Git object (between [0]
and [9] including), defaults to [4]. *) and [9] including), defaults to [4]. *)
val of_octets : Mimic.ctx -> remote:string -> string -> val of_octets : Mimic.ctx -> remote:string -> string Lwt_stream.t ->
(t, [> `Msg of string]) result Lwt.t (t, [> `Msg of string]) result Lwt.t
(** [of_octets ctx ~remote contents] tries to re-create a {!type:t} from its (** [of_octets ctx ~remote contents] tries to re-create a {!type:t} from its
serialized version [contents]. This function does not do I/O and the serialized version [contents]. This function does not do I/O and the