From 7b5119189de24a53f78a70d484fde348e3ca679d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Reynir=20Bj=C3=B6rnsson?= Date: Wed, 24 Mar 2021 14:49:23 +0100 Subject: [PATCH] Filesystem transactions and cleanup --- lib/builder_web.ml | 31 +++++++---- lib/model.ml | 125 +++++++++++++++++++++++++++++++++------------ lib/model.mli | 5 ++ 3 files changed, 119 insertions(+), 42 deletions(-) diff --git a/lib/builder_web.ml b/lib/builder_web.ml index 14e4b85..876fcfa 100644 --- a/lib/builder_web.ml +++ b/lib/builder_web.ml @@ -22,7 +22,18 @@ type 'a t = { let realm = "builder-web" +let init_datadir datadir = + let open Rresult.R.Infix in + Bos.OS.Dir.exists datadir >>= (fun exists -> + if exists + then Ok () + else Error (`Msg "Datadir does not exist")) >>= fun () -> + Bos.OS.Dir.create ~path:false (Model.staging datadir) >>| fun _ -> () + let init ?(pool_size = 10) dbpath datadir = + Rresult.R.bind + (init_datadir datadir) @@ + fun () -> match Caqti_lwt.connect_pool ~max_size:pool_size (Uri.make ~scheme:"sqlite3" ~path:dbpath ~query:["create", ["false"]] ()) @@ -32,18 +43,20 @@ let init ?(pool_size = 10) dbpath datadir = | Ok pool -> Lwt_main.run (Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) -> Db.find Builder_db.get_application_id () >>= fun application_id -> - Db.find Builder_db.get_version () >>= fun version -> - Lwt_result.return (application_id, version)) + Db.find Builder_db.get_version () >>= (fun version -> + if (application_id, version) = Builder_db.(application_id, current_version) + then Lwt_result.return () + else Lwt_result.fail (`Wrong_version (application_id, version))) + >>= fun () -> + Model.cleanup_staging datadir (module Db)) pool) |> (function | Error e -> Error (e :> [> db_error | `Wrong_version of int32 * int64 ]) - | Ok (application_id, version) -> - if application_id = Builder_db.application_id && version = Builder_db.current_version - then Ok { - pool = (pool :> (Caqti_lwt.connection, [> db_error ]) Caqti_lwt.Pool.t); - datadir; - } - else Error (`Wrong_version (application_id, version))) + | Ok () -> + Ok { + pool = (pool :> (Caqti_lwt.connection, [> db_error ]) Caqti_lwt.Pool.t); + datadir; + }) let pp_exec ppf (job, uuid, _, _, _, _, _) = Format.fprintf ppf "%s(%a)" job.Builder.name Uuidm.pp uuid diff --git a/lib/model.ml b/lib/model.ml index 77e2966..5c71b78 100644 --- a/lib/model.ml +++ b/lib/model.ml @@ -18,6 +18,8 @@ let not_found = function | None -> Lwt.return (Error `Not_found :> (_, [> error ]) result) | Some v -> Lwt_result.return v +let staging datadir = Fpath.(datadir / "_staging") + let read_file filepath = Lwt.try_bind (fun () -> Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string filepath)) @@ -74,6 +76,37 @@ let user username (module Db : CONN) = Db.find_opt Builder_db.User.get_user username >|= Option.map snd +let cleanup_staging datadir (module Db : Caqti_lwt.CONNECTION) = + let cleanup_staged staged = + match Uuidm.of_string (Fpath.to_string staged) with + | None -> + Log.warn (fun m -> m "Non-uuid staged files: %a" Fpath.pp + Fpath.(staging datadir // staged)); + Lwt.return (Bos.OS.Path.delete ~recurse:true Fpath.(staging datadir // staged)) + | Some uuid -> + let staged = Fpath.(staging datadir // staged) in + Db.find_opt Builder_db.Build.get_by_uuid uuid >>= function + | Some (_id, build) -> + Db.find Builder_db.Job.get build.job_id >>= fun job_name -> + let destdir = Fpath.(datadir / job_name / Uuidm.to_string uuid) in + Lwt.return (Bos.OS.Path.move staged destdir) + | None -> + Lwt.return (Bos.OS.Path.delete ~recurse:true Fpath.(staging datadir // staged)) + in + Lwt.return (Bos.OS.Dir.contents ~rel:true (staging datadir)) >>= fun stageds -> + Lwt_result.ok @@ + Lwt_list.iter_s + (fun staged -> + Lwt.map (function + | Error (_ as e) -> + Log.warn (fun m -> m "Failed cleaning up staged files %a in %a: %a" + Fpath.pp staged + Fpath.pp Fpath.(staging datadir // staged) + pp_error e) + | Ok () -> ()) + (cleanup_staged staged)) + stageds + let save file data = let open Lwt.Infix in Lwt.catch @@ -91,45 +124,65 @@ let save_exec build_dir exec = let cs = Builder.Asn.exec_to_cs exec in save Fpath.(build_dir / "full") (Cstruct.to_string cs) -let save_file dir (filepath, data) = +let save_file dir staging (filepath, data) = let size = String.length data in let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data) in let localpath = Fpath.append dir filepath in - Lwt_result.lift (Bos.OS.Dir.create (Fpath.parent localpath)) >>= fun _ -> - save localpath data >|= fun () -> + let destpath = Fpath.append staging filepath in + Lwt_result.lift (Bos.OS.Dir.create (Fpath.parent destpath)) >>= fun _ -> + save destpath data >|= fun () -> { Builder_db.filepath; localpath; sha256; size } -let save_files dir files = +let save_files dir staging files = List.fold_left (fun r file -> r >>= fun acc -> - save_file dir file >>= fun file -> + save_file dir staging file >>= fun file -> Lwt_result.return (file :: acc)) (Lwt_result.return []) files -let save_all basedir ((job, uuid, _, _, _, _, artifacts) as exec) = +let save_all basedir staging_dir ((job, uuid, _, _, _, _, artifacts) as exec) = let build_dir = Fpath.(basedir / job.Builder.name / Uuidm.to_string uuid) in - let input_dir = Fpath.(build_dir / "input") in - let output_dir = Fpath.(build_dir / "output") in - Lwt.return (Bos.OS.Dir.create build_dir) >>= (fun created -> + let input_dir = Fpath.(build_dir / "input") + and staging_input_dir = Fpath.(staging_dir / "input") in + let output_dir = Fpath.(build_dir / "output") + and staging_output_dir = Fpath.(staging_dir / "output") in + Lwt.return (Bos.OS.Dir.create staging_dir) >>= (fun created -> if not created then Lwt_result.fail (`Msg "build directory already exists") else Lwt_result.return ()) >>= fun () -> - Lwt.return (Bos.OS.Dir.create input_dir) >>= fun _ -> - Lwt.return (Bos.OS.Dir.create output_dir) >>= fun _ -> - save_exec build_dir exec >>= fun () -> - save_files output_dir artifacts >>= fun artifacts -> - save_files input_dir job.Builder.files >>= fun input_files -> + Lwt.return (Bos.OS.Dir.create staging_input_dir) >>= fun _ -> + Lwt.return (Bos.OS.Dir.create staging_output_dir) >>= fun _ -> + save_exec staging_dir exec >>= fun () -> + save_files output_dir staging_output_dir artifacts >>= fun artifacts -> + save_files input_dir staging_input_dir job.Builder.files >>= fun input_files -> Lwt_result.return (artifacts, input_files) +let commit_files basedir staging_dir job_name uuid = + let job_dir = Fpath.(basedir / job_name) in + let dest = Fpath.(job_dir / Uuidm.to_string uuid) in + Lwt.return (Bos.OS.Dir.create job_dir) >>= fun _ -> + Lwt.return (Bos.OS.Path.move staging_dir dest) + let add_build basedir ((job, uuid, console, start, finish, result, _) as exec) (module Db : CONN) = let open Builder_db in let job_name = job.Builder.name in - save_all basedir exec >>= fun (artifacts, input_files) -> + let staging_dir = Fpath.(staging basedir / Uuidm.to_string uuid) in + let or_cleanup x = + Lwt_result.map_err (fun e -> + Bos.OS.Dir.delete ~recurse:true staging_dir + |> Result.iter_error (fun e -> + Log.err (fun m -> m "Failed to remove staging dir %a: %a" + Fpath.pp staging_dir + pp_error e)); + e) + x + in + or_cleanup (save_all basedir staging_dir exec) >>= fun (artifacts, input_files) -> let main_binary = match List.find_all (fun file -> @@ -147,21 +200,27 @@ let add_build (List.map (fun f -> f.filepath) binaries)); None in - Db.exec Job.try_add job_name >>= fun () -> - Db.find Job.get_id_by_name job_name >>= fun job_id -> - Db.exec Build.add { Build.uuid; start; finish; result; - console; script = job.Builder.script; - main_binary; job_id } >>= fun () -> - Db.find last_insert_rowid () >>= fun id -> - List.fold_left - (fun r file -> - r >>= fun () -> - Db.exec Build_artifact.add (file, id)) - (Lwt_result.return ()) - artifacts >>= fun () -> - List.fold_left - (fun r file -> - r >>= fun () -> - Db.exec Build_file.add (file, id)) - (Lwt_result.return ()) - input_files + let r = + Db.start () >>= fun () -> + Db.exec Job.try_add job_name >>= fun () -> + Db.find Job.get_id_by_name job_name >>= fun job_id -> + Db.exec Build.add { Build.uuid; start; finish; result; + console; script = job.Builder.script; + main_binary; job_id } >>= fun () -> + Db.find last_insert_rowid () >>= fun id -> + List.fold_left + (fun r file -> + r >>= fun () -> + Db.exec Build_artifact.add (file, id)) + (Lwt_result.return ()) + artifacts >>= fun () -> + List.fold_left + (fun r file -> + r >>= fun () -> + Db.exec Build_file.add (file, id)) + (Lwt_result.return ()) + input_files >>= fun () -> + Db.commit () >>= fun () -> + commit_files basedir staging_dir job_name uuid + in + or_cleanup r diff --git a/lib/model.mli b/lib/model.mli index 0c59fb1..6397519 100644 --- a/lib/model.mli +++ b/lib/model.mli @@ -2,6 +2,11 @@ type error = [ Caqti_error.call_or_retrieve | `Not_found | `File_error of Fpath. val pp_error : Format.formatter -> error -> unit +val staging : Fpath.t -> Fpath.t + +val cleanup_staging : Fpath.t -> Caqti_lwt.connection -> + (unit, [> error ]) result Lwt.t + val build_artifact : Uuidm.t -> Fpath.t -> Caqti_lwt.connection -> (string * Cstruct.t, [> error ]) result Lwt.t