commit
32ea8d2224
4 changed files with 84 additions and 73 deletions
|
@ -46,7 +46,8 @@ depends: [
|
||||||
"uri"
|
"uri"
|
||||||
"fmt" {>= "0.8.7"}
|
"fmt" {>= "0.8.7"}
|
||||||
"cmarkit" {>= "0.3.0"}
|
"cmarkit" {>= "0.3.0"}
|
||||||
"tar" {< "3.0.0"}
|
"tar" {>= "3.0.0"}
|
||||||
|
"tar-unix" {>= "3.0.0"}
|
||||||
"owee"
|
"owee"
|
||||||
"solo5-elftool" {>= "0.3.0"}
|
"solo5-elftool" {>= "0.3.0"}
|
||||||
"decompress" {>= "1.5.0"}
|
"decompress" {>= "1.5.0"}
|
||||||
|
|
|
@ -486,7 +486,13 @@ let routes ~datadir ~cachedir ~configdir ~expired_jobs =
|
||||||
|> Option.to_result ~none:(`Msg "bad finish time") |> Result.map Int64.of_int
|
|> Option.to_result ~none:(`Msg "bad finish time") |> Result.map Int64.of_int
|
||||||
|> Lwt.return |> if_error "Internal server error" >>= fun finish ->
|
|> Lwt.return |> if_error "Internal server error" >>= fun finish ->
|
||||||
Dream.stream ~headers:["Content-Type", "application/tar+gzip"]
|
Dream.stream ~headers:["Content-Type", "application/tar+gzip"]
|
||||||
(Dream_tar.targz_response datadir finish artifacts)
|
(fun stream ->
|
||||||
|
let+ r = Dream_tar.targz_response datadir finish artifacts stream in
|
||||||
|
match r with
|
||||||
|
| Ok () -> ()
|
||||||
|
| Error _ ->
|
||||||
|
Log.warn (fun m -> m "error assembling gzipped tar archive");
|
||||||
|
())
|
||||||
|> Lwt_result.ok
|
|> Lwt_result.ok
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
144
lib/dream_tar.ml
144
lib/dream_tar.ml
|
@ -1,47 +1,36 @@
|
||||||
open Lwt.Infix
|
module High : sig
|
||||||
|
type t
|
||||||
|
type 'a s = 'a Lwt.t
|
||||||
|
|
||||||
module Writer = struct
|
external inj : 'a s -> ('a, t) Tar.io = "%identity"
|
||||||
type out_channel =
|
external prj : ('a, t) Tar.io -> 'a s = "%identity"
|
||||||
{ mutable gz : Gz.Def.encoder
|
end = struct
|
||||||
; ic : Cstruct.t
|
type t
|
||||||
; oc : Cstruct.t
|
type 'a s = 'a Lwt.t
|
||||||
; stream : Dream.stream }
|
|
||||||
|
|
||||||
type 'a t = 'a Lwt.t
|
external inj : 'a -> 'b = "%identity"
|
||||||
|
external prj : 'a -> 'b = "%identity"
|
||||||
let really_write ({ oc; stream; _ } as state) cs =
|
|
||||||
let rec until_await gz =
|
|
||||||
match Gz.Def.encode gz with
|
|
||||||
| `Await gz -> state.gz <- gz ; Lwt.return_unit
|
|
||||||
| `Flush gz ->
|
|
||||||
let max = Cstruct.length oc - Gz.Def.dst_rem gz in
|
|
||||||
let str = Cstruct.to_string ~len:max oc in
|
|
||||||
Dream.write stream str >>= fun () ->
|
|
||||||
let { Cstruct.buffer; off= cs_off; len= cs_len; } = oc in
|
|
||||||
until_await (Gz.Def.dst gz buffer cs_off cs_len)
|
|
||||||
| `End _gz -> assert false in
|
|
||||||
if Cstruct.length cs = 0
|
|
||||||
then Lwt.return_unit
|
|
||||||
else ( let { Cstruct.buffer; off; len; } = cs in
|
|
||||||
let gz = Gz.Def.src state.gz buffer off len in
|
|
||||||
until_await gz )
|
|
||||||
end
|
end
|
||||||
|
|
||||||
module HW = Tar.HeaderWriter(Lwt)(Writer)
|
let value v = Tar.High (High.inj v)
|
||||||
|
|
||||||
let write_block (header : Tar.Header.t) lpath ({ Writer.ic= buf; _ } as state) =
|
let ok_value v = value (Lwt_result.ok v)
|
||||||
HW.write ~level:Tar.Header.Ustar header state >>= fun () ->
|
|
||||||
Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string lpath) >>= fun ic ->
|
let run t stream =
|
||||||
let rec loop () =
|
let rec run : type a. (a, 'err, High.t) Tar.t -> (a, 'err) result Lwt.t =
|
||||||
let { Cstruct.buffer; off; len; } = buf in
|
function
|
||||||
Lwt_io.read_into_bigstring ic buffer off len >>= function
|
| Tar.Write str ->
|
||||||
| 0 -> Lwt.return ()
|
(* Can this not fail?!? Obviously, it can, but we never know?? *)
|
||||||
| len' ->
|
Lwt_result.ok (Dream.write stream str)
|
||||||
Writer.really_write state (Cstruct.sub buf 0 len') >>= fun () ->
|
| Tar.Seek _ | Tar.Read _ | Tar.Really_read _ -> assert false
|
||||||
loop ()
|
| Tar.Return value -> Lwt.return value
|
||||||
|
| Tar.High value -> High.prj value
|
||||||
|
| Tar.Bind (x, f) ->
|
||||||
|
let open Lwt_result.Syntax in
|
||||||
|
let* v = run x in
|
||||||
|
run (f v)
|
||||||
in
|
in
|
||||||
loop () >>= fun () ->
|
run t
|
||||||
Writer.really_write state (Tar.Header.zero_padding header)
|
|
||||||
|
|
||||||
let header_of_file mod_time (file : Builder_db.file) =
|
let header_of_file mod_time (file : Builder_db.file) =
|
||||||
let file_mode = if Fpath.is_prefix Fpath.(v "bin/") file.filepath then
|
let file_mode = if Fpath.is_prefix Fpath.(v "bin/") file.filepath then
|
||||||
|
@ -51,38 +40,53 @@ let header_of_file mod_time (file : Builder_db.file) =
|
||||||
in
|
in
|
||||||
Tar.Header.make ~file_mode ~mod_time (Fpath.to_string file.filepath) (Int64.of_int file.size)
|
Tar.Header.make ~file_mode ~mod_time (Fpath.to_string file.filepath) (Int64.of_int file.size)
|
||||||
|
|
||||||
let targz_response datadir finish (files : Builder_db.file list) (stream : Dream.stream) =
|
let contents datadir file : unit -> (string option, _, _) Tar.t =
|
||||||
let state =
|
let state = ref `Initial in
|
||||||
let ic = Cstruct.create (4 * 4 * 1024) in
|
let dispenser () =
|
||||||
let oc = Cstruct.create 4096 in
|
let ( let* ) = Tar.( let* ) in
|
||||||
let gz =
|
let src = Fpath.append datadir (Model.artifact_path file) in
|
||||||
let w = De.Lz77.make_window ~bits:15 in
|
let* state' =
|
||||||
let q = De.Queue.create 0x1000 in
|
match !state with
|
||||||
let mtime = Int32.of_float (Unix.gettimeofday ()) in
|
| `Initial ->
|
||||||
let gz = Gz.Def.encoder `Manual `Manual ~mtime Gz.Unix ~q ~w ~level:4 in
|
let* fd = ok_value (Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string src)) in
|
||||||
let { Cstruct.buffer; off; len; } = oc in
|
let s = `Active fd in
|
||||||
Gz.Def.dst gz buffer off len
|
state := s; Tar.return (Ok s)
|
||||||
|
| `Active _ | `Closed as s -> Tar.return (Ok s)
|
||||||
in
|
in
|
||||||
{ Writer.gz; ic; oc; stream; }
|
match state' with
|
||||||
|
| `Closed -> Tar.return (Ok None)
|
||||||
|
| `Active fd ->
|
||||||
|
let* data = ok_value (Lwt_io.read ~count:65536 fd) in
|
||||||
|
if String.length data = 0 then begin
|
||||||
|
state := `Closed;
|
||||||
|
let* () = ok_value (Lwt_io.close fd) in
|
||||||
|
Tar.return (Ok None)
|
||||||
|
end else
|
||||||
|
Tar.return (Ok (Some data))
|
||||||
in
|
in
|
||||||
Lwt_list.iter_s (fun file ->
|
dispenser
|
||||||
let hdr = header_of_file finish file in
|
|
||||||
write_block hdr Fpath.(datadir // Model.artifact_path file) state)
|
let entries datadir finish files =
|
||||||
files >>= fun () ->
|
let files =
|
||||||
Writer.really_write state Tar.Header.zero_block >>= fun () ->
|
List.map (fun file ->
|
||||||
Writer.really_write state Tar.Header.zero_block >>= fun () ->
|
let hdr = header_of_file finish file in
|
||||||
(* assert (Gz.Def.encode gz = `Await) *)
|
let level = Some Tar.Header.Posix in
|
||||||
let rec until_end gz = match Gz.Def.encode gz with
|
(level, hdr, contents datadir file)
|
||||||
| `Await _gz -> assert false
|
)
|
||||||
| `Flush gz | `End gz as flush_or_end ->
|
files
|
||||||
let max = Cstruct.length state.oc - Gz.Def.dst_rem gz in
|
|
||||||
let str = Cstruct.to_string ~len:max state.oc in
|
|
||||||
Dream.write stream str >>= fun () -> match flush_or_end with
|
|
||||||
| `Flush gz ->
|
|
||||||
let { Cstruct.buffer; off= cs_off; len= cs_len; } = state.oc in
|
|
||||||
until_end (Gz.Def.dst gz buffer cs_off cs_len)
|
|
||||||
| `End _ -> Lwt.return_unit
|
|
||||||
in
|
in
|
||||||
until_end (Gz.Def.src state.gz De.bigstring_empty 0 0) >>= fun () ->
|
let files = ref files in
|
||||||
Dream.flush stream >>= fun () ->
|
fun () -> match !files with
|
||||||
Dream.close stream
|
| [] -> Tar.return (Ok None)
|
||||||
|
| f :: fs -> files := fs; Tar.return (Ok (Some f))
|
||||||
|
|
||||||
|
let targz_response datadir finish files stream =
|
||||||
|
let entries : (_, _) Tar.entries = entries datadir finish files in
|
||||||
|
let global_hdr =
|
||||||
|
Tar.Header.Extended.make
|
||||||
|
~comment:"Tar file produced by builder-web.%%VERSION_NUM%%"
|
||||||
|
()
|
||||||
|
in
|
||||||
|
let finish32 = Int64.to_int32 finish in
|
||||||
|
Logs.err (fun m -> m "finished at %ld (%Ld)" finish32 finish);
|
||||||
|
run (Tar_gz.out_gzipped ~level:9 ~mtime:finish32 Gz.Unix (Tar.out ~global_hdr entries)) stream
|
||||||
|
|
2
lib/dune
2
lib/dune
|
@ -1,5 +1,5 @@
|
||||||
(library
|
(library
|
||||||
(name builder_web)
|
(name builder_web)
|
||||||
(libraries builder builder_db dream tyxml bos duration hex caqti-lwt
|
(libraries builder builder_db dream tyxml bos duration hex caqti-lwt
|
||||||
opamdiff ptime.clock.os cmarkit tar owee solo5-elftool decompress.de
|
opamdiff ptime.clock.os cmarkit tar tar.gz tar-unix owee solo5-elftool decompress.de
|
||||||
decompress.gz uri))
|
decompress.gz uri))
|
||||||
|
|
Loading…
Reference in a new issue