From 938406313eb50ea8d3e50dd340b7bf415e15a434 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 27 Jan 2022 14:39:31 +0100 Subject: [PATCH] create: add ~listen_port which allows dynamic adjustment of log level and metrics source enable/disable --- monitoring-experiments.opam | 1 + src/dune | 2 +- src/monitoring_experiments.ml | 116 ++++++++++++++++++++++++++++++++- src/monitoring_experiments.mli | 3 +- 4 files changed, 118 insertions(+), 4 deletions(-) diff --git a/monitoring-experiments.opam b/monitoring-experiments.opam index d1cdd69..a4288e1 100644 --- a/monitoring-experiments.opam +++ b/monitoring-experiments.opam @@ -17,6 +17,7 @@ depends: [ "tcpip" {>= "7.0.0"} "mirage-solo5" {>= "0.6.4"} "ocaml-freestanding" {>= "0.4.5"} + "mirage-runtime" ] build: [ ["dune" "subst"] {pinned} diff --git a/src/dune b/src/dune index 09ea214..8b5f758 100644 --- a/src/dune +++ b/src/dune @@ -3,4 +3,4 @@ (public_name monitoring-experiments) (wrapped false) (libraries logs metrics metrics-lwt metrics-influx mirage-time tcpip - mirage-solo5)) + mirage-solo5 mirage-runtime)) diff --git a/src/monitoring_experiments.ml b/src/monitoring_experiments.ml index a683b5d..0d764f2 100644 --- a/src/monitoring_experiments.ml +++ b/src/monitoring_experiments.ml @@ -3,6 +3,8 @@ open Lwt.Infix let src = Logs.Src.create "monitoring-experiments" ~doc:"Monitoring experiments" module Log = (val Logs.src_log src : Logs.LOG) +let ( let* ) = Result.bind + let create ~f = let data : (string, int) Hashtbl.t = Hashtbl.create 7 in (fun x -> @@ -42,6 +44,80 @@ let memory_metrics ~tags = in Src.v ~doc ~tags ~data "memory" +let adjust_log_level s = + let ts = + List.map + (fst Mirage_runtime.Arg.log_threshold) + (String.split_on_char ',' s) + in + let* oks = + List.fold_left (fun acc t -> + let* acc = acc in + match t with + | `Ok l -> Ok (l :: acc) + | `Error msg -> Error msg) + (Ok []) ts + in + Ok (Mirage_runtime.set_level + ~default:(Option.value (Logs.level ()) ~default:Logs.Info) + oks) + +let enable_of_str s = + let s = String.lowercase_ascii s in + if s = "enable" || s = "on" then + Ok `Enable + else if s = "disable" || s = "off" then + Ok `Disable + else + Error ("couldn't decode 'enable' or 'disable': " ^ s) + +let adjust_metrics s = + let ts = + List.map (fun s -> + match String.split_on_char ':' s with + | [ en ] | [ "*" ; en ] -> + let* en_or_d = enable_of_str en in + Ok (`All, en_or_d) + | [ src ; en ] -> + let* en_or_d = enable_of_str en in + Ok (`Src src, en_or_d) + | [ "src" ; src ; en ] -> + let* en_or_d = enable_of_str en in + Ok (`Src src, en_or_d) + | [ "tag" ; src ; en ] -> + let* en_or_d = enable_of_str en in + Ok (`Tag src, en_or_d) + | _ -> Error ("couldn't decode metrics " ^ s)) + (String.split_on_char ',' s) + in + let* (all, tags, srcs) = + List.fold_left (fun acc t -> + let* (all, tags, srcs) = acc in + let* t = t in + match t with + | `All, en_or_d -> Ok (Some en_or_d, tags, srcs) + | `Src s, en_or_d -> Ok (all, tags, (s, en_or_d) :: srcs) + | `Tag t, en_or_d -> Ok (all, (t, en_or_d) :: tags, srcs)) + (Ok (None, [], [])) ts + in + (match all with + | Some `Enable -> Metrics.enable_all () + | Some `Disable -> Metrics.disable_all () + | None -> ()); + List.iter + (function + | t, `Enable -> Metrics.enable_tag t + | t, `Disable -> Metrics.disable_tag t) + tags ; + List.iter (fun (src, e_or_d) -> + match List.find_opt (fun s -> Metrics.Src.name s = src) (Metrics.Src.list ()), e_or_d with + | Some src, `Enable -> Metrics.Src.enable src + | Some src, `Disable -> Metrics.Src.disable src + | None, _ -> + Log.warn (fun m -> m "%s is not a valid metrics source." src)) + srcs ; + Ok () + module Make (T : Mirage_time.S) (S : Tcpip.Stack.V4V6) = struct let timer conn get host stack dst = @@ -86,7 +162,42 @@ module Make (T : Mirage_time.S) (S : Tcpip.Stack.V4V6) = struct in one () - let create ?(interval = 10) ?hostname dst ?(port = 8094) stack = + let create_listener stack = function + | None -> () + | Some port -> + S.TCP.listen (S.tcp stack) ~port (fun f -> + (S.TCP.read f >>= function + | Ok `Data data -> + if Cstruct.length data > 0 then + let rest = Cstruct.(to_string (shift data 1)) in + let r = + match Cstruct.get_char data 0 with + | 'L' -> adjust_log_level rest + | 'M' -> adjust_metrics rest + | _ -> Error "unknown command" + in + let msg = + match r with + | Ok () -> "ok" | Error msg -> "error: " ^ msg + in + S.TCP.write f (Cstruct.of_string msg) >|= function + | Ok () -> () + | Error e -> + Log.warn (fun m -> m "write error on log & metrics listener %a" + S.TCP.pp_write_error e) + else + (Log.debug (fun m -> m "received empty data on log & metrics listener"); + Lwt.return_unit) + | Ok `Eof -> + Log.debug (fun m -> m "EOF on log & metrics listener"); + Lwt.return_unit + | Error e -> + Log.debug (fun m -> m "read error on log & metrics listener %a" + S.TCP.pp_error e); + Lwt.return_unit) >>= fun () -> + S.TCP.close f) + + let create ?(interval = 10) ?hostname dst ?(port = 8094) ?listen_port stack = let get_cache, reporter = Metrics.cache_reporter () in Metrics.set_reporter reporter; Metrics.enable_all (); @@ -94,6 +205,7 @@ module Make (T : Mirage_time.S) (S : Tcpip.Stack.V4V6) = struct Metrics_lwt.periodically (OS.MM.malloc_metrics ~tags:Metrics.Tags.[])[@warning "-3"]; Metrics_lwt.periodically (memory_metrics ~tags:Metrics.Tags.[]); let host = match hostname with None -> [] | Some host -> [vmname host] in - Lwt.async (timer_loop get_cache host interval stack (dst, port)) + Lwt.async (timer_loop get_cache host interval stack (dst, port)); + create_listener stack listen_port end diff --git a/src/monitoring_experiments.mli b/src/monitoring_experiments.mli index e3deecf..9922b71 100644 --- a/src/monitoring_experiments.mli +++ b/src/monitoring_experiments.mli @@ -6,5 +6,6 @@ val vmname : string -> Metrics.field (** [vmname name] creates a [tag] with the virtual machine name. *) module Make (T : Mirage_time.S) (S : Tcpip.Stack.V4V6) : sig - val create : ?interval:int -> ?hostname:string -> Ipaddr.t -> ?port:int -> S.t -> unit + val create : ?interval:int -> ?hostname:string -> Ipaddr.t -> ?port:int -> + ?listen_port:int -> S.t -> unit end