Add influx metrics reporting
This commit is contained in:
parent
f7eafc56c5
commit
8a8dd9a14b
3 changed files with 112 additions and 3 deletions
|
@ -1,5 +1,85 @@
|
||||||
open Opium
|
open Opium
|
||||||
|
|
||||||
|
open Lwt.Infix
|
||||||
|
|
||||||
|
let safe_close fd =
|
||||||
|
Lwt.catch
|
||||||
|
(fun () -> Lwt_unix.close fd)
|
||||||
|
(fun _ -> Lwt.return_unit)
|
||||||
|
|
||||||
|
let connect addrtype sockaddr =
|
||||||
|
let c = Lwt_unix.(socket addrtype SOCK_STREAM 0) in
|
||||||
|
Lwt_unix.set_close_on_exec c ;
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
Lwt_unix.(connect c sockaddr) >|= fun () ->
|
||||||
|
Some c)
|
||||||
|
(fun e ->
|
||||||
|
Logs.warn (fun m -> m "error %s connecting to influx"
|
||||||
|
(Printexc.to_string e));
|
||||||
|
safe_close c >|= fun () ->
|
||||||
|
None)
|
||||||
|
|
||||||
|
let write_raw s buf =
|
||||||
|
let rec w off l =
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
Lwt_unix.send s buf off l [] >>= fun n ->
|
||||||
|
if n = l then
|
||||||
|
Lwt.return (Ok ())
|
||||||
|
else
|
||||||
|
w (off + n) (l - n))
|
||||||
|
(fun e ->
|
||||||
|
Logs.err (fun m -> m "exception %s while writing" (Printexc.to_string e)) ;
|
||||||
|
safe_close s >|= fun () ->
|
||||||
|
Error `Exception)
|
||||||
|
in
|
||||||
|
(* Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ; *)
|
||||||
|
w 0 (Bytes.length buf)
|
||||||
|
|
||||||
|
let process =
|
||||||
|
Metrics.field ~doc:"name of the process" "vm" Metrics.String
|
||||||
|
|
||||||
|
let init_influx name data =
|
||||||
|
match data with
|
||||||
|
| None -> ()
|
||||||
|
| Some (ip, port) ->
|
||||||
|
Logs.info (fun m -> m "stats connecting to %a:%d" Ipaddr.V4.pp ip port);
|
||||||
|
Metrics.enable_all ();
|
||||||
|
Metrics_lwt.init_periodic (fun () -> Lwt_unix.sleep 10.);
|
||||||
|
Metrics_lwt.periodically (Metrics_rusage.rusage_src ~tags:[]);
|
||||||
|
Metrics_lwt.periodically (Metrics_rusage.kinfo_mem_src ~tags:[]);
|
||||||
|
let get_cache, reporter = Metrics.cache_reporter () in
|
||||||
|
Metrics.set_reporter reporter;
|
||||||
|
let fd = ref None in
|
||||||
|
let rec report () =
|
||||||
|
let send () =
|
||||||
|
(match !fd with
|
||||||
|
| Some _ -> Lwt.return_unit
|
||||||
|
| None ->
|
||||||
|
let addr = Lwt_unix.ADDR_INET (Ipaddr_unix.V4.to_inet_addr ip, port) in
|
||||||
|
connect Lwt_unix.PF_INET addr >|= function
|
||||||
|
| None -> Logs.err (fun m -> m "connection failure to stats")
|
||||||
|
| Some fd' -> fd := Some fd') >>= fun () ->
|
||||||
|
match !fd with
|
||||||
|
| None -> Lwt.return_unit
|
||||||
|
| Some socket ->
|
||||||
|
let tag = process name in
|
||||||
|
let datas = Metrics.SM.fold (fun src (tags, data) acc ->
|
||||||
|
let name = Metrics.Src.name src in
|
||||||
|
Metrics_influx.encode_line_protocol (tag :: tags) data name :: acc)
|
||||||
|
(get_cache ()) []
|
||||||
|
in
|
||||||
|
let datas = String.concat "" datas in
|
||||||
|
write_raw socket (Bytes.unsafe_of_string datas) >|= function
|
||||||
|
| Ok () -> ()
|
||||||
|
| Error `Exception ->
|
||||||
|
Logs.warn (fun m -> m "error on stats write");
|
||||||
|
fd := None
|
||||||
|
and sleep () = Lwt_unix.sleep 10.
|
||||||
|
in
|
||||||
|
Lwt.join [ send () ; sleep () ] >>= report
|
||||||
|
in
|
||||||
|
Lwt.async report
|
||||||
|
|
||||||
let timestamp_reporter () =
|
let timestamp_reporter () =
|
||||||
let report src level ~over k msgf =
|
let report src level ~over k msgf =
|
||||||
let k _ = over (); k () in
|
let k _ = over (); k () in
|
||||||
|
@ -23,9 +103,10 @@ let app t =
|
||||||
|> App.cmd_name "Builder Web"
|
|> App.cmd_name "Builder Web"
|
||||||
|> Builder_web.add_routes t
|
|> Builder_web.add_routes t
|
||||||
|
|
||||||
let setup_app () port host datadir =
|
let setup_app () influx port host datadir =
|
||||||
let dbpath = Printf.sprintf "%s/builder.sqlite3" datadir in
|
let dbpath = Printf.sprintf "%s/builder.sqlite3" datadir in
|
||||||
let datadir = Fpath.v datadir in
|
let datadir = Fpath.v datadir in
|
||||||
|
let () = init_influx "builder-web" influx in
|
||||||
match Builder_web.init dbpath datadir with
|
match Builder_web.init dbpath datadir with
|
||||||
| Error (#Caqti_error.load as e) ->
|
| Error (#Caqti_error.load as e) ->
|
||||||
Format.eprintf "Error: %a\n%!" Caqti_error.pp e;
|
Format.eprintf "Error: %a\n%!" Caqti_error.pp e;
|
||||||
|
@ -42,6 +123,25 @@ let setup_app () port host datadir =
|
||||||
|
|
||||||
open Cmdliner
|
open Cmdliner
|
||||||
|
|
||||||
|
let ip_port : (Ipaddr.V4.t * int) Arg.converter =
|
||||||
|
let default_port = 8094 in
|
||||||
|
let parse s =
|
||||||
|
match
|
||||||
|
match String.split_on_char ':' s with
|
||||||
|
| [ s ] -> Ok (s, default_port)
|
||||||
|
| [ip; port] -> begin match int_of_string port with
|
||||||
|
| exception Failure _ -> Error "non-numeric port"
|
||||||
|
| port -> Ok (ip, port)
|
||||||
|
end
|
||||||
|
| _ -> Error "multiple : found"
|
||||||
|
with
|
||||||
|
| Error msg -> `Error msg
|
||||||
|
| Ok (ip, port) -> match Ipaddr.V4.of_string ip with
|
||||||
|
| Ok ip -> `Ok (ip, port)
|
||||||
|
| Error `Msg msg -> `Error msg
|
||||||
|
in
|
||||||
|
parse, fun ppf (ip, port) -> Format.fprintf ppf "%a:%d" Ipaddr.V4.pp ip port
|
||||||
|
|
||||||
let setup_log =
|
let setup_log =
|
||||||
Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ())
|
Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ())
|
||||||
|
|
||||||
|
@ -57,9 +157,14 @@ let host =
|
||||||
let doc = "host" in
|
let doc = "host" in
|
||||||
Arg.(value & opt string "0.0.0.0" & info [ "h"; "host" ] ~doc)
|
Arg.(value & opt string "0.0.0.0" & info [ "h"; "host" ] ~doc)
|
||||||
|
|
||||||
|
let influx =
|
||||||
|
let doc = "IP address and port (default: 8094) to report metrics to in influx line protocol" in
|
||||||
|
Arg.(value & opt (some ip_port) None & info [ "influx" ] ~doc ~docv:"INFLUXHOST[:PORT]")
|
||||||
|
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
let () = Mirage_crypto_rng_unix.initialize () in
|
let () = Mirage_crypto_rng_unix.initialize () in
|
||||||
let term = Term.(pure setup_app $ setup_log $ port $ host $ datadir) in
|
let term = Term.(pure setup_app $ setup_log $ influx $ port $ host $ datadir) in
|
||||||
let info = Term.info "Builder web" ~doc:"Builder web" ~man:[] in
|
let info = Term.info "Builder web" ~doc:"Builder web" ~man:[] in
|
||||||
match Term.eval (term, info) with
|
match Term.eval (term, info) with
|
||||||
| `Ok s ->
|
| `Ok s ->
|
||||||
|
|
2
bin/dune
2
bin/dune
|
@ -2,7 +2,7 @@
|
||||||
(public_name builder-web)
|
(public_name builder-web)
|
||||||
(name builder_web_app)
|
(name builder_web_app)
|
||||||
(modules builder_web_app)
|
(modules builder_web_app)
|
||||||
(libraries builder_web mirage-crypto-rng.unix cmdliner logs.cli fmt.cli fmt.tty ptime.clock.os))
|
(libraries builder_web mirage-crypto-rng.unix cmdliner logs.cli fmt.cli fmt.tty ptime.clock.os metrics metrics-lwt metrics-influx metrics-rusage ipaddr ipaddr.unix))
|
||||||
|
|
||||||
(executable
|
(executable
|
||||||
(public_name builder-db)
|
(public_name builder-db)
|
||||||
|
|
|
@ -27,6 +27,10 @@ depends: [
|
||||||
"alcotest" {with-test}
|
"alcotest" {with-test}
|
||||||
"opam-core"
|
"opam-core"
|
||||||
"opam-format"
|
"opam-format"
|
||||||
|
"metrics"
|
||||||
|
"metrics-lwt"
|
||||||
|
"metrics-influx"
|
||||||
|
"ipaddr"
|
||||||
]
|
]
|
||||||
|
|
||||||
synopsis: "Web interface for builder"
|
synopsis: "Web interface for builder"
|
||||||
|
|
Loading…
Reference in a new issue