WIP content addressing

This commit is contained in:
Reynir Björnsson 2023-09-14 10:58:09 +02:00 committed by Robur
parent 7f3a6719e2
commit f636280f10
8 changed files with 117 additions and 67 deletions

View file

@ -11,7 +11,6 @@ type 'a id = 'a Rep.id
type file = Rep.file = {
filepath : Fpath.t;
localpath : Fpath.t;
sha256 : Cstruct.t;
size : int;
@ -140,7 +139,6 @@ module Build_artifact = struct
{| CREATE TABLE build_artifact (
filepath TEXT NOT NULL, -- the path as in the build
localpath TEXT NOT NULL, -- local path to the file on disk
@ -156,13 +154,13 @@ module Build_artifact = struct
let get =
id `build_artifact ->! file @@
{| SELECT filepath, localpath, sha256, size
{| SELECT filepath, sha256, size
FROM build_artifact WHERE id = ? |}
let get_by_build_uuid =
Caqti_type.tup2 uuid fpath ->? Caqti_type.tup2 (id `build_artifact) file @@
{| SELECT build_artifact.id, build_artifact.filepath,
build_artifact.localpath, build_artifact.sha256, build_artifact.size
build_artifact.sha256, build_artifact.size
FROM build_artifact
INNER JOIN build ON build.id = build_artifact.build
WHERE build.uuid = ? AND build_artifact.filepath = ?
@ -170,11 +168,15 @@ module Build_artifact = struct
let get_all_by_build =
id `build ->* Caqti_type.(tup2 (id `build_artifact) file) @@
"SELECT id, filepath, localpath, sha256, size FROM build_artifact WHERE build = ?"
"SELECT id, filepath, sha256, size FROM build_artifact WHERE build = ?"
let exists =
cstruct ->! Caqti_type.bool @@
"SELECT EXISTS(SELECT 1 FROM build_artifact WHERE sha256 = ?"
let add =
Caqti_type.(tup2 file (id `build)) ->. Caqti_type.unit @@
"INSERT INTO build_artifact (filepath, localpath, sha256, size, build) \
"INSERT INTO build_artifact (filepath, sha256, size, build) \
VALUES (?, ?, ?, ?, ?)"
let remove_by_build =
@ -316,7 +318,7 @@ module Build = struct
b.uuid, b.start_d, b.start_ps, b.finish_d, b.finish_ps,
b.result_code, b.result_msg, b.console, b.script,
b.platform, b.main_binary, b.input_id, b.user, b.job,
a.filepath, a.localpath, a.sha256, a.size
a.filepath, a.sha256, a.size
FROM build b, build_artifact a
WHERE b.main_binary = a.id AND b.job = $1 AND b.platform = $2
AND b.main_binary IS NOT NULL
@ -442,7 +444,7 @@ module Build = struct
{| SELECT b.uuid, b.start_d, b.start_ps, b.finish_d, b.finish_ps,
b.result_code, b.result_msg, b.console, b.script,
b.platform, b.main_binary, b.input_id, b.user, b.job,
a.filepath, a.localpath, a.sha256, a.size
a.filepath, a.sha256, a.size
FROM build_artifact a
INNER JOIN build b ON b.id = a.build
WHERE a.sha256 = ?
@ -593,6 +595,8 @@ let migrate = [
"CREATE INDEX idx_build_main_binary ON build(main_binary)";
Caqti_type.unit ->. Caqti_type.unit @@
"CREATE INDEX idx_build_artifact_sha256 ON build_artifact(sha256)";
Caqti_type.unit ->. Caqti_type.unit @@
"CREATE INDEX idx_build_artifact_build ON build_artifact(build)";
@ -606,6 +610,8 @@ let rollback = [
Caqti_type.unit ->. Caqti_type.unit @@
"DROP INDEX IF EXISTS idx_build_artifact_build";
Caqti_type.unit ->. Caqti_type.unit @@
"DROP INDEX IF EXISTS idx_build_artifact_sha256";
Caqti_type.unit ->. Caqti_type.unit @@
"DROP INDEX IF EXISTS idx_build_failed";

View file

@ -3,7 +3,6 @@ module Rep : sig
type 'a id
type file = {
filepath : Fpath.t;
localpath : Fpath.t;
sha256 : Cstruct.t;
size : int;
@ -23,7 +22,6 @@ type 'a id = 'a Rep.id
type file = Rep.file = {
filepath : Fpath.t;
localpath : Fpath.t;
sha256 : Cstruct.t;
size : int;
@ -87,6 +85,7 @@ module Build_artifact : sig
val get_all_by_build :
([`build] id, [`build_artifact] id * file, [ `Many | `One | `Zero ]) Caqti_request.t
val exists : (Cstruct.t, bool, [ `One ]) Caqti_request.t
val add :
(file * [`build] id, unit, [ `Zero ]) Caqti_request.t
val remove_by_build :

View file

@ -30,7 +30,6 @@ let id_to_int64 (id : 'a id) : int64 = id
type file = {
filepath : Fpath.t;
localpath : Fpath.t;
sha256 : Cstruct.t;
size : int;
@ -63,24 +62,24 @@ let cstruct =
Caqti_type.custom ~encode ~decode Caqti_type.octets
let file =
let encode { filepath; localpath; sha256; size } =
Ok (filepath, localpath, sha256, size) in
let decode (filepath, localpath, sha256, size) =
Ok { filepath; localpath; sha256; size } in
Caqti_type.custom ~encode ~decode Caqti_type.(tup4 fpath fpath cstruct int)
let encode { filepath; sha256; size } =
Ok (filepath, sha256, size) in
let decode (filepath, sha256, size) =
Ok { filepath; sha256; size } in
Caqti_type.custom ~encode ~decode Caqti_type.(tup3 fpath cstruct int)
let file_opt =
let rep = Caqti_type.(tup4 (option fpath) (option fpath) (option cstruct) (option int)) in
let rep = Caqti_type.(tup3 (option fpath) (option cstruct) (option int)) in
let encode = function
| Some { filepath; localpath; sha256; size } ->
Ok (Some filepath, Some localpath, Some sha256, Some size)
| Some { filepath; sha256; size } ->
Ok (Some filepath, Some sha256, Some size)
| None ->
Ok (None, None, None, None)
Ok (None, None, None)
let decode = function
| (Some filepath, Some localpath, Some sha256, Some size) ->
Ok (Some { filepath; localpath; sha256; size })
| (None, None, None, None) ->
| (Some filepath, Some sha256, Some size) ->
Ok (Some { filepath; sha256; size })
| (None, None, None) ->
Ok None
| _ ->
(* This should not happen if the database is well-formed *)

View file

@ -211,9 +211,9 @@ module Viz_aux = struct
match typ with
| `Treemap ->
let debug_binary =
let bin = Fpath.base main_binary.localpath in
let bin = Fpath.base main_binary.filepath in
(fun p -> Fpath.(equal (bin + "debug") (base p.localpath)))
(fun p -> Fpath.(equal (bin + "debug") (base p.filepath)))
@ -226,7 +226,7 @@ module Viz_aux = struct
| `Dependencies ->
let opam_switch =
(fun p -> Fpath.(equal (v "opam-switch") (base p.localpath)))
(fun p -> Fpath.(equal (v "opam-switch") (base p.filepath)))
Model.not_found opam_switch
@ -435,8 +435,8 @@ let routes ~datadir ~cachedir ~configdir ~expired_jobs =
| _ ->
Model.build_artifact_data datadir file
|> if_error "Error getting build artifact"
~log:(fun e -> Log.warn (fun m -> m "Error getting build artifact data for file %a in %a: %a"
Fpath.pp file.Builder_db.filepath Fpath.pp file.Builder_db.localpath
~log:(fun e -> Log.warn (fun m -> m "Error getting build artifact data for file %a: %a"
Fpath.pp file.Builder_db.filepath
pp_error e)) >>= fun data ->
let headers = [
"Content-Type", mime_lookup file.Builder_db.filepath;

View file

@ -67,7 +67,7 @@ let targz_response datadir finish (files : Builder_db.file list) (stream : Dream
Lwt_list.iter_s (fun file ->
let hdr = header_of_file finish file in
write_block hdr Fpath.(datadir // file.localpath) state)
write_block hdr Fpath.(datadir // Model.artifact_path file) state)
files >>= fun () ->
Writer.really_write state Tar.Header.zero_block >>= fun () ->
Writer.really_write state Tar.Header.zero_block >>= fun () ->

View file

@ -19,6 +19,15 @@ let not_found = function
| Some v -> Lwt_result.return v
let staging datadir = Fpath.(datadir / "_staging")
let artifacts_dir datadir = Fpath.(datadir / "_artifacts")
let artifact_path artifact =
let (`Hex sha256) = Hex.of_cstruct artifact.Builder_db.sha256 in
(* NOTE: [sha256] is 64 characters when it's a hex sha256 checksum *)
(* NOTE: We add the prefix to reduce the number of files in a directory - a
workaround for inferior filesystems. We can easily revert this by changing
this function and adding a migration. *)
let prefix = String.sub sha256 0 2 in
Fpath.(v "_artifacts" / prefix / sha256)
let read_file datadir filepath =
let filepath = Fpath.(datadir // filepath) in
@ -44,14 +53,14 @@ let build_artifact_by_id id (module Db : CONN) =
Db.find Builder_db.Build_artifact.get id
let build_artifact_data datadir file =
read_file datadir file.Builder_db.localpath
read_file datadir (artifact_path file)
let build_artifacts build (module Db : CONN) =
Db.collect_list Builder_db.Build_artifact.get_all_by_build build >|=
List.map snd
let solo5_manifest datadir file =
let buf = Owee_buf.map_binary Fpath.(to_string (datadir // file.Builder_db.localpath)) in
let buf = Owee_buf.map_binary Fpath.(to_string (datadir // artifact_path file)) in
Solo5_elftool.query_manifest buf |> Result.to_option
let platforms_of_job id (module Db : CONN) =
@ -196,11 +205,11 @@ let cleanup_staging datadir (module Db : Caqti_lwt.CONNECTION) =
(cleanup_staged staged))
let save file data =
let save path data =
let open Lwt.Infix in
(fun () ->
Lwt_io.open_file ~mode:Lwt_io.Output (Fpath.to_string file) >>= fun oc ->
Lwt_io.open_file ~mode:Lwt_io.Output (Fpath.to_string path) >>= fun oc ->
Lwt_io.write oc data >>= fun () ->
Lwt_io.close oc
|> Lwt_result.ok)
@ -209,33 +218,30 @@ let save file data =
Lwt_result.fail (`Msg (Unix.error_message e))
| e -> Lwt.fail e)
let save_file dir staging (filepath, data) =
let size = String.length data in
let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data) in
let localpath = Fpath.append dir filepath in
let destpath = Fpath.append staging filepath in
Lwt_result.lift (Bos.OS.Dir.create (Fpath.parent destpath)) >>= fun _ ->
save destpath data >|= fun () ->
{ Builder_db.filepath; localpath; sha256; size }
let save_files dir staging files =
let save_artifacts staging artifacts =
(fun r file ->
r >>= fun acc ->
save_file dir staging file >>= fun file ->
Lwt_result.return (file :: acc))
(Lwt_result.return [])
(fun r (file, data) ->
r >>= fun () ->
let (`Hex sha256) = Hex.of_cstruct file.Builder_db.sha256 in
let destpath = Fpath.(staging / sha256) in
save destpath data)
(Lwt_result.return ())
let save_all staging_dir (job : Builder.script_job) uuid artifacts =
let build_dir = Fpath.(v job.Builder.name / Uuidm.to_string uuid) in
let output_dir = Fpath.(build_dir / "output")
and staging_output_dir = Fpath.(staging_dir / "output") in
Lwt.return (Bos.OS.Dir.create staging_output_dir) >>= fun _ ->
save_files output_dir staging_output_dir artifacts >>= fun artifacts ->
Lwt_result.return artifacts
let commit_files datadir staging_dir job_name uuid =
let commit_files datadir staging_dir job_name uuid artifacts =
(* First we move the artifacts *)
Lwt.return (Bos.OS.Dir.create (artifacts_dir datadir)) >>= fun _ ->
(fun r artifact ->
r >>= fun () ->
let (`Hex sha256) = Hex.of_cstruct artifact.Builder_db.sha256 in
let src = Fpath.(staging_dir / sha256) in
let dest = Fpath.(datadir // artifact_path artifact) in
Lwt.return (Bos.OS.Dir.create (Fpath.parent dest)) >>= fun _created ->
Lwt.return (Bos.OS.Path.move ~force:true src dest))
(Lwt_result.return ())
artifacts >>= fun () ->
(* Now the staging dir only contains script & console *)
let job_dir = Fpath.(datadir / job_name) in
let dest = Fpath.(job_dir / Uuidm.to_string uuid) in
Lwt.return (Bos.OS.Dir.create job_dir) >>= fun _ ->
@ -324,6 +330,25 @@ let prepare_staging staging_dir =
then Lwt_result.fail (`Msg "build directory already exists")
else Lwt_result.return ()
(* saving:
- for each artifact compute its sha256 checksum -- calling Lwt.pause in
- lookup artifact sha256 in the database and filter them out of the list: not_in_db
- mkdir -p _staging/uuid/
- save console & script to _staging/uuid/
- save each artifact in not_in_db as _staging/uuid/sha256
- for each artifact mv _staging/uuid/sha256 _artifacts/sha256
(or _artifacts/prefix(sha256)/sha256 where prefix(sha256) is the first two hex digits in sha256)
- now _staging/uuid only contains console & script so we mv _staging/uuid _staging/job/uuid
potential issues:
- race condition in uploading same artifact:
* if the artifact already exists in the database and thus filesystem then nothing is done
* if the artifact is added to the database and/or filesystem we atomically overwrite it
- input_id depends on a sort order?
let add_build
@ -344,16 +369,35 @@ let add_build
let artifacts_to_preserve =
let not_interesting p =
String.equal (Fpath.basename p) "README.md" || String.equal (Fpath.get_ext p) ".build-hashes"
List.filter (fun (p, _) -> not (not_interesting p)) raw_artifacts
(fun r (filepath, data) ->
r >>= fun acc ->
if not_interesting filepath then
Lwt_result.return acc
let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data)
and size = String.length data in
Lwt_result.ok (Lwt.pause ()) >|= fun () ->
({ filepath; sha256; size }, data) :: acc)
(Lwt_result.return [])
end >>= fun artifacts ->
or_cleanup (prepare_staging staging_dir) >>= fun () ->
or_cleanup (save_console_and_script staging_dir job_name uuid console job.Builder.script)
>>= fun (console, script) ->
or_cleanup (save_all staging_dir job uuid artifacts_to_preserve) >>= fun artifacts ->
(fun r ((f, _) as artifact) ->
r >>= fun acc ->
Db.find Builder_db.Build_artifact.exists f.sha256 >|= fun exists ->
if exists then acc else artifact :: acc)
(Lwt_result.return [])
artifacts >>= fun artifacts_to_save ->
or_cleanup (save_artifacts staging_dir artifacts_to_save) >>= fun () ->
let artifacts = List.map fst artifacts in
let r =
Db.start () >>= fun () ->
Db.exec Job.try_add job_name >>= fun () ->
@ -422,7 +466,7 @@ let add_build
(Lwt_result.return ())
remaining_artifacts_to_add >>= fun () ->
Db.commit () >>= fun () ->
commit_files datadir staging_dir job_name uuid >|= fun () ->
commit_files datadir staging_dir job_name uuid artifacts >|= fun () ->
Lwt_result.bind_lwt_error (or_cleanup r)
@ -451,7 +495,7 @@ let add_build
"--uuid=" ^ uuid ; "--platform=" ^ platform ;
"--cache-dir=" ^ Fpath.to_string cachedir ;
"--data-dir=" ^ Fpath.to_string datadir ;
fp_str main_binary.localpath ])
fp_str Fpath.(datadir // artifact_path main_binary) ])
Log.debug (fun m -> m "executing hooks with %s" args);
let dir = Fpath.(configdir / "upload-hooks") in

View file

@ -5,6 +5,8 @@ val pp_error : Format.formatter -> error -> unit
val not_found : 'a option -> ('a, [> `Not_found ]) result Lwt.t
val staging : Fpath.t -> Fpath.t
val artifacts_dir : Fpath.t -> Fpath.t
val artifact_path : Builder_db.file -> Fpath.t
val cleanup_staging : Fpath.t -> Caqti_lwt.connection ->
(unit, [> `Msg of string ]) result Lwt.t

View file

@ -188,7 +188,7 @@ let artifact
~file:{ Builder_db.filepath; localpath = _; sha256; size }
~file:{ Builder_db.filepath; sha256; size }
let artifact_link =