less is more

This commit is contained in:
Hannes Mehnert 2019-11-06 19:28:26 +01:00
parent 89278eed14
commit 2969493479
4 changed files with 64 additions and 289 deletions

View file

@ -10,25 +10,19 @@ license: "AGPL"
depends: [
"ocaml" {>= "4.07.1"}
"logs" {>= "0.6.3"}
"metrics"
"metrics" {>= "0.99.0"}
"metrics-lwt" {>= "0.99.0"}
"metrics-influx" {>= "0.99.0"}
"astring" {>= "0.8.3"}
"tls" {>= "0.10.2"}
"mirage-clock" {>= "2.0.0"}
"mirage-time-lwt" {>= "1.1.0"}
"mirage-stack-lwt" {>= "1.4.0"}
"mirage-flow-lwt" {>= "1.5.0"}
"tcpip"
"mirage-net-solo5"
"mirage-solo5"
"mirage-clock" {>= "3.0.0"}
"mirage-time" {>= "2.0.0"}
"mirage-stack" {>= "2.0.0"}
"tcpip" {>= "4.0.99"}
"mirage-net-solo5" {>= "0.6.99"}
"mirage-solo5" {>= "0.6.99"}
"ocaml-freestanding" {>= "0.4.5"}
"bheap"
]
pin-depends: [
[ "mirage-solo5.dev" "git+https://github.com/hannesm/mirage-solo5.git#stats" ]
[ "bheap.dev" "git+https://github.com/hannesm/bheap.git#expose-size" ]
[ "mirage-net-solo5.dev" "git+https://github.com/hannesm/mirage-net-solo5.git#stats" ]
[ "tcpip.dev" "git+https://github.com/hannesm/mirage-tcpip.git#stats" ]
]
build: [
["dune" "subst"] {pinned}
["dune" "build" "-p" name "-j" jobs]

View file

@ -2,5 +2,5 @@
(name monitoring_experiments)
(public_name monitoring-experiments)
(wrapped false)
(libraries logs metrics astring tls tls.mirage mirage-time-lwt mirage-clock
mirage-flow-lwt mirage-stack-lwt mirage-solo5))
(libraries logs metrics metrics-lwt metrics-influx astring mirage-time
mirage-clock mirage-flow mirage-stack mirage-solo5))

View file

@ -1,240 +1,61 @@
open Lwt.Infix
module I = struct
(*************)
(* influxdb line protocol reporter *)
(* from https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ *)
(* example line: weather,location=us-midwest temperature=82 1465839830100400200 *)
(*************)
let src = Logs.Src.create "influx" ~doc:"influx metrics reporter"
module Log = (val Logs.src_log src : Logs.LOG)
open Astring
let vmname = Metrics.field ~doc:"name of the virtual machine" "vm" Metrics.String
let avoid_keyword =
let keywords = String.Set.of_list [
"ALL" ; "ALTER" ; "ANY" ; "AS" ; "ASC" ; "BEGIN" ;
"BY" ; "CREATE" ; "CONTINUOUS" ; "DATABASE" ; "DATABASES" ; "DEFAULT" ;
"DELETE" ; "DESC" ; "DESTINATIONS" ; "DIAGNOSTICS" ; "DISTINCT" ; "DROP" ;
"DURATION" ; "END" ; "EVERY" ; "EXPLAIN" ; "FIELD" ; "FOR" ;
"FROM" ; "GRANT" ; "GRANTS" ; "GROUP" ; "GROUPS" ; "IN" ;
"INF" ; "INSERT" ; "INTO" ; "KEY" ; "KEYS" ; "KILL" ;
"LIMIT" ; "SHOW" ; "MEASUREMENT" ; "MEASUREMENTS" ; "NAME" ; "OFFSET" ;
"ON" ; "ORDER" ; "PASSWORD" ; "POLICY" ; "POLICIES" ; "PRIVILEGES" ;
"QUERIES" ; "QUERY" ; "READ" ; "REPLICATION" ; "RESAMPLE" ; "RETENTION" ;
"REVOKE" ; "SELECT" ; "SERIES" ; "SET" ; "SHARD" ; "SHARDS" ;
"SLIMIT" ; "SOFFSET" ; "STATS" ; "SUBSCRIPTION" ; "SUBSCRIPTIONS" ; "TAG" ;
"TO" ; "USER" ; "USERS" ; "VALUES" ; "WHERE" ; "WITH" ; "WRITE"
] in
(fun m -> if String.(Set.mem (Ascii.uppercase m) keywords) then "o" ^ m else m)
module Make (T : Mirage_time.S) (S : Mirage_stack.V4) = struct
let escape =
List.fold_right (fun e m' -> String.(concat ~sep:("\\" ^ e) (cuts ~sep:e m')))
let escape_measurement m = escape [ "," ; " " ] (avoid_keyword m)
let escape_name m = escape [ "," ; " " ; "=" ] (avoid_keyword m)
let pp_value (str : string Fmt.t) ppf f =
let open Metrics in
match value f with
| V (String, s) -> str ppf s
| V (Int, i) -> Fmt.pf ppf "%di" i
| V (Int32, i32) -> Fmt.pf ppf "%ldi" i32
| V (Int64, i64) -> Fmt.pf ppf "%Ldi" i64
| V (Uint, u) -> Fmt.pf ppf "%ui" u
| V (Uint32, u32) -> Fmt.pf ppf "%lui" u32
| V (Uint64, u64) -> Fmt.pf ppf "%Lui" u64
| _ -> pp_value ppf f
(* we need to:
- avoid keywords
- escape comma and space in measurement name
- escape comma, space and equal in tag key, tag value, field key of type string
- double-quote field value of type string
- data type number is a float, suffix i for integers *)
let encode_line_protocol tags data name =
let data_fields = Metrics.Data.fields data in
let pp_field_str ppf s = Fmt.pf ppf "%S" s in
let pp_field ppf f =
Fmt.(pair ~sep:(unit "=") string (pp_value pp_field_str)) ppf
(escape_name (Metrics.key f), f)
let timer conn get host stack dst =
let datas =
Metrics.SM.fold (fun src (tags, data) acc ->
let name = Metrics.Src.name src in
Metrics_influx.encode_line_protocol (host@tags) data name :: acc)
(get ()) []
in
let pp_fields = Fmt.(list ~sep:(unit ",") pp_field) in
let pp_tag_str ppf s = Fmt.string ppf (escape_name s) in
let pp_tag ppf f =
Fmt.(pair ~sep:(unit "=") string (pp_value pp_tag_str)) ppf
(escape_name (Metrics.key f), f)
let datas = String.concat "" datas in
let write flow =
Log.debug (fun m -> m "sending measurements");
S.TCPV4.write flow (Cstruct.of_string datas) >|= function
| Ok () -> ()
| Error e ->
Log.err (fun m -> m "error %a writing to metrics" S.TCPV4.pp_write_error e);
conn := None
in
let pp_tags = Fmt.(list ~sep:(unit ",") pp_tag) in
Fmt.strf "%s,%a %a\n" (escape_measurement name) pp_tags tags pp_fields data_fields
end
match !conn with
| None ->
begin
Log.debug (fun m -> m "creating connection");
S.TCPV4.create_connection (S.tcpv4 stack) dst >>= function
| Error msg ->
Log.err (fun m -> m "couldn't create connection %a"
S.TCPV4.pp_error msg);
Lwt.return_unit
| Ok flow ->
conn := Some flow;
write flow
end
| Some f -> write f
module R = struct
module SM = Map.Make(Metrics.Src)
let store_reporter now () =
let m = ref SM.empty in
let report ~tags ~data ~over src k =
m := SM.add src (tags, data) !m;
over (); k ()
in
(fun () -> !m), { Metrics.report ; now ; at_exit = (fun () -> ()) }
end
module L = struct
open Lwt.Infix
let gc_stat = Metrics.gc_stat ~tags:Metrics.Tags.[]
let log_stats ~tags =
let open Metrics in
let doc = "Statistics of the Logs library" in
let data () =
let warnings, errors = Logs.warn_count (), Logs.err_count () in
Data.v
[ int "warnings" warnings ; int "errors" errors ]
in
Src.v ~doc ~tags ~data "logs"
let log_stat = log_stats ~tags:Metrics.Tags.[]
let malloc_stat = OS.MM.malloc_metrics ~tags:Metrics.Tags.[]
let collect s () =
let f () = Metrics.add gc_stat (fun x -> x) (fun d -> d ())
and g () = Metrics.add log_stat (fun x -> x) (fun d -> d ())
and h () = Metrics.add malloc_stat (fun x -> x) (fun d -> d ())
in
let one () = f (); g (); h (); Lwt.return_unit in
let rec loop () = Lwt.join [ one (); s () ] >>= loop in
loop ()
end
module M = struct
open Lwt.Infix
let src = Logs.Src.create "influx" ~doc:"influx metrics reporter"
module Log = (val Logs.src_log src : Logs.LOG)
let vmname =
Metrics.field
~doc:"name of the virtual machine"
"vm" Metrics.String
module Pull (T : Mirage_time_lwt.S) (C : Mirage_clock.MCLOCK) (F : Mirage_flow_lwt.S) = struct
type t = { mutable flows : F.flow list }
let add_flow t flow =
Log.debug (fun m -> m "registered new flow");
t.flows <- flow :: t.flows
let timer get host t =
let datas =
R.SM.fold (fun src (tags, data) acc ->
let name = Metrics.Src.name src in
I.encode_line_protocol (host@tags) data name :: acc)
(get ()) []
in
let datas = String.concat "" datas in
Log.debug (fun m -> m "sending measurements to %d flows %s" (List.length t.flows) datas);
let cs = Cstruct.of_string datas in
Lwt_list.fold_left_s (fun acc flow ->
F.write flow cs >|= function
| Ok () -> flow :: acc
| Error we ->
Log.warn (fun m -> m "writing to flow failed %a, dropping" F.pp_write_error we);
acc)
[] t.flows >|= fun flows ->
t.flows <- flows
let timer_loop get host interval t () =
let rec one () =
Lwt.join [
timer get host t;
let timer_loop get host interval stack dst () =
let conn = ref None in
let rec one () =
Lwt.join [
timer conn get host stack dst;
T.sleep_ns (Duration.of_sec interval)
] >>= fun () ->
(one[@tailcall]) ()
in
one ()
(one[@tailcall]) ()
in
one ()
let create ?(interval = 10) ?hostname () =
let get, reporter = R.store_reporter C.elapsed_ns () in
Metrics.set_reporter reporter;
let host = match hostname with None -> [] | Some host -> [vmname host] in
let t = { flows = [] } in
Lwt.async (timer_loop get host interval t);
t
(* actually a pushed *)
let push ?(interval = 10) ?hostname flow =
let get, reporter = R.store_reporter C.elapsed_ns () in
Metrics.set_reporter reporter;
Metrics.enable_all ();
Lwt.async (L.collect (fun () -> T.sleep_ns (Duration.of_sec interval)));
let host = match hostname with None -> [] | Some host -> [vmname host] in
Lwt.async (timer_loop get host interval { flows = [ flow ] })
end
module S (T : Mirage_time_lwt.S) (P : Mirage_clock.PCLOCK) (C : Mirage_clock.MCLOCK) (S : Mirage_stack_lwt.V4) = struct
module TLS = Tls_mirage.Make(S.TCPV4)
module I = Pull(T)(C)(TLS)
let ca =
let data = Cstruct.of_string {|
-----BEGIN CERTIFICATE-----
MIIFFzCCAv+gAwIBAgIJAMWQyAG/b/OyMA0GCSqGSIb3DQEBCwUAMBgxFjAUBgNV
BAMMDW1vbml0b3JpbmctY2EwHhcNMTkwNTEyMjA1OTQ5WhcNMjkwNTA5MjA1OTQ5
WjAYMRYwFAYDVQQDDA1tb25pdG9yaW5nLWNhMIICIjANBgkqhkiG9w0BAQEFAAOC
Ag8AMIICCgKCAgEAwczrhNy2F4sVdvo+YMWKtrvPUyRphxLvkFPmO9ewhQIXEdW/
cq+DRKCennOH8F5/mb848MtJDV7plqap+QD3Pzn8t4TjlVd7l1sPjnwLcpeFvtWn
k9fxcsFQkl+D2nkCvQJIYlS9UWEq6d8BeVFt07aqN8k/Q3CqdBrJNVjX4g6Fo11h
4lG8BZLRBU+Sesq7+5TKw3nZPenMfcsLjmHGA0Xkxk687tb2ONkvH9OsT0pHnf2W
M9WczHxtBJKI1QtxMb4gOeSMFV3FnfPbswt9H+B5tzu2YjV2H/fXVtejJhgBMmlg
pIbccyVoKcjJXaQylLsCjPOxIiqtElTHl2rgOZxwacrewBvE/PX2ulDVCfLZGM+h
90tpLgO5JQ+nlfQUgtb8W4JP8gO2BFPBIOuf394J/jrGOfFL6gOAUmNvYoYm66I9
OW+wE8FvLAR2AItkxTvwUQHspt7PrlFMqMAaWYpG7d4tpOAZTkCKiBMQovZzC0Qt
IFvw5Obo5BbSM7ZCgWseIy6nCeM1F4HDxWTWfYETAKN2p6iTjbQ9KHQfOB7v18iI
zUifJAgeRD09MW0uJa1/8MJDGniPV+dLTy3unnYyuq22tQzv4lr1e6sQsNcnVI8n
+RVkYKBrIQeC8Y+VAsKr/pjzEFiglbZ26mvyKXHdssVWePzHA2Wqc7SAfIkCAwEA
AaNkMGIwHwYDVR0jBBgwFoAUm1qoqdDxv45dJj8zxOX5aQSVSngwHQYDVR0OBBYE
FJtaqKnQ8b+OXSY/M8Tl+WkElUp4MA8GA1UdEwEB/wQFMAMBAf8wDwYDVR0PAQH/
BAUDAwfGADANBgkqhkiG9w0BAQsFAAOCAgEAwZpZq7MAjO7f/aPxycEYzPrAMiO/
dIak48w9fTVlrVT8AgKfIKtsygr6lpKLe2aJJwEVuq8TaVhPVVi8cfS1DVMB6ifx
SWA+yv//C1tdt0LiswD4nE55418+c2RvRdlH6lVjzeYEOis+qY/MhAj33iSQ071E
fPCPeiNa9P5vIu6Ds1FAZZ+3W0h7nUyhZGjUBngeT5FdBzjRZBPqloeBCkftYpz7
HaNUZbRt+N/1m4bUPaUzm+jI3u3sxrglJnLgr+vXW4seCv1tzi9GTTl/DkTVXoiU
DEQOHwGC/NzqezguBey5M7M9fiqgIbEFaXBPMUvmDkYKrDX/L1z4TxOiNZupxEky
RavawY8Do6OISqkhmPTvDa1N82asnLmKHEnP8JoSZRiTORqpaJgdNySXf0FcK9Zv
dBgyeUfT3t+r1KR164HOzIc/vSCzpm9bgeCfR097/XFuCqOnoyip9MOGKdcQuvPL
GmjUcb8E3X6qXLZVMETMpGk4aFs6Kq+7zvNedD9sZUTIpjMgrQa4mEIpdfSSgiDE
/U8p/qZWmEv0tJGIvmGqDK0NF3VW4CNTmwqZzMLxnSfL09lcbMZ/kmDKIvYqTMhl
DDGH2wmML4sbbol/cJOucUrvAsQxRj39y+N737ojNUgybgtLiv1FyKzmBx24u0Bb
jfuLKkCfGcw9A8o=
-----END CERTIFICATE-----
|} in
match X509.Certificate.decode_pem data with
| Ok c -> c
| Error (`Msg e) -> invalid_arg e
let create_tls ?(port = 8093) ?hostname ?(interval = 10) s certificates =
Metrics.enable_all ();
let flows = I.create ~interval ?hostname () in
Lwt.async (L.collect (fun () -> T.sleep_ns (Duration.of_sec interval)));
let authenticator =
X509.Authenticator.chain_of_trust
~time:(Ptime.v (P.now_d_ps ())) [ca]
in
let server = Tls.Config.server ~certificates ~authenticator () in
S.listen_tcpv4 s ~port (fun flow ->
TLS.server_of_flow server flow >|= function
| Ok tls -> I.add_flow flows tls
| Error e -> Log.err (fun m -> m "TLS error %a" TLS.pp_write_error e))
module TC = Pull(T)(C)(S.TCPV4)
let create_tcp ?(port = 8093) ?hostname ?(interval = 10) s =
Metrics.enable_all ();
let flows = TC.create ~interval ?hostname () in
Lwt.async (L.collect (fun () -> T.sleep_ns (Duration.of_sec interval)));
S.listen_tcpv4 s ~port (fun flow -> Lwt.return (TC.add_flow flows flow))
end
let create ?(interval = 10) ?hostname stack dst =
let get_cache, reporter = Metrics.cache_reporter () in
Metrics.set_reporter reporter;
Metrics.enable_all ();
Metrics_lwt.init_periodic (fun () -> T.sleep_ns (Duration.of_sec interval));
Metrics_lwt.periodically (OS.MM.malloc_metrics ~tags:Metrics.Tags.[]);
let host = match hostname with None -> [] | Some host -> [vmname host] in
Lwt.async (timer_loop get_cache host interval stack dst)
end

View file

@ -1,46 +1,6 @@
val vmname : string -> Metrics.field
(** [vmname name] creates a [tag] with the virtual machine name. *)
module I : sig
val encode_line_protocol : Metrics.tags -> Metrics.data -> string -> string
end
module R : sig
module SM : Map.S with type key = Metrics.Src.t
val store_reporter : (unit -> int64) -> unit ->
(unit -> (Metrics.tags * Metrics.data) SM.t) * Metrics.reporter
(** [store_reporter now ()] is a reporter that stores the last measurement from
each source in a map (which can be retrieved by the returned function).
This is an initial attempt to overcome the push vs pull interface. Each
measurement _event_ is sent at an arbitrary point in time, while reporting
over a communication channel may be rate-limited (i.e. report every 10
seconds statistics, rather than whenever they appear).
This is only a good idea for counters, histograms etc. may be useful for
other numbers (such as time consumed between receive and send - the
measurement should provide the information whether it's a counter or sth
else). *)
end
module M : sig
val vmname : string -> Metrics.field
(** [vmname name] creates a [tag] with the virtual machine name. *)
module Pull (T : Mirage_time_lwt.S) (C : Mirage_clock.MCLOCK) (F : Mirage_flow_lwt.S) : sig
type t
val create : ?interval:int -> ?hostname:string -> unit -> t
(** [create_pull ~interval ~hostname ()] registers a reporter, that will
each [interval] (in seconds, default is 10) send in an asynchronous task
gathered metrics to all registered flows. Each metrics source produces
one measurment in influx format. Flows where write fails are
deregistered. *)
val add_flow : t -> F.flow -> unit
val push : ?interval:int -> ?hostname:string -> F.flow -> unit
end
module S (T : Mirage_time_lwt.S) (P : Mirage_clock.PCLOCK) (C : Mirage_clock.MCLOCK) (S : Mirage_stack_lwt.V4) : sig
val create_tls : ?port:int -> ?hostname:string -> ?interval:int -> S.t -> Tls.Config.own_cert -> unit
val create_tcp : ?port:int -> ?hostname:string -> ?interval:int -> S.t -> unit
end
module Make (T : Mirage_time.S) (S : Mirage_stack.V4) : sig
val create : ?interval:int -> ?hostname:string -> S.t -> Ipaddr.V4.t * int -> unit
end