Filesystem transactions and cleanup
This commit is contained in:
parent
643b156e07
commit
7b5119189d
3 changed files with 119 additions and 42 deletions
|
@ -22,7 +22,18 @@ type 'a t = {
|
|||
|
||||
let realm = "builder-web"
|
||||
|
||||
let init_datadir datadir =
|
||||
let open Rresult.R.Infix in
|
||||
Bos.OS.Dir.exists datadir >>= (fun exists ->
|
||||
if exists
|
||||
then Ok ()
|
||||
else Error (`Msg "Datadir does not exist")) >>= fun () ->
|
||||
Bos.OS.Dir.create ~path:false (Model.staging datadir) >>| fun _ -> ()
|
||||
|
||||
let init ?(pool_size = 10) dbpath datadir =
|
||||
Rresult.R.bind
|
||||
(init_datadir datadir) @@
|
||||
fun () ->
|
||||
match Caqti_lwt.connect_pool
|
||||
~max_size:pool_size
|
||||
(Uri.make ~scheme:"sqlite3" ~path:dbpath ~query:["create", ["false"]] ())
|
||||
|
@ -32,18 +43,20 @@ let init ?(pool_size = 10) dbpath datadir =
|
|||
| Ok pool ->
|
||||
Lwt_main.run (Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
|
||||
Db.find Builder_db.get_application_id () >>= fun application_id ->
|
||||
Db.find Builder_db.get_version () >>= fun version ->
|
||||
Lwt_result.return (application_id, version))
|
||||
Db.find Builder_db.get_version () >>= (fun version ->
|
||||
if (application_id, version) = Builder_db.(application_id, current_version)
|
||||
then Lwt_result.return ()
|
||||
else Lwt_result.fail (`Wrong_version (application_id, version)))
|
||||
>>= fun () ->
|
||||
Model.cleanup_staging datadir (module Db))
|
||||
pool)
|
||||
|> (function
|
||||
| Error e -> Error (e :> [> db_error | `Wrong_version of int32 * int64 ])
|
||||
| Ok (application_id, version) ->
|
||||
if application_id = Builder_db.application_id && version = Builder_db.current_version
|
||||
then Ok {
|
||||
pool = (pool :> (Caqti_lwt.connection, [> db_error ]) Caqti_lwt.Pool.t);
|
||||
datadir;
|
||||
}
|
||||
else Error (`Wrong_version (application_id, version)))
|
||||
| Ok () ->
|
||||
Ok {
|
||||
pool = (pool :> (Caqti_lwt.connection, [> db_error ]) Caqti_lwt.Pool.t);
|
||||
datadir;
|
||||
})
|
||||
|
||||
let pp_exec ppf (job, uuid, _, _, _, _, _) =
|
||||
Format.fprintf ppf "%s(%a)" job.Builder.name Uuidm.pp uuid
|
||||
|
|
125
lib/model.ml
125
lib/model.ml
|
@ -18,6 +18,8 @@ let not_found = function
|
|||
| None -> Lwt.return (Error `Not_found :> (_, [> error ]) result)
|
||||
| Some v -> Lwt_result.return v
|
||||
|
||||
let staging datadir = Fpath.(datadir / "_staging")
|
||||
|
||||
let read_file filepath =
|
||||
Lwt.try_bind
|
||||
(fun () -> Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string filepath))
|
||||
|
@ -74,6 +76,37 @@ let user username (module Db : CONN) =
|
|||
Db.find_opt Builder_db.User.get_user username >|=
|
||||
Option.map snd
|
||||
|
||||
let cleanup_staging datadir (module Db : Caqti_lwt.CONNECTION) =
|
||||
let cleanup_staged staged =
|
||||
match Uuidm.of_string (Fpath.to_string staged) with
|
||||
| None ->
|
||||
Log.warn (fun m -> m "Non-uuid staged files: %a" Fpath.pp
|
||||
Fpath.(staging datadir // staged));
|
||||
Lwt.return (Bos.OS.Path.delete ~recurse:true Fpath.(staging datadir // staged))
|
||||
| Some uuid ->
|
||||
let staged = Fpath.(staging datadir // staged) in
|
||||
Db.find_opt Builder_db.Build.get_by_uuid uuid >>= function
|
||||
| Some (_id, build) ->
|
||||
Db.find Builder_db.Job.get build.job_id >>= fun job_name ->
|
||||
let destdir = Fpath.(datadir / job_name / Uuidm.to_string uuid) in
|
||||
Lwt.return (Bos.OS.Path.move staged destdir)
|
||||
| None ->
|
||||
Lwt.return (Bos.OS.Path.delete ~recurse:true Fpath.(staging datadir // staged))
|
||||
in
|
||||
Lwt.return (Bos.OS.Dir.contents ~rel:true (staging datadir)) >>= fun stageds ->
|
||||
Lwt_result.ok @@
|
||||
Lwt_list.iter_s
|
||||
(fun staged ->
|
||||
Lwt.map (function
|
||||
| Error (_ as e) ->
|
||||
Log.warn (fun m -> m "Failed cleaning up staged files %a in %a: %a"
|
||||
Fpath.pp staged
|
||||
Fpath.pp Fpath.(staging datadir // staged)
|
||||
pp_error e)
|
||||
| Ok () -> ())
|
||||
(cleanup_staged staged))
|
||||
stageds
|
||||
|
||||
let save file data =
|
||||
let open Lwt.Infix in
|
||||
Lwt.catch
|
||||
|
@ -91,45 +124,65 @@ let save_exec build_dir exec =
|
|||
let cs = Builder.Asn.exec_to_cs exec in
|
||||
save Fpath.(build_dir / "full") (Cstruct.to_string cs)
|
||||
|
||||
let save_file dir (filepath, data) =
|
||||
let save_file dir staging (filepath, data) =
|
||||
let size = String.length data in
|
||||
let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data) in
|
||||
let localpath = Fpath.append dir filepath in
|
||||
Lwt_result.lift (Bos.OS.Dir.create (Fpath.parent localpath)) >>= fun _ ->
|
||||
save localpath data >|= fun () ->
|
||||
let destpath = Fpath.append staging filepath in
|
||||
Lwt_result.lift (Bos.OS.Dir.create (Fpath.parent destpath)) >>= fun _ ->
|
||||
save destpath data >|= fun () ->
|
||||
{ Builder_db.filepath; localpath; sha256; size }
|
||||
|
||||
let save_files dir files =
|
||||
let save_files dir staging files =
|
||||
List.fold_left
|
||||
(fun r file ->
|
||||
r >>= fun acc ->
|
||||
save_file dir file >>= fun file ->
|
||||
save_file dir staging file >>= fun file ->
|
||||
Lwt_result.return (file :: acc))
|
||||
(Lwt_result.return [])
|
||||
files
|
||||
|
||||
let save_all basedir ((job, uuid, _, _, _, _, artifacts) as exec) =
|
||||
let save_all basedir staging_dir ((job, uuid, _, _, _, _, artifacts) as exec) =
|
||||
let build_dir = Fpath.(basedir / job.Builder.name / Uuidm.to_string uuid) in
|
||||
let input_dir = Fpath.(build_dir / "input") in
|
||||
let output_dir = Fpath.(build_dir / "output") in
|
||||
Lwt.return (Bos.OS.Dir.create build_dir) >>= (fun created ->
|
||||
let input_dir = Fpath.(build_dir / "input")
|
||||
and staging_input_dir = Fpath.(staging_dir / "input") in
|
||||
let output_dir = Fpath.(build_dir / "output")
|
||||
and staging_output_dir = Fpath.(staging_dir / "output") in
|
||||
Lwt.return (Bos.OS.Dir.create staging_dir) >>= (fun created ->
|
||||
if not created
|
||||
then Lwt_result.fail (`Msg "build directory already exists")
|
||||
else Lwt_result.return ()) >>= fun () ->
|
||||
Lwt.return (Bos.OS.Dir.create input_dir) >>= fun _ ->
|
||||
Lwt.return (Bos.OS.Dir.create output_dir) >>= fun _ ->
|
||||
save_exec build_dir exec >>= fun () ->
|
||||
save_files output_dir artifacts >>= fun artifacts ->
|
||||
save_files input_dir job.Builder.files >>= fun input_files ->
|
||||
Lwt.return (Bos.OS.Dir.create staging_input_dir) >>= fun _ ->
|
||||
Lwt.return (Bos.OS.Dir.create staging_output_dir) >>= fun _ ->
|
||||
save_exec staging_dir exec >>= fun () ->
|
||||
save_files output_dir staging_output_dir artifacts >>= fun artifacts ->
|
||||
save_files input_dir staging_input_dir job.Builder.files >>= fun input_files ->
|
||||
Lwt_result.return (artifacts, input_files)
|
||||
|
||||
let commit_files basedir staging_dir job_name uuid =
|
||||
let job_dir = Fpath.(basedir / job_name) in
|
||||
let dest = Fpath.(job_dir / Uuidm.to_string uuid) in
|
||||
Lwt.return (Bos.OS.Dir.create job_dir) >>= fun _ ->
|
||||
Lwt.return (Bos.OS.Path.move staging_dir dest)
|
||||
|
||||
let add_build
|
||||
basedir
|
||||
((job, uuid, console, start, finish, result, _) as exec)
|
||||
(module Db : CONN) =
|
||||
let open Builder_db in
|
||||
let job_name = job.Builder.name in
|
||||
save_all basedir exec >>= fun (artifacts, input_files) ->
|
||||
let staging_dir = Fpath.(staging basedir / Uuidm.to_string uuid) in
|
||||
let or_cleanup x =
|
||||
Lwt_result.map_err (fun e ->
|
||||
Bos.OS.Dir.delete ~recurse:true staging_dir
|
||||
|> Result.iter_error (fun e ->
|
||||
Log.err (fun m -> m "Failed to remove staging dir %a: %a"
|
||||
Fpath.pp staging_dir
|
||||
pp_error e));
|
||||
e)
|
||||
x
|
||||
in
|
||||
or_cleanup (save_all basedir staging_dir exec) >>= fun (artifacts, input_files) ->
|
||||
let main_binary =
|
||||
match List.find_all
|
||||
(fun file ->
|
||||
|
@ -147,21 +200,27 @@ let add_build
|
|||
(List.map (fun f -> f.filepath) binaries));
|
||||
None
|
||||
in
|
||||
Db.exec Job.try_add job_name >>= fun () ->
|
||||
Db.find Job.get_id_by_name job_name >>= fun job_id ->
|
||||
Db.exec Build.add { Build.uuid; start; finish; result;
|
||||
console; script = job.Builder.script;
|
||||
main_binary; job_id } >>= fun () ->
|
||||
Db.find last_insert_rowid () >>= fun id ->
|
||||
List.fold_left
|
||||
(fun r file ->
|
||||
r >>= fun () ->
|
||||
Db.exec Build_artifact.add (file, id))
|
||||
(Lwt_result.return ())
|
||||
artifacts >>= fun () ->
|
||||
List.fold_left
|
||||
(fun r file ->
|
||||
r >>= fun () ->
|
||||
Db.exec Build_file.add (file, id))
|
||||
(Lwt_result.return ())
|
||||
input_files
|
||||
let r =
|
||||
Db.start () >>= fun () ->
|
||||
Db.exec Job.try_add job_name >>= fun () ->
|
||||
Db.find Job.get_id_by_name job_name >>= fun job_id ->
|
||||
Db.exec Build.add { Build.uuid; start; finish; result;
|
||||
console; script = job.Builder.script;
|
||||
main_binary; job_id } >>= fun () ->
|
||||
Db.find last_insert_rowid () >>= fun id ->
|
||||
List.fold_left
|
||||
(fun r file ->
|
||||
r >>= fun () ->
|
||||
Db.exec Build_artifact.add (file, id))
|
||||
(Lwt_result.return ())
|
||||
artifacts >>= fun () ->
|
||||
List.fold_left
|
||||
(fun r file ->
|
||||
r >>= fun () ->
|
||||
Db.exec Build_file.add (file, id))
|
||||
(Lwt_result.return ())
|
||||
input_files >>= fun () ->
|
||||
Db.commit () >>= fun () ->
|
||||
commit_files basedir staging_dir job_name uuid
|
||||
in
|
||||
or_cleanup r
|
||||
|
|
|
@ -2,6 +2,11 @@ type error = [ Caqti_error.call_or_retrieve | `Not_found | `File_error of Fpath.
|
|||
|
||||
val pp_error : Format.formatter -> error -> unit
|
||||
|
||||
val staging : Fpath.t -> Fpath.t
|
||||
|
||||
val cleanup_staging : Fpath.t -> Caqti_lwt.connection ->
|
||||
(unit, [> error ]) result Lwt.t
|
||||
|
||||
val build_artifact : Uuidm.t -> Fpath.t -> Caqti_lwt.connection ->
|
||||
(string * Cstruct.t, [> error ]) result Lwt.t
|
||||
|
||||
|
|
Loading…
Reference in a new issue