let new_version = 14L and old_version = 13L and identifier = "2021-07-12c" and migrate_doc = "store script, console on disk" and rollback_doc = "store script, console in database" open Rresult.R.Infix module Asn = struct let decode_strict codec cs = match Asn.decode codec cs with | Ok (a, cs) -> if Cstruct.length cs = 0 then Ok a else Error "trailing bytes" | Error (`Parse msg) -> Error ("parse error: " ^ msg) let projections_of asn = let c = Asn.codec Asn.der asn in (decode_strict c, Asn.encode c) let console = Asn.S.(sequence_of (sequence2 (required ~label:"delta" int) (required ~label:"data" utf8_string))) let console_of_cs, console_to_cs = projections_of console end let new_build = Caqti_request.exec Caqti_type.unit {| CREATE TABLE new_build ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, uuid VARCHAR(36) NOT NULL UNIQUE, start_d INTEGER NOT NULL, start_ps INTEGER NOT NULL, finish_d INTEGER NOT NULL, finish_ps INTEGER NOT NULL, result_code INTEGER NOT NULL, result_msg TEXT, console TEXT NOT NULL, script TEXT NOT NULL, main_binary INTEGER, user INTEGER NOT NULL, job INTEGER NOT NULL, input_id BLOB, -- sha256 (sha256 || sha256 || sha256) FOREIGN KEY(main_binary) REFERENCES build_artifact(id) DEFERRABLE INITIALLY DEFERRED, FOREIGN KEY(user) REFERENCES user(id), FOREIGN KEY(job) REFERENCES job(id) ) |} let old_build = Caqti_request.exec Caqti_type.unit {| CREATE TABLE new_build ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, uuid VARCHAR(36) NOT NULL UNIQUE, start_d INTEGER NOT NULL, start_ps INTEGER NOT NULL, finish_d INTEGER NOT NULL, finish_ps INTEGER NOT NULL, result_code INTEGER NOT NULL, result_msg TEXT, console BLOB NOT NULL, script TEXT NOT NULL, main_binary INTEGER, user INTEGER NOT NULL, job INTEGER NOT NULL, input_id BLOB, -- sha256 (sha256 || sha256 || sha256) FOREIGN KEY(main_binary) REFERENCES build_artifact(id) DEFERRABLE INITIALLY DEFERRED, FOREIGN KEY(user) REFERENCES user(id), FOREIGN KEY(job) REFERENCES job(id) ) |} let copy_from_old_build = Caqti_request.exec Caqti_type.unit {| INSERT INTO new_build(id, uuid, start_d, start_ps, finish_d, finish_ps, result_code, result_msg, console, script, main_binary, user, job, input_id) SELECT id, uuid, start_d, start_ps, finish_d, finish_ps, result_code, result_msg, '', '', main_binary, user, job, input_id FROM build |} let copy_from_new_build = Caqti_request.exec Caqti_type.unit {| INSERT INTO new_build(id, uuid, start_d, start_ps, finish_d, finish_ps, result_code, result_msg, console, script, main_binary, user, job, input_id) SELECT id, uuid, start_d, start_ps, finish_d, finish_ps, result_code, result_msg, x'', '', main_binary, user, job, input_id FROM build |} let old_build_console_script = Caqti_request.collect Caqti_type.unit Caqti_type.(tup4 (Builder_db.Rep.id (`build : [ `build ])) (tup2 string Builder_db.Rep.uuid) Builder_db.Rep.cstruct string) "SELECT b.id, job.name, b.uuid, b.console, b.script FROM build b, job WHERE b.job = job.id" let update_new_build_console_script = Caqti_request.exec Caqti_type.(tup3 (Builder_db.Rep.id (`build : [ `build ])) Builder_db.Rep.fpath Builder_db.Rep.fpath) "UPDATE new_build SET console = ?2, script = ?3 WHERE id = ?1" let new_build_console_script = Caqti_request.collect Caqti_type.unit Caqti_type.(tup3 (Builder_db.Rep.id (`build : [ `build ])) Builder_db.Rep.fpath Builder_db.Rep.fpath) "SELECT id, console, script FROM build" let update_old_build_console_script = Caqti_request.exec Caqti_type.(tup3 (Builder_db.Rep.id (`build : [ `build ])) Builder_db.Rep.cstruct string) "UPDATE new_build SET console = ?2, script = ?3 WHERE id = ?1" let drop_build = Caqti_request.exec Caqti_type.unit "DROP TABLE build" let rename_build = Caqti_request.exec Caqti_type.unit "ALTER TABLE new_build RENAME TO build" let console_to_string console = Asn.console_of_cs console |> Rresult.R.reword_error (fun s -> `Msg s) >>| fun console -> List.rev_map (fun (delta, data) -> Printf.sprintf "%.3fs:%s\n" (Duration.to_f (Int64.of_int delta)) data) console |> String.concat "" let console_of_string data = let lines = String.split_on_char '\n' data in (* remove last empty line *) let lines = match List.rev lines with | "" :: lines -> List.rev lines | _ -> lines in let console = List.map (fun line -> match String.split_on_char ':' line with | ts :: tail -> let delta = float_of_string (String.sub ts 0 (String.length ts - 1)) in Int64.to_int (Duration.of_f delta), String.concat ":" tail | _ -> assert false) lines in Asn.console_to_cs console let save_console_and_script datadir job_name uuid console script = let out name = Fpath.(v job_name / Uuidm.to_string uuid / name + "txt") in let script_out = out "script" in Bos.OS.File.write Fpath.(datadir // script_out) script >>= fun () -> let console_out = out "console" in console_to_string console >>= fun console_data -> Bos.OS.File.write Fpath.(datadir // console_out) console_data >>= fun () -> Ok (console_out, script_out) let read_console_and_script datadir console_file script_file = let console_file = Fpath.append datadir console_file and script_file = Fpath.append datadir script_file in Bos.OS.File.read console_file >>= fun console -> Bos.OS.File.read script_file >>= fun script -> let console = console_of_string console in Bos.OS.File.delete console_file >>= fun () -> Bos.OS.File.delete script_file >>= fun () -> Ok (console, script) open Rresult.R.Infix let migrate datadir (module Db : Caqti_blocking.CONNECTION) = Grej.check_version ~user_version:old_version (module Db) >>= fun () -> Db.exec new_build () >>= fun () -> Db.exec copy_from_old_build () >>= fun () -> Db.collect_list old_build_console_script () >>= fun console_scripts -> Grej.list_iter_result (fun (id, (job_name, uuid), console, script) -> save_console_and_script datadir job_name uuid console script >>= fun (console_file, script_file) -> Db.exec update_new_build_console_script (id, console_file, script_file)) console_scripts >>= fun () -> Db.exec drop_build () >>= fun () -> Db.exec rename_build () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_job_start ON build(job, start_d DESC, start_ps DESC)") () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_failed ON build(job, start_d DESC, start_ps DESC) WHERE result_code <> 0") () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_input_id ON build(input_id)") () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_main_binary ON build(main_binary)") () >>= fun () -> Db.exec (Grej.set_version new_version) () let rollback datadir (module Db : Caqti_blocking.CONNECTION) = Grej.check_version ~user_version:new_version (module Db) >>= fun () -> Db.exec old_build () >>= fun () -> Db.exec copy_from_new_build () >>= fun () -> Db.collect_list new_build_console_script () >>= fun console_scripts -> Grej.list_iter_result (fun (id, console_file, script_file) -> read_console_and_script datadir console_file script_file >>= fun (console, script) -> Db.exec update_old_build_console_script (id, console, script)) console_scripts >>= fun () -> Db.exec drop_build () >>= fun () -> Db.exec rename_build () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_job_start ON build(job, start_d DESC, start_ps DESC)") () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_failed ON build(job, start_d DESC, start_ps DESC) WHERE result_code <> 0") () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_input_id ON build(input_id)") () >>= fun () -> Db.exec (Caqti_request.exec Caqti_type.unit "CREATE INDEX idx_build_main_binary ON build(main_binary)") () >>= fun () -> Db.exec (Grej.set_version old_version) ()