2019-11-06 18:28:26 +00:00
|
|
|
open Lwt.Infix
|
2019-05-12 21:25:25 +00:00
|
|
|
|
2019-11-08 11:49:04 +00:00
|
|
|
let src = Logs.Src.create "monitoring-experiments" ~doc:"Monitoring experiments"
|
2019-11-06 18:28:26 +00:00
|
|
|
module Log = (val Logs.src_log src : Logs.LOG)
|
2019-05-12 21:25:25 +00:00
|
|
|
|
2022-01-27 13:39:31 +00:00
|
|
|
let ( let* ) = Result.bind
|
|
|
|
|
2019-11-08 11:49:04 +00:00
|
|
|
let create ~f =
|
|
|
|
let data : (string, int) Hashtbl.t = Hashtbl.create 7 in
|
|
|
|
(fun x ->
|
|
|
|
let key = f x in
|
|
|
|
let cur = match Hashtbl.find_opt data key with
|
|
|
|
| None -> 0
|
|
|
|
| Some x -> x
|
|
|
|
in
|
|
|
|
Hashtbl.replace data key (succ cur)),
|
|
|
|
(fun () ->
|
|
|
|
let data, total =
|
|
|
|
Hashtbl.fold (fun key value (acc, total) ->
|
|
|
|
(Metrics.uint key value :: acc), value + total)
|
|
|
|
data ([], 0)
|
|
|
|
in
|
|
|
|
Metrics.uint "total" total :: data)
|
|
|
|
|
|
|
|
let counter_metrics ~f name =
|
|
|
|
let open Metrics in
|
|
|
|
let doc = "Counter metrics" in
|
|
|
|
let incr, get = create ~f in
|
|
|
|
let data thing = incr thing; Data.v (get ()) in
|
|
|
|
Src.v ~doc ~tags:Metrics.Tags.[] ~data name
|
|
|
|
|
2019-11-06 18:28:26 +00:00
|
|
|
let vmname = Metrics.field ~doc:"name of the virtual machine" "vm" Metrics.String
|
2019-05-12 21:25:25 +00:00
|
|
|
|
2020-12-21 14:36:25 +00:00
|
|
|
let memory_metrics ~tags =
|
|
|
|
let open Metrics in
|
|
|
|
let doc = "Memory counters" in
|
|
|
|
let data () =
|
|
|
|
let stat = OS.Memory.quick_stat () in
|
|
|
|
Data.v
|
|
|
|
[ uint "memory heap words" stat.heap_words
|
|
|
|
; uint "memory live words" stat.live_words
|
|
|
|
; uint "memory stack words" stat.stack_words
|
|
|
|
; uint "memory free words" stat.free_words ]
|
|
|
|
in
|
|
|
|
Src.v ~doc ~tags ~data "memory"
|
|
|
|
|
2022-01-27 17:39:01 +00:00
|
|
|
let get_log_levels s =
|
|
|
|
let qs = String.split_on_char ',' s in
|
|
|
|
let srcs = Logs.Src.list () in
|
|
|
|
let srcs = List.map (fun src -> Logs.Src.name src, Logs.Src.level src) srcs in
|
|
|
|
let* srcs =
|
|
|
|
match qs with
|
|
|
|
| [""] ->
|
|
|
|
let all_level = Logs.level () in
|
|
|
|
Ok (("*", all_level) :: List.filter (fun (_,l) -> l <> all_level) srcs)
|
2022-01-28 23:40:12 +00:00
|
|
|
| ["*"] ->
|
|
|
|
let all_level = Logs.level () in
|
|
|
|
Ok (("*", all_level) :: srcs)
|
|
|
|
| qs ->
|
2022-01-27 17:39:01 +00:00
|
|
|
let* () =
|
2022-01-28 23:40:12 +00:00
|
|
|
let src_names = List.map fst srcs in
|
|
|
|
match List.find_opt (fun src -> not (List.mem src src_names)) qs with
|
2022-01-27 17:39:01 +00:00
|
|
|
| Some bad_src -> Error ("unknown source: " ^ bad_src)
|
|
|
|
| None -> Ok ()
|
|
|
|
in
|
2022-01-28 23:40:12 +00:00
|
|
|
Ok (List.filter (fun (name, _) -> List.mem name qs) srcs)
|
2022-01-27 17:39:01 +00:00
|
|
|
in
|
|
|
|
let levels =
|
2022-01-28 23:40:12 +00:00
|
|
|
List.map (fun (name, level) ->
|
|
|
|
name ^ ":" ^ Logs.level_to_string level)
|
|
|
|
srcs
|
2022-01-27 17:39:01 +00:00
|
|
|
in
|
|
|
|
Ok (`String (String.concat "," levels))
|
|
|
|
|
|
|
|
let get_metrics s =
|
|
|
|
let qs = String.split_on_char ',' s in
|
|
|
|
let srcs = Metrics.Src.list () in
|
2022-01-28 23:40:12 +00:00
|
|
|
let srcs =
|
|
|
|
List.map (fun src ->
|
|
|
|
Metrics.Src.name src, Metrics.Src.is_active src)
|
|
|
|
srcs
|
|
|
|
in
|
2022-01-27 17:39:01 +00:00
|
|
|
let* srcs =
|
|
|
|
match qs with
|
2022-01-28 23:40:12 +00:00
|
|
|
| [""] ->
|
|
|
|
let all = Metrics.all_enabled () in
|
|
|
|
Ok (("*", all) :: (List.filter (fun (_, b) -> b <> all) srcs))
|
|
|
|
| ["*"] ->
|
|
|
|
let all = Metrics.all_enabled () in
|
|
|
|
let tags = Metrics.tags_enabled () in
|
|
|
|
Ok (("*", all) :: List.map (fun t -> "tag:" ^ t, true) tags @ srcs)
|
2022-01-27 17:39:01 +00:00
|
|
|
| qs ->
|
|
|
|
let* () =
|
2022-01-28 23:40:12 +00:00
|
|
|
let src_names = List.map fst srcs in
|
2022-01-27 17:39:01 +00:00
|
|
|
match List.find_opt (fun src -> not (List.mem src src_names)) qs with
|
|
|
|
| Some bad_src -> Error ("unknown source: " ^ bad_src)
|
|
|
|
| None -> Ok ()
|
|
|
|
in
|
2022-01-28 23:40:12 +00:00
|
|
|
Ok (List.filter (fun (n, _) -> List.mem n qs) srcs)
|
2022-01-27 17:39:01 +00:00
|
|
|
in
|
|
|
|
let metrics =
|
2022-01-28 23:40:12 +00:00
|
|
|
List.map (fun (name, act) ->
|
|
|
|
name ^ ":" ^ if act then "enabled" else "disabled")
|
2022-01-27 17:39:01 +00:00
|
|
|
srcs
|
|
|
|
in
|
|
|
|
Ok (`String (String.concat "," metrics))
|
|
|
|
|
2022-01-27 13:39:31 +00:00
|
|
|
let adjust_log_level s =
|
|
|
|
let ts =
|
|
|
|
List.map
|
2022-01-28 23:40:12 +00:00
|
|
|
(fun s ->
|
|
|
|
try (fst Mirage_runtime.Arg.log_threshold) s with
|
|
|
|
Failure err -> `Error ("failure with " ^ s ^ ": " ^ err))
|
2022-01-27 13:39:31 +00:00
|
|
|
(String.split_on_char ',' s)
|
|
|
|
in
|
|
|
|
let* oks =
|
|
|
|
List.fold_left (fun acc t ->
|
|
|
|
let* acc = acc in
|
|
|
|
match t with
|
|
|
|
| `Ok l -> Ok (l :: acc)
|
|
|
|
| `Error msg -> Error msg)
|
|
|
|
(Ok []) ts
|
|
|
|
in
|
2022-01-27 17:39:01 +00:00
|
|
|
Mirage_runtime.set_level
|
|
|
|
~default:(Option.value (Logs.level ()) ~default:Logs.Info)
|
|
|
|
oks;
|
|
|
|
Ok `Empty
|
2022-01-27 13:39:31 +00:00
|
|
|
|
|
|
|
let enable_of_str s =
|
|
|
|
let s = String.lowercase_ascii s in
|
|
|
|
if s = "enable" || s = "on" then
|
|
|
|
Ok `Enable
|
|
|
|
else if s = "disable" || s = "off" then
|
|
|
|
Ok `Disable
|
|
|
|
else
|
|
|
|
Error ("couldn't decode 'enable' or 'disable': " ^ s)
|
|
|
|
|
|
|
|
let adjust_metrics s =
|
|
|
|
let ts =
|
|
|
|
List.map (fun s ->
|
|
|
|
match String.split_on_char ':' s with
|
|
|
|
| [ en ] | [ "*" ; en ] ->
|
|
|
|
let* en_or_d = enable_of_str en in
|
|
|
|
Ok (`All, en_or_d)
|
|
|
|
| [ src ; en ] ->
|
|
|
|
let* en_or_d = enable_of_str en in
|
|
|
|
Ok (`Src src, en_or_d)
|
2022-01-28 23:40:12 +00:00
|
|
|
| [ "src" ; src ; en ] ->
|
|
|
|
let* en_or_d = enable_of_str en in
|
|
|
|
Ok (`Src src, en_or_d)
|
|
|
|
| [ "tag" ; tag ; en ] ->
|
|
|
|
let* en_or_d = enable_of_str en in
|
|
|
|
Ok (`Tag tag, en_or_d)
|
2022-01-27 13:39:31 +00:00
|
|
|
| _ -> Error ("couldn't decode metrics " ^ s))
|
|
|
|
(String.split_on_char ',' s)
|
|
|
|
in
|
2022-01-28 23:40:12 +00:00
|
|
|
let* (all, srcs, tags) =
|
2022-01-27 13:39:31 +00:00
|
|
|
List.fold_left (fun acc t ->
|
2022-01-28 23:40:12 +00:00
|
|
|
let* (all, srcs, tags) = acc in
|
2022-01-27 13:39:31 +00:00
|
|
|
let* t = t in
|
|
|
|
match t with
|
2022-01-28 23:40:12 +00:00
|
|
|
| `All, en_or_d -> Ok (Some en_or_d, srcs, tags)
|
|
|
|
| `Src s, en_or_d -> Ok (all, (s, en_or_d) :: srcs, tags)
|
|
|
|
| `Tag t, en_or_d -> Ok (all, srcs, (t, en_or_d) :: tags))
|
|
|
|
(Ok (None, [], [])) ts
|
2022-01-27 13:39:31 +00:00
|
|
|
in
|
|
|
|
(match all with
|
|
|
|
| Some `Enable -> Metrics.enable_all ()
|
|
|
|
| Some `Disable -> Metrics.disable_all ()
|
|
|
|
| None -> ());
|
2022-01-28 23:40:12 +00:00
|
|
|
List.iter (fun (tag, e_or_d) ->
|
|
|
|
match e_or_d with
|
|
|
|
| `Enable -> Metrics.enable_tag tag
|
|
|
|
| `Disable -> Metrics.disable_tag tag)
|
|
|
|
tags ;
|
2022-01-27 13:39:31 +00:00
|
|
|
List.iter (fun (src, e_or_d) ->
|
|
|
|
match List.find_opt (fun s -> Metrics.Src.name s = src) (Metrics.Src.list ()), e_or_d with
|
|
|
|
| Some src, `Enable -> Metrics.Src.enable src
|
|
|
|
| Some src, `Disable -> Metrics.Src.disable src
|
|
|
|
| None, _ ->
|
|
|
|
Log.warn (fun m -> m "%s is not a valid metrics source." src))
|
|
|
|
srcs ;
|
2022-01-27 17:39:01 +00:00
|
|
|
Ok `Empty
|
2022-01-27 13:39:31 +00:00
|
|
|
|
2022-01-27 11:23:42 +00:00
|
|
|
module Make (T : Mirage_time.S) (S : Tcpip.Stack.V4V6) = struct
|
2019-05-12 21:25:25 +00:00
|
|
|
|
2019-11-06 18:28:26 +00:00
|
|
|
let timer conn get host stack dst =
|
|
|
|
let datas =
|
2019-11-06 20:43:02 +00:00
|
|
|
Metrics.SM.fold (fun src (tags, data) acc ->
|
2019-11-06 18:28:26 +00:00
|
|
|
let name = Metrics.Src.name src in
|
2019-11-06 20:43:02 +00:00
|
|
|
Metrics_influx.encode_line_protocol (host@tags) data name :: acc)
|
2019-11-06 18:28:26 +00:00
|
|
|
(get ()) []
|
2019-05-12 21:25:25 +00:00
|
|
|
in
|
2019-11-06 18:28:26 +00:00
|
|
|
let datas = String.concat "" datas in
|
|
|
|
let write flow =
|
|
|
|
Log.debug (fun m -> m "sending measurements");
|
2020-12-21 13:46:23 +00:00
|
|
|
S.TCP.write flow (Cstruct.of_string datas) >|= function
|
2019-11-06 18:28:26 +00:00
|
|
|
| Ok () -> ()
|
|
|
|
| Error e ->
|
2020-12-21 13:46:23 +00:00
|
|
|
Log.err (fun m -> m "error %a writing to metrics" S.TCP.pp_write_error e);
|
2019-11-06 18:28:26 +00:00
|
|
|
conn := None
|
2019-05-12 21:25:25 +00:00
|
|
|
in
|
2019-11-06 18:28:26 +00:00
|
|
|
match !conn with
|
|
|
|
| None ->
|
|
|
|
begin
|
|
|
|
Log.debug (fun m -> m "creating connection");
|
2020-12-21 13:46:23 +00:00
|
|
|
S.TCP.create_connection (S.tcp stack) dst >>= function
|
2019-11-06 18:28:26 +00:00
|
|
|
| Error msg ->
|
|
|
|
Log.err (fun m -> m "couldn't create connection %a"
|
2020-12-21 13:46:23 +00:00
|
|
|
S.TCP.pp_error msg);
|
2019-11-06 18:28:26 +00:00
|
|
|
Lwt.return_unit
|
|
|
|
| Ok flow ->
|
|
|
|
conn := Some flow;
|
|
|
|
write flow
|
|
|
|
end
|
|
|
|
| Some f -> write f
|
|
|
|
|
|
|
|
let timer_loop get host interval stack dst () =
|
|
|
|
let conn = ref None in
|
|
|
|
let rec one () =
|
|
|
|
Lwt.join [
|
|
|
|
timer conn get host stack dst;
|
2019-05-12 21:25:25 +00:00
|
|
|
T.sleep_ns (Duration.of_sec interval)
|
|
|
|
] >>= fun () ->
|
2019-11-06 18:28:26 +00:00
|
|
|
(one[@tailcall]) ()
|
|
|
|
in
|
|
|
|
one ()
|
|
|
|
|
2022-01-27 13:39:31 +00:00
|
|
|
let create_listener stack = function
|
|
|
|
| None -> ()
|
|
|
|
| Some port ->
|
|
|
|
S.TCP.listen (S.tcp stack) ~port (fun f ->
|
|
|
|
(S.TCP.read f >>= function
|
|
|
|
| Ok `Data data ->
|
|
|
|
if Cstruct.length data > 0 then
|
|
|
|
let rest = Cstruct.(to_string (shift data 1)) in
|
|
|
|
let r =
|
|
|
|
match Cstruct.get_char data 0 with
|
|
|
|
| 'L' -> adjust_log_level rest
|
|
|
|
| 'M' -> adjust_metrics rest
|
2022-01-27 17:39:01 +00:00
|
|
|
| 'l' -> get_log_levels rest
|
|
|
|
| 'm' -> get_metrics rest
|
2022-01-27 13:39:31 +00:00
|
|
|
| _ -> Error "unknown command"
|
|
|
|
in
|
|
|
|
let msg =
|
|
|
|
match r with
|
2022-01-27 17:39:01 +00:00
|
|
|
| Ok `Empty -> "ok"
|
|
|
|
| Ok `String reply -> "ok: " ^ reply
|
|
|
|
| Error msg -> "error: " ^ msg
|
2022-01-27 13:39:31 +00:00
|
|
|
in
|
|
|
|
S.TCP.write f (Cstruct.of_string msg) >|= function
|
|
|
|
| Ok () -> ()
|
|
|
|
| Error e ->
|
|
|
|
Log.warn (fun m -> m "write error on log & metrics listener %a"
|
|
|
|
S.TCP.pp_write_error e)
|
|
|
|
else
|
|
|
|
(Log.debug (fun m -> m "received empty data on log & metrics listener");
|
|
|
|
Lwt.return_unit)
|
|
|
|
| Ok `Eof ->
|
|
|
|
Log.debug (fun m -> m "EOF on log & metrics listener");
|
|
|
|
Lwt.return_unit
|
|
|
|
| Error e ->
|
|
|
|
Log.debug (fun m -> m "read error on log & metrics listener %a"
|
|
|
|
S.TCP.pp_error e);
|
|
|
|
Lwt.return_unit) >>= fun () ->
|
|
|
|
S.TCP.close f)
|
|
|
|
|
|
|
|
let create ?(interval = 10) ?hostname dst ?(port = 8094) ?listen_port stack =
|
2019-11-06 18:28:26 +00:00
|
|
|
let get_cache, reporter = Metrics.cache_reporter () in
|
|
|
|
Metrics.set_reporter reporter;
|
|
|
|
Metrics.enable_all ();
|
|
|
|
Metrics_lwt.init_periodic (fun () -> T.sleep_ns (Duration.of_sec interval));
|
2022-01-27 12:33:26 +00:00
|
|
|
Metrics_lwt.periodically (OS.MM.malloc_metrics ~tags:Metrics.Tags.[])[@warning "-3"];
|
2020-12-21 14:36:25 +00:00
|
|
|
Metrics_lwt.periodically (memory_metrics ~tags:Metrics.Tags.[]);
|
2019-11-06 18:28:26 +00:00
|
|
|
let host = match hostname with None -> [] | Some host -> [vmname host] in
|
2022-01-27 13:39:31 +00:00
|
|
|
Lwt.async (timer_loop get_cache host interval stack (dst, port));
|
|
|
|
create_listener stack listen_port
|
2019-05-12 21:25:25 +00:00
|
|
|
end
|
|
|
|
|