From f636280f10f5973a39fb77c079c07320eefe24f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Reynir=20Bj=C3=B6rnsson?= Date: Thu, 14 Sep 2023 10:58:09 +0200 Subject: [PATCH] WIP content addressing --- db/builder_db.ml | 22 +++++--- db/builder_db.mli | 3 +- db/representation.ml | 25 +++++---- lib/builder_web.ml | 10 ++-- lib/dream_tar.ml | 2 +- lib/model.ml | 118 +++++++++++++++++++++++++++++-------------- lib/model.mli | 2 + lib/views.ml | 2 +- 8 files changed, 117 insertions(+), 67 deletions(-) diff --git a/db/builder_db.ml b/db/builder_db.ml index cdcd190..13d63ce 100644 --- a/db/builder_db.ml +++ b/db/builder_db.ml @@ -11,7 +11,6 @@ type 'a id = 'a Rep.id type file = Rep.file = { filepath : Fpath.t; - localpath : Fpath.t; sha256 : Cstruct.t; size : int; } @@ -140,7 +139,6 @@ module Build_artifact = struct {| CREATE TABLE build_artifact ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, filepath TEXT NOT NULL, -- the path as in the build - localpath TEXT NOT NULL, -- local path to the file on disk sha256 BLOB NOT NULL, size INTEGER NOT NULL, build INTEGER NOT NULL, @@ -156,13 +154,13 @@ module Build_artifact = struct let get = id `build_artifact ->! file @@ - {| SELECT filepath, localpath, sha256, size + {| SELECT filepath, sha256, size FROM build_artifact WHERE id = ? |} let get_by_build_uuid = Caqti_type.tup2 uuid fpath ->? Caqti_type.tup2 (id `build_artifact) file @@ {| SELECT build_artifact.id, build_artifact.filepath, - build_artifact.localpath, build_artifact.sha256, build_artifact.size + build_artifact.sha256, build_artifact.size FROM build_artifact INNER JOIN build ON build.id = build_artifact.build WHERE build.uuid = ? AND build_artifact.filepath = ? @@ -170,11 +168,15 @@ module Build_artifact = struct let get_all_by_build = id `build ->* Caqti_type.(tup2 (id `build_artifact) file) @@ - "SELECT id, filepath, localpath, sha256, size FROM build_artifact WHERE build = ?" + "SELECT id, filepath, sha256, size FROM build_artifact WHERE build = ?" + + let exists = + cstruct ->! Caqti_type.bool @@ + "SELECT EXISTS(SELECT 1 FROM build_artifact WHERE sha256 = ?" let add = Caqti_type.(tup2 file (id `build)) ->. Caqti_type.unit @@ - "INSERT INTO build_artifact (filepath, localpath, sha256, size, build) \ + "INSERT INTO build_artifact (filepath, sha256, size, build) \ VALUES (?, ?, ?, ?, ?)" let remove_by_build = @@ -316,7 +318,7 @@ module Build = struct b.uuid, b.start_d, b.start_ps, b.finish_d, b.finish_ps, b.result_code, b.result_msg, b.console, b.script, b.platform, b.main_binary, b.input_id, b.user, b.job, - a.filepath, a.localpath, a.sha256, a.size + a.filepath, a.sha256, a.size FROM build b, build_artifact a WHERE b.main_binary = a.id AND b.job = $1 AND b.platform = $2 AND b.main_binary IS NOT NULL @@ -442,7 +444,7 @@ module Build = struct {| SELECT b.uuid, b.start_d, b.start_ps, b.finish_d, b.finish_ps, b.result_code, b.result_msg, b.console, b.script, b.platform, b.main_binary, b.input_id, b.user, b.job, - a.filepath, a.localpath, a.sha256, a.size + a.filepath, a.sha256, a.size FROM build_artifact a INNER JOIN build b ON b.id = a.build WHERE a.sha256 = ? @@ -593,6 +595,8 @@ let migrate = [ "CREATE INDEX idx_build_main_binary ON build(main_binary)"; Caqti_type.unit ->. Caqti_type.unit @@ "CREATE INDEX idx_build_artifact_sha256 ON build_artifact(sha256)"; + Caqti_type.unit ->. Caqti_type.unit @@ + "CREATE INDEX idx_build_artifact_build ON build_artifact(build)"; set_current_version; set_application_id; ] @@ -606,6 +610,8 @@ let rollback = [ Build.rollback; Job.rollback; Caqti_type.unit ->. Caqti_type.unit @@ + "DROP INDEX IF EXISTS idx_build_artifact_build"; + Caqti_type.unit ->. Caqti_type.unit @@ "DROP INDEX IF EXISTS idx_build_artifact_sha256"; Caqti_type.unit ->. Caqti_type.unit @@ "DROP INDEX IF EXISTS idx_build_failed"; diff --git a/db/builder_db.mli b/db/builder_db.mli index 2988c12..9d1f052 100644 --- a/db/builder_db.mli +++ b/db/builder_db.mli @@ -3,7 +3,6 @@ module Rep : sig type 'a id type file = { filepath : Fpath.t; - localpath : Fpath.t; sha256 : Cstruct.t; size : int; } @@ -23,7 +22,6 @@ type 'a id = 'a Rep.id type file = Rep.file = { filepath : Fpath.t; - localpath : Fpath.t; sha256 : Cstruct.t; size : int; } @@ -87,6 +85,7 @@ module Build_artifact : sig Caqti_request.t val get_all_by_build : ([`build] id, [`build_artifact] id * file, [ `Many | `One | `Zero ]) Caqti_request.t + val exists : (Cstruct.t, bool, [ `One ]) Caqti_request.t val add : (file * [`build] id, unit, [ `Zero ]) Caqti_request.t val remove_by_build : diff --git a/db/representation.ml b/db/representation.ml index 4d94b93..1cc1722 100644 --- a/db/representation.ml +++ b/db/representation.ml @@ -30,7 +30,6 @@ let id_to_int64 (id : 'a id) : int64 = id type file = { filepath : Fpath.t; - localpath : Fpath.t; sha256 : Cstruct.t; size : int; } @@ -63,24 +62,24 @@ let cstruct = Caqti_type.custom ~encode ~decode Caqti_type.octets let file = - let encode { filepath; localpath; sha256; size } = - Ok (filepath, localpath, sha256, size) in - let decode (filepath, localpath, sha256, size) = - Ok { filepath; localpath; sha256; size } in - Caqti_type.custom ~encode ~decode Caqti_type.(tup4 fpath fpath cstruct int) + let encode { filepath; sha256; size } = + Ok (filepath, sha256, size) in + let decode (filepath, sha256, size) = + Ok { filepath; sha256; size } in + Caqti_type.custom ~encode ~decode Caqti_type.(tup3 fpath cstruct int) let file_opt = - let rep = Caqti_type.(tup4 (option fpath) (option fpath) (option cstruct) (option int)) in + let rep = Caqti_type.(tup3 (option fpath) (option cstruct) (option int)) in let encode = function - | Some { filepath; localpath; sha256; size } -> - Ok (Some filepath, Some localpath, Some sha256, Some size) + | Some { filepath; sha256; size } -> + Ok (Some filepath, Some sha256, Some size) | None -> - Ok (None, None, None, None) + Ok (None, None, None) in let decode = function - | (Some filepath, Some localpath, Some sha256, Some size) -> - Ok (Some { filepath; localpath; sha256; size }) - | (None, None, None, None) -> + | (Some filepath, Some sha256, Some size) -> + Ok (Some { filepath; sha256; size }) + | (None, None, None) -> Ok None | _ -> (* This should not happen if the database is well-formed *) diff --git a/lib/builder_web.ml b/lib/builder_web.ml index 4a40dcf..feb596e 100644 --- a/lib/builder_web.ml +++ b/lib/builder_web.ml @@ -211,9 +211,9 @@ module Viz_aux = struct match typ with | `Treemap -> let debug_binary = - let bin = Fpath.base main_binary.localpath in + let bin = Fpath.base main_binary.filepath in List.find_opt - (fun p -> Fpath.(equal (bin + "debug") (base p.localpath))) + (fun p -> Fpath.(equal (bin + "debug") (base p.filepath))) artifacts in begin @@ -226,7 +226,7 @@ module Viz_aux = struct | `Dependencies -> let opam_switch = List.find_opt - (fun p -> Fpath.(equal (v "opam-switch") (base p.localpath))) + (fun p -> Fpath.(equal (v "opam-switch") (base p.filepath))) artifacts in Model.not_found opam_switch @@ -435,8 +435,8 @@ let routes ~datadir ~cachedir ~configdir ~expired_jobs = | _ -> Model.build_artifact_data datadir file |> if_error "Error getting build artifact" - ~log:(fun e -> Log.warn (fun m -> m "Error getting build artifact data for file %a in %a: %a" - Fpath.pp file.Builder_db.filepath Fpath.pp file.Builder_db.localpath + ~log:(fun e -> Log.warn (fun m -> m "Error getting build artifact data for file %a: %a" + Fpath.pp file.Builder_db.filepath pp_error e)) >>= fun data -> let headers = [ "Content-Type", mime_lookup file.Builder_db.filepath; diff --git a/lib/dream_tar.ml b/lib/dream_tar.ml index b5a5899..2cad5f6 100644 --- a/lib/dream_tar.ml +++ b/lib/dream_tar.ml @@ -67,7 +67,7 @@ let targz_response datadir finish (files : Builder_db.file list) (stream : Dream in Lwt_list.iter_s (fun file -> let hdr = header_of_file finish file in - write_block hdr Fpath.(datadir // file.localpath) state) + write_block hdr Fpath.(datadir // Model.artifact_path file) state) files >>= fun () -> Writer.really_write state Tar.Header.zero_block >>= fun () -> Writer.really_write state Tar.Header.zero_block >>= fun () -> diff --git a/lib/model.ml b/lib/model.ml index 91e25b4..ec62243 100644 --- a/lib/model.ml +++ b/lib/model.ml @@ -19,6 +19,15 @@ let not_found = function | Some v -> Lwt_result.return v let staging datadir = Fpath.(datadir / "_staging") +let artifacts_dir datadir = Fpath.(datadir / "_artifacts") +let artifact_path artifact = + let (`Hex sha256) = Hex.of_cstruct artifact.Builder_db.sha256 in + (* NOTE: [sha256] is 64 characters when it's a hex sha256 checksum *) + (* NOTE: We add the prefix to reduce the number of files in a directory - a + workaround for inferior filesystems. We can easily revert this by changing + this function and adding a migration. *) + let prefix = String.sub sha256 0 2 in + Fpath.(v "_artifacts" / prefix / sha256) let read_file datadir filepath = let filepath = Fpath.(datadir // filepath) in @@ -44,14 +53,14 @@ let build_artifact_by_id id (module Db : CONN) = Db.find Builder_db.Build_artifact.get id let build_artifact_data datadir file = - read_file datadir file.Builder_db.localpath + read_file datadir (artifact_path file) let build_artifacts build (module Db : CONN) = Db.collect_list Builder_db.Build_artifact.get_all_by_build build >|= List.map snd let solo5_manifest datadir file = - let buf = Owee_buf.map_binary Fpath.(to_string (datadir // file.Builder_db.localpath)) in + let buf = Owee_buf.map_binary Fpath.(to_string (datadir // artifact_path file)) in Solo5_elftool.query_manifest buf |> Result.to_option let platforms_of_job id (module Db : CONN) = @@ -196,11 +205,11 @@ let cleanup_staging datadir (module Db : Caqti_lwt.CONNECTION) = (cleanup_staged staged)) stageds -let save file data = +let save path data = let open Lwt.Infix in Lwt.catch (fun () -> - Lwt_io.open_file ~mode:Lwt_io.Output (Fpath.to_string file) >>= fun oc -> + Lwt_io.open_file ~mode:Lwt_io.Output (Fpath.to_string path) >>= fun oc -> Lwt_io.write oc data >>= fun () -> Lwt_io.close oc |> Lwt_result.ok) @@ -209,33 +218,30 @@ let save file data = Lwt_result.fail (`Msg (Unix.error_message e)) | e -> Lwt.fail e) -let save_file dir staging (filepath, data) = - let size = String.length data in - let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data) in - let localpath = Fpath.append dir filepath in - let destpath = Fpath.append staging filepath in - Lwt_result.lift (Bos.OS.Dir.create (Fpath.parent destpath)) >>= fun _ -> - save destpath data >|= fun () -> - { Builder_db.filepath; localpath; sha256; size } - -let save_files dir staging files = +let save_artifacts staging artifacts = List.fold_left - (fun r file -> - r >>= fun acc -> - save_file dir staging file >>= fun file -> - Lwt_result.return (file :: acc)) - (Lwt_result.return []) - files + (fun r (file, data) -> + r >>= fun () -> + let (`Hex sha256) = Hex.of_cstruct file.Builder_db.sha256 in + let destpath = Fpath.(staging / sha256) in + save destpath data) + (Lwt_result.return ()) + artifacts -let save_all staging_dir (job : Builder.script_job) uuid artifacts = - let build_dir = Fpath.(v job.Builder.name / Uuidm.to_string uuid) in - let output_dir = Fpath.(build_dir / "output") - and staging_output_dir = Fpath.(staging_dir / "output") in - Lwt.return (Bos.OS.Dir.create staging_output_dir) >>= fun _ -> - save_files output_dir staging_output_dir artifacts >>= fun artifacts -> - Lwt_result.return artifacts - -let commit_files datadir staging_dir job_name uuid = +let commit_files datadir staging_dir job_name uuid artifacts = + (* First we move the artifacts *) + Lwt.return (Bos.OS.Dir.create (artifacts_dir datadir)) >>= fun _ -> + List.fold_left + (fun r artifact -> + r >>= fun () -> + let (`Hex sha256) = Hex.of_cstruct artifact.Builder_db.sha256 in + let src = Fpath.(staging_dir / sha256) in + let dest = Fpath.(datadir // artifact_path artifact) in + Lwt.return (Bos.OS.Dir.create (Fpath.parent dest)) >>= fun _created -> + Lwt.return (Bos.OS.Path.move ~force:true src dest)) + (Lwt_result.return ()) + artifacts >>= fun () -> + (* Now the staging dir only contains script & console *) let job_dir = Fpath.(datadir / job_name) in let dest = Fpath.(job_dir / Uuidm.to_string uuid) in Lwt.return (Bos.OS.Dir.create job_dir) >>= fun _ -> @@ -324,6 +330,25 @@ let prepare_staging staging_dir = then Lwt_result.fail (`Msg "build directory already exists") else Lwt_result.return () +(* saving: + - for each artifact compute its sha256 checksum -- calling Lwt.pause in + between + - lookup artifact sha256 in the database and filter them out of the list: not_in_db + - mkdir -p _staging/uuid/ + - save console & script to _staging/uuid/ + - save each artifact in not_in_db as _staging/uuid/sha256 + committing: + - for each artifact mv _staging/uuid/sha256 _artifacts/sha256 + (or _artifacts/prefix(sha256)/sha256 where prefix(sha256) is the first two hex digits in sha256) + - now _staging/uuid only contains console & script so we mv _staging/uuid _staging/job/uuid + potential issues: + - race condition in uploading same artifact: + * if the artifact already exists in the database and thus filesystem then nothing is done + * if the artifact is added to the database and/or filesystem we atomically overwrite it + - input_id depends on a sort order? + *) + + let add_build ~datadir ~cachedir @@ -344,16 +369,35 @@ let add_build e) x in - let artifacts_to_preserve = - let not_interesting p = - String.equal (Fpath.basename p) "README.md" || String.equal (Fpath.get_ext p) ".build-hashes" - in - List.filter (fun (p, _) -> not (not_interesting p)) raw_artifacts + let not_interesting p = + String.equal (Fpath.basename p) "README.md" || String.equal (Fpath.get_ext p) ".build-hashes" in + begin + List.fold_left + (fun r (filepath, data) -> + r >>= fun acc -> + if not_interesting filepath then + Lwt_result.return acc + else + let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data) + and size = String.length data in + Lwt_result.ok (Lwt.pause ()) >|= fun () -> + ({ filepath; sha256; size }, data) :: acc) + (Lwt_result.return []) + raw_artifacts + end >>= fun artifacts -> or_cleanup (prepare_staging staging_dir) >>= fun () -> or_cleanup (save_console_and_script staging_dir job_name uuid console job.Builder.script) >>= fun (console, script) -> - or_cleanup (save_all staging_dir job uuid artifacts_to_preserve) >>= fun artifacts -> + List.fold_left + (fun r ((f, _) as artifact) -> + r >>= fun acc -> + Db.find Builder_db.Build_artifact.exists f.sha256 >|= fun exists -> + if exists then acc else artifact :: acc) + (Lwt_result.return []) + artifacts >>= fun artifacts_to_save -> + or_cleanup (save_artifacts staging_dir artifacts_to_save) >>= fun () -> + let artifacts = List.map fst artifacts in let r = Db.start () >>= fun () -> Db.exec Job.try_add job_name >>= fun () -> @@ -422,7 +466,7 @@ let add_build (Lwt_result.return ()) remaining_artifacts_to_add >>= fun () -> Db.commit () >>= fun () -> - commit_files datadir staging_dir job_name uuid >|= fun () -> + commit_files datadir staging_dir job_name uuid artifacts >|= fun () -> main_binary in Lwt_result.bind_lwt_error (or_cleanup r) @@ -451,7 +495,7 @@ let add_build "--uuid=" ^ uuid ; "--platform=" ^ platform ; "--cache-dir=" ^ Fpath.to_string cachedir ; "--data-dir=" ^ Fpath.to_string datadir ; - fp_str main_binary.localpath ]) + fp_str Fpath.(datadir // artifact_path main_binary) ]) in Log.debug (fun m -> m "executing hooks with %s" args); let dir = Fpath.(configdir / "upload-hooks") in diff --git a/lib/model.mli b/lib/model.mli index 2b36a7c..db38ae3 100644 --- a/lib/model.mli +++ b/lib/model.mli @@ -5,6 +5,8 @@ val pp_error : Format.formatter -> error -> unit val not_found : 'a option -> ('a, [> `Not_found ]) result Lwt.t val staging : Fpath.t -> Fpath.t +val artifacts_dir : Fpath.t -> Fpath.t +val artifact_path : Builder_db.file -> Fpath.t val cleanup_staging : Fpath.t -> Caqti_lwt.connection -> (unit, [> `Msg of string ]) result Lwt.t diff --git a/lib/views.ml b/lib/views.ml index 9e9e3e3..8a597f8 100644 --- a/lib/views.ml +++ b/lib/views.ml @@ -188,7 +188,7 @@ let artifact ~basename ~job_name ~build - ~file:{ Builder_db.filepath; localpath = _; sha256; size } + ~file:{ Builder_db.filepath; sha256; size } = let artifact_link = Link.Job_build_artifact.make