builder-web/bin/migrations/m20210712c.ml
2021-09-07 09:35:26 +00:00

220 lines
8.2 KiB
OCaml

let new_version = 14L and old_version = 13L
and identifier = "2021-07-12c"
and migrate_doc = "store script, console on disk"
and rollback_doc = "store script, console in database"
module Asn = struct
let decode_strict codec cs =
match Asn.decode codec cs with
| Ok (a, cs) ->
if Cstruct.len cs = 0
then Ok a
else Error "trailing bytes"
| Error (`Parse msg) -> Error ("parse error: " ^ msg)
let projections_of asn =
let c = Asn.codec Asn.der asn in
(decode_strict c, Asn.encode c)
let console =
Asn.S.(sequence_of
(sequence2
(required ~label:"delta" int)
(required ~label:"data" utf8_string)))
let console_of_cs, console_to_cs = projections_of console
end
let new_build =
Caqti_request.exec
Caqti_type.unit
{| CREATE TABLE new_build (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
uuid VARCHAR(36) NOT NULL UNIQUE,
start_d INTEGER NOT NULL,
start_ps INTEGER NOT NULL,
finish_d INTEGER NOT NULL,
finish_ps INTEGER NOT NULL,
result_code INTEGER NOT NULL,
result_msg TEXT,
console TEXT NOT NULL,
script TEXT NOT NULL,
main_binary INTEGER,
user INTEGER NOT NULL,
job INTEGER NOT NULL,
input_id BLOB, -- sha256 (sha256<opam-switch> || sha256<build-environment> || sha256<system-packages>)
FOREIGN KEY(main_binary) REFERENCES build_artifact(id) DEFERRABLE INITIALLY DEFERRED,
FOREIGN KEY(user) REFERENCES user(id),
FOREIGN KEY(job) REFERENCES job(id)
)
|}
let old_build =
Caqti_request.exec
Caqti_type.unit
{| CREATE TABLE new_build (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
uuid VARCHAR(36) NOT NULL UNIQUE,
start_d INTEGER NOT NULL,
start_ps INTEGER NOT NULL,
finish_d INTEGER NOT NULL,
finish_ps INTEGER NOT NULL,
result_code INTEGER NOT NULL,
result_msg TEXT,
console BLOB NOT NULL,
script TEXT NOT NULL,
main_binary INTEGER,
user INTEGER NOT NULL,
job INTEGER NOT NULL,
input_id BLOB, -- sha256 (sha256<opam-switch> || sha256<build-environment> || sha256<system-packages>)
FOREIGN KEY(main_binary) REFERENCES build_artifact(id) DEFERRABLE INITIALLY DEFERRED,
FOREIGN KEY(user) REFERENCES user(id),
FOREIGN KEY(job) REFERENCES job(id)
)
|}
let copy_from_old_build =
Caqti_request.exec
Caqti_type.unit
{| INSERT INTO new_build(id, uuid, start_d, start_ps, finish_d, finish_ps,
result_kind, result_msg, console, script, main_binary, user, job, input_id)
SELECT id, uuid, start_d, start_ps, finish_d, finish_ps, result_code, result_msg,
'', '', main_binary, user, job, input_id
FROM build
|}
let copy_from_new_build =
Caqti_request.exec
Caqti_type.unit
{| INSERT INTO new_build(id, uuid, start_d, start_ps, finish_d, finish_ps,
result_kind, result_msg, console, script, main_binary, user, job, input_id)
SELECT id, uuid, start_d, start_ps, finish_d, finish_ps, result_code, result_msg,
x'', '', main_binary, user, job, input_id
FROM build
|}
let old_build_console_script =
Caqti_request.collect
Caqti_type.unit
Caqti_type.(tup4 (Builder_db.Rep.id (`build : [ `build ]))
(tup2 string Builder_db.Rep.uuid) octets string)
"SELECT b.id, job.name, b.uuid, b.console, b.script FROM build b, job WHERE b.job = job.id"
let update_new_build_console_script =
Caqti_request.exec
Caqti_type.(tup3 (Builder_db.Rep.id (`build : [ `build ])) Builder_db.Rep.fpath Builder_db.Rep.fpath)
"UPDATE new_build SET console = ?2, script = ?3 WHERE id = ?1"
let new_build_console_script =
Caqti_request.collect
Caqti_type.unit
Caqti_type.(tup3 (Builder_db.Rep.id (`build : [ `build ])) Builder_db.Rep.fpath Builder_db.Rep.fpath)
"SELECT id, console, script FROM build"
let update_old_build_console_script =
Caqti_request.exec
Caqti_type.(tup3 (Builder_db.Rep.id (`build : [ `build ])) octets string)
"UPDATE new_build SET console = ?2, script = ?3 WHERE id = ?1"
let drop_build =
Caqti_request.exec
Caqti_type.unit
"DROP TABLE build"
let rename_build =
Caqti_request.exec
Caqti_type.unit
"ALTER TABLE new_build RENAME TO build"
let console_to_string console =
Asn.console_of_cs console >>| fun console ->
List.map (fun (delta, data) ->
Printf.sprintf "%.3fs:%s\n" (Duration.to_f delta) data)
console
|> String.concat ""
let console_of_string data =
let lines = String.split_on_char '\n' data in
let console = List.map (fun line ->
match String.split_on_char ':' line with
| ts :: tail ->
let delta = float_of_string (String.sub ts 0 (String.length ts - 1)) in
Duration.of_f delta, String.concat ":" tail
| _ -> assert false)
lines
in
Asn.console_to_cs console
let save_console_and_script datadir job_name uuid console script =
let out name = Fpath.(datadir / job_name / Uuidm.to_string uuid / name + "txt") in
let script_out = out "script" in
Bos.OS.File.write script_out script >>= fun () ->
let console_out = out "console" in
let console_data = console_to_string console in
Bos.OS.File.write console_out console_data >>= fun () ->
Ok (console_out, script_out)
let read_console_and_script datadir console_file script_file =
let console_file = Fpath.append datadir console_file
and script_file = Fpath.append datadir script_file
in
Bos.OS.File.read console_file >>= fun console ->
Bos.OS.File.read script_fle >>= fun script ->
let console = console_of_string console in
Bos.OS.File.delete console_file >>= fun () ->
Bos.OS.File.delete script_file >>= fun () ->
Ok (console, script)
open Rresult.R.Infix
let migrate datadir (module Db : Caqti_blocking.CONNECTION) =
Grej.check_version ~user_version:old_version (module Db) >>= fun () ->
Db.exec new_build () >>= fun () ->
Db.exec copy_from_old_build () >>= fun () ->
Db.collect_list old_build_console_script () >>= fun console_scripts ->
Grej.list_iter_result (fun (id, (job_name, uuid), console, script) ->
save_console_and_script datadir job_name uuid console script >>= fun (console_file, script_file) ->
Db.exec update_new_build_console_script (id, console_file, script_file))
console_scripts >>= fun () ->
Db.exec drop_build () >>= fun () ->
Db.exec rename_build () >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_job_start ON build(job, start_d DESC, start_ps DESC)")
() >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_failed ON build(job, start_d DESC, start_ps DESC) WHERE result_code <> 0")
() >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_input_id ON build(input_id)")
() >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_main_binary ON build(main_binary)")
() >>= fun () ->
Db.exec (Grej.set_version new_version) ()
let rollback datadir (module Db : Caqti_blocking.CONNECTION) =
Grej.check_version ~user_version:new_version (module Db) >>= fun () ->
Db.exec old_build () >>= fun () ->
Db.exec copy_from_new_build () >>= fun () ->
Db.collect_list new_build_console_script () >>= fun console_scripts ->
Grej.list_iter_result (fun (id, console_file, script_file) ->
read_console_and_script datadir console_file script_file >>= fun (console, script) ->
Db.exec update_old_build_console_script (id, console, script))
console_scripts >>= fun () ->
Db.exec drop_build () >>= fun () ->
Db.exec rename_build () >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_job_start ON build(job, start_d DESC, start_ps DESC)")
() >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_failed ON build(job, start_d DESC, start_ps DESC) WHERE result_code <> 0")
() >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_input_id ON build(input_id)")
() >>= fun () ->
Db.exec (Caqti_request.exec Caqti_type.unit
"CREATE INDEX idx_build_main_binary ON build(main_binary)")
() >>= fun () ->
Db.exec (Grej.set_version old_version) ()