Reynir Björnsson
f666c9d0d1
Give hints what to do in case the database file does not exist, or when the database is not of the expected version. Addresses #82
252 lines
7.9 KiB
OCaml
252 lines
7.9 KiB
OCaml
open Lwt.Infix
|
|
|
|
let safe_close fd =
|
|
Lwt.catch
|
|
(fun () -> Lwt_unix.close fd)
|
|
(fun _ -> Lwt.return_unit)
|
|
|
|
let connect addrtype sockaddr =
|
|
let c = Lwt_unix.(socket addrtype SOCK_STREAM 0) in
|
|
Lwt_unix.set_close_on_exec c ;
|
|
Lwt.catch (fun () ->
|
|
Lwt_unix.(connect c sockaddr) >|= fun () ->
|
|
Some c)
|
|
(fun e ->
|
|
Logs.warn (fun m -> m "error %s connecting to influx"
|
|
(Printexc.to_string e));
|
|
safe_close c >|= fun () ->
|
|
None)
|
|
|
|
let write_raw s buf =
|
|
let rec w off l =
|
|
Lwt.catch (fun () ->
|
|
Lwt_unix.send s buf off l [] >>= fun n ->
|
|
if n = l then
|
|
Lwt.return (Ok ())
|
|
else
|
|
w (off + n) (l - n))
|
|
(fun e ->
|
|
Logs.err (fun m -> m "exception %s while writing" (Printexc.to_string e)) ;
|
|
safe_close s >|= fun () ->
|
|
Error `Exception)
|
|
in
|
|
(* Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ; *)
|
|
w 0 (Bytes.length buf)
|
|
|
|
let process =
|
|
Metrics.field ~doc:"name of the process" "vm" Metrics.String
|
|
|
|
let init_influx name data =
|
|
match data with
|
|
| None -> ()
|
|
| Some (ip, port) ->
|
|
Logs.info (fun m -> m "stats connecting to %a:%d" Ipaddr.V4.pp ip port);
|
|
Metrics.enable_all ();
|
|
Metrics_lwt.init_periodic (fun () -> Lwt_unix.sleep 10.);
|
|
Metrics_lwt.periodically (Metrics_rusage.rusage_src ~tags:[]);
|
|
Metrics_lwt.periodically (Metrics_rusage.kinfo_mem_src ~tags:[]);
|
|
let get_cache, reporter = Metrics.cache_reporter () in
|
|
Metrics.set_reporter reporter;
|
|
let fd = ref None in
|
|
let rec report () =
|
|
let send () =
|
|
(match !fd with
|
|
| Some _ -> Lwt.return_unit
|
|
| None ->
|
|
let addr = Lwt_unix.ADDR_INET (Ipaddr_unix.V4.to_inet_addr ip, port) in
|
|
connect Lwt_unix.PF_INET addr >|= function
|
|
| None -> Logs.err (fun m -> m "connection failure to stats")
|
|
| Some fd' -> fd := Some fd') >>= fun () ->
|
|
match !fd with
|
|
| None -> Lwt.return_unit
|
|
| Some socket ->
|
|
let tag = process name in
|
|
let datas = Metrics.SM.fold (fun src (tags, data) acc ->
|
|
let name = Metrics.Src.name src in
|
|
Metrics_influx.encode_line_protocol (tag :: tags) data name :: acc)
|
|
(get_cache ()) []
|
|
in
|
|
let datas = String.concat "" datas in
|
|
write_raw socket (Bytes.unsafe_of_string datas) >|= function
|
|
| Ok () -> ()
|
|
| Error `Exception ->
|
|
Logs.warn (fun m -> m "error on stats write");
|
|
fd := None
|
|
and sleep () = Lwt_unix.sleep 10.
|
|
in
|
|
Lwt.join [ send () ; sleep () ] >>= report
|
|
in
|
|
Lwt.async report
|
|
|
|
let run_batch_viz ~cachedir ~datadir ~configdir =
|
|
let open Rresult.R.Infix in
|
|
begin
|
|
let script = Fpath.(configdir / "batch-viz.sh")
|
|
and script_log = Fpath.(cachedir / "batch-viz.log")
|
|
and viz_script = Fpath.(configdir / "upload-hooks" / "visualizations.sh")
|
|
in
|
|
Bos.OS.File.exists script >>= fun script_exists ->
|
|
if not script_exists then begin
|
|
Logs.warn (fun m -> m "Didn't find %s" (Fpath.to_string script));
|
|
Ok ()
|
|
end else
|
|
let args =
|
|
[ "--cache-dir=" ^ Fpath.to_string cachedir;
|
|
"--data-dir=" ^ Fpath.to_string datadir;
|
|
"--viz-script=" ^ Fpath.to_string viz_script ]
|
|
|> List.map (fun s -> "\"" ^ String.escaped s ^ "\"")
|
|
|> String.concat " "
|
|
in
|
|
(*> Note: The reason for appending, is that else a new startup could
|
|
overwrite an existing running batch's log*)
|
|
(Fpath.to_string script ^ " " ^ args
|
|
^ " 2>&1 >> " ^ Fpath.to_string script_log
|
|
^ " &")
|
|
|> Sys.command
|
|
|> ignore
|
|
|> Result.ok
|
|
end
|
|
|> function
|
|
| Ok () -> ()
|
|
| Error err ->
|
|
Logs.warn (fun m ->
|
|
m "Error while starting batch-viz.sh: %a"
|
|
Rresult.R.pp_msg err)
|
|
|
|
let setup_app level influx port host datadir cachedir configdir run_batch_viz_flag =
|
|
let dbpath = Printf.sprintf "%s/builder.sqlite3" datadir in
|
|
let datadir = Fpath.v datadir in
|
|
let cachedir =
|
|
cachedir |> Option.fold ~none:Fpath.(datadir / "_cache") ~some:Fpath.v
|
|
in
|
|
let configdir = Fpath.v configdir in
|
|
let () = init_influx "builder-web" influx in
|
|
let () =
|
|
if run_batch_viz_flag then
|
|
run_batch_viz ~cachedir ~datadir ~configdir
|
|
in
|
|
match Builder_web.init dbpath datadir with
|
|
| exception Sqlite3.Error e ->
|
|
Format.eprintf "Error: @[@,%s.\
|
|
@,Does the database file exist? Create with `builder-db migrate`.@]\n%!"
|
|
e;
|
|
exit 2
|
|
| Error (#Caqti_error.load as e) ->
|
|
Format.eprintf "Error: %a\n%!" Caqti_error.pp e;
|
|
exit 2
|
|
| Error (`Wrong_version _ as e) ->
|
|
Format.eprintf "Error: @[@,%a.\
|
|
@,Migrate database version with `builder-migrations`,\
|
|
@,or start with a fresh database with `builder-db migrate`.@]\n%!"
|
|
Builder_web.pp_error e;
|
|
| Error (
|
|
#Caqti_error.connect
|
|
| #Caqti_error.call_or_retrieve
|
|
| `Msg _ as e
|
|
) ->
|
|
Format.eprintf "Error: %a\n%!" Builder_web.pp_error e;
|
|
exit 1
|
|
| Ok () ->
|
|
let level = match level with
|
|
| None -> None
|
|
| Some Logs.Debug -> Some `Debug
|
|
| Some Info -> Some `Info
|
|
| Some Warning -> Some `Warning
|
|
| Some Error -> Some `Error
|
|
| Some App -> None
|
|
in
|
|
let error_handler = Dream.error_template Builder_web.error_template in
|
|
Dream.initialize_log ?level ();
|
|
let dream_routes = Builder_web.(
|
|
routes ~datadir ~cachedir ~configdir
|
|
|> to_dream_routes
|
|
)
|
|
in
|
|
Dream.run ~port ~interface:host ~tls:false ~error_handler
|
|
@@ Dream.logger
|
|
@@ Dream.sql_pool ("sqlite3:" ^ dbpath)
|
|
@@ Http_status_metrics.handle
|
|
@@ Builder_web.Middleware.remove_trailing_url_slash
|
|
@@ Dream.router dream_routes
|
|
|
|
open Cmdliner
|
|
|
|
let ip_port : (Ipaddr.V4.t * int) Arg.conv =
|
|
let default_port = 8094 in
|
|
let parse s =
|
|
match
|
|
match String.split_on_char ':' s with
|
|
| [ s ] -> Ok (s, default_port)
|
|
| [ip; port] -> begin match int_of_string port with
|
|
| exception Failure _ -> Error "non-numeric port"
|
|
| port -> Ok (ip, port)
|
|
end
|
|
| _ -> Error "multiple : found"
|
|
with
|
|
| Error msg -> Error (`Msg msg)
|
|
| Ok (ip, port) -> match Ipaddr.V4.of_string ip with
|
|
| Ok ip -> Ok (ip, port)
|
|
| Error `Msg msg -> Error (`Msg msg)
|
|
in
|
|
let printer ppf (ip, port) =
|
|
Format.fprintf ppf "%a:%d" Ipaddr.V4.pp ip port in
|
|
Arg.conv (parse, printer)
|
|
|
|
let datadir =
|
|
let doc = "data directory" in
|
|
let docv = "DATA_DIR" in
|
|
Arg.(
|
|
value &
|
|
opt dir Builder_system.default_datadir &
|
|
info [ "d"; "datadir" ] ~doc ~docv
|
|
)
|
|
|
|
let cachedir =
|
|
let doc = "cache directory" in
|
|
let docv = "CACHE_DIR" in
|
|
Arg.(
|
|
value
|
|
& opt (some ~none:"DATADIR/_cache" dir) None
|
|
& info [ "cachedir" ] ~doc ~docv)
|
|
|
|
let configdir =
|
|
let doc = "config directory" in
|
|
let docv = "CONFIG_DIR" in
|
|
Arg.(
|
|
value &
|
|
opt dir Builder_system.default_configdir &
|
|
info [ "c"; "configdir" ] ~doc ~docv)
|
|
|
|
let port =
|
|
let doc = "port" in
|
|
Arg.(value & opt int 3000 & info [ "p"; "port" ] ~doc)
|
|
|
|
let host =
|
|
let doc = "host" in
|
|
Arg.(value & opt string "0.0.0.0" & info [ "h"; "host" ] ~doc)
|
|
|
|
let influx =
|
|
let doc = "IP address and port (default: 8094) to report metrics to \
|
|
influx line protocol" in
|
|
Arg.(
|
|
value &
|
|
opt (some ip_port) None &
|
|
info [ "influx" ] ~doc ~docv:"INFLUXHOST[:PORT]")
|
|
|
|
let run_batch_viz =
|
|
let doc = "Run CONFIG_DIR/batch-viz.sh on startup. \
|
|
Note that this is started in the background - so the user \
|
|
is in charge of not running several instances of this. A \
|
|
log is written to CACHE_DIR/batch-viz.log" in
|
|
Arg.(value & flag & info [ "run-batch-viz" ] ~doc)
|
|
|
|
|
|
let () =
|
|
let term =
|
|
Term.(const setup_app $ Logs_cli.level () $ influx $ port $ host $ datadir $
|
|
cachedir $ configdir $ run_batch_viz)
|
|
in
|
|
let info = Cmd.info "Builder web" ~doc:"Builder web" ~man:[] in
|
|
Cmd.v info term
|
|
|> Cmd.eval
|
|
|> exit
|