commit 6683f90e61470cfaa4f6b9a71cd1df50ca387851 Author: Hannes Mehnert Date: Sun May 12 23:25:25 2019 +0200 . diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6937a8f --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +_build/ +.merlin +*.install +.*.swp diff --git a/CHANGES.md b/CHANGES.md new file mode 100644 index 0000000..e69de29 diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/dune-project b/dune-project new file mode 100644 index 0000000..2e3a6b9 --- /dev/null +++ b/dune-project @@ -0,0 +1,2 @@ +(lang dune 1.0) +(name monitoring-experiments) diff --git a/monitoring-experiments.opam b/monitoring-experiments.opam new file mode 100644 index 0000000..7eb2b27 --- /dev/null +++ b/monitoring-experiments.opam @@ -0,0 +1,41 @@ +opam-version: "2.0" +maintainer: "Hannes Mehnert " +authors: ["Hannes Mehnert "] +homepage: "https://github.com/hannesm/monitoring-experiments" +doc: "https://hannesm.github.io/monitoring-experiments" +dev-repo: "git+https://github.com/hannesm/monitoring-experiments.git" +bug-reports: "https://github.com/mirage/monitoring-experiments/issues" +license: "AGPL" + +depends: [ + "ocaml" {>= "4.07.1"} + "logs" {>= "0.6.3"} + "metrics" + "astring" {>= "0.8.3"} + "tls" {>= "0.10.2"} + "mirage-clock" {>= "2.0.0"} + "mirage-time-lwt" {>= "1.1.0"} + "mirage-stack-lwt" {>= "1.4.0"} + "mirage-flow-lwt" {>= "1.5.0"} + "tcpip" + "mirage-net-solo5" + "mirage-solo5" + "ocaml-freestanding" + "bheap" +] +pin-depends: [ + [ "ocaml-freestanding.dev" "git+https://github.com/hannesm/ocaml-freestanding.git#mallinfo" ] + [ "mirage-solo5.dev" "git+https://github.com/hannesm/mirage-solo5.git#stats" ] + [ "bheap.dev" "git+https://github.com/hannesm/bheap.git#expose-size" ] + [ "mirage-net-solo5.dev" "git+https://github.com/hannesm/mirage-net-solo5.git#stats" ] + [ "tcpip.dev" "git+https://github.com/hannesm/mirage-tcpip.git#stats" ] +] +build: [ + ["dune" "subst"] {pinned} + ["dune" "build" "-p" name "-j" jobs] + ["dune" "runtest" "-p" name "-j" jobs] {with-test} +] +synopsis: "Experiments with monitoring thingies" +description: """ +bla bla +""" diff --git a/src/dune b/src/dune new file mode 100644 index 0000000..9637379 --- /dev/null +++ b/src/dune @@ -0,0 +1,6 @@ +(library + (name monitoring_experiments) + (public_name monitoring-experiments) + (wrapped false) + (libraries logs metrics astring tls tls.mirage mirage-time-lwt mirage-clock + mirage-flow-lwt mirage-stack-lwt mirage-solo5)) diff --git a/src/monitoring_experiments.ml b/src/monitoring_experiments.ml new file mode 100644 index 0000000..0d15e8f --- /dev/null +++ b/src/monitoring_experiments.ml @@ -0,0 +1,225 @@ + +module I = struct + (*************) + (* influxdb line protocol reporter *) + (* from https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ *) + (* example line: weather,location=us-midwest temperature=82 1465839830100400200 *) + (*************) + + open Astring + + let avoid_keyword = + let keywords = String.Set.of_list [ + "ALL" ; "ALTER" ; "ANY" ; "AS" ; "ASC" ; "BEGIN" ; + "BY" ; "CREATE" ; "CONTINUOUS" ; "DATABASE" ; "DATABASES" ; "DEFAULT" ; + "DELETE" ; "DESC" ; "DESTINATIONS" ; "DIAGNOSTICS" ; "DISTINCT" ; "DROP" ; + "DURATION" ; "END" ; "EVERY" ; "EXPLAIN" ; "FIELD" ; "FOR" ; + "FROM" ; "GRANT" ; "GRANTS" ; "GROUP" ; "GROUPS" ; "IN" ; + "INF" ; "INSERT" ; "INTO" ; "KEY" ; "KEYS" ; "KILL" ; + "LIMIT" ; "SHOW" ; "MEASUREMENT" ; "MEASUREMENTS" ; "NAME" ; "OFFSET" ; + "ON" ; "ORDER" ; "PASSWORD" ; "POLICY" ; "POLICIES" ; "PRIVILEGES" ; + "QUERIES" ; "QUERY" ; "READ" ; "REPLICATION" ; "RESAMPLE" ; "RETENTION" ; + "REVOKE" ; "SELECT" ; "SERIES" ; "SET" ; "SHARD" ; "SHARDS" ; + "SLIMIT" ; "SOFFSET" ; "STATS" ; "SUBSCRIPTION" ; "SUBSCRIPTIONS" ; "TAG" ; + "TO" ; "USER" ; "USERS" ; "VALUES" ; "WHERE" ; "WITH" ; "WRITE" + ] in + (fun m -> if String.(Set.mem (Ascii.uppercase m) keywords) then "o" ^ m else m) + + let escape = + List.fold_right (fun e m' -> String.(concat ~sep:("\\" ^ e) (cuts ~sep:e m'))) + + let escape_measurement m = escape [ "," ; " " ] (avoid_keyword m) + + let escape_name m = escape [ "," ; " " ; "=" ] (avoid_keyword m) + + let pp_value (str : string Fmt.t) ppf f = + let open Metrics in + match value f with + | V (String, s) -> str ppf s + | V (Int, i) -> Fmt.pf ppf "%di" i + | V (Int32, i32) -> Fmt.pf ppf "%ldi" i32 + | V (Int64, i64) -> Fmt.pf ppf "%Ldi" i64 + | V (Uint, u) -> Fmt.pf ppf "%ui" u + | V (Uint32, u32) -> Fmt.pf ppf "%lui" u32 + | V (Uint64, u64) -> Fmt.pf ppf "%Lui" u64 + | _ -> pp_value ppf f + + (* we need to: + - avoid keywords + - escape comma and space in measurement name + - escape comma, space and equal in tag key, tag value, field key of type string + - double-quote field value of type string + - data type number is a float, suffix i for integers *) + let encode_line_protocol tags data name = + let data_fields = Metrics.Data.fields data in + let pp_field_str ppf s = Fmt.pf ppf "%S" s in + let pp_field ppf f = + Fmt.(pair ~sep:(unit "=") string (pp_value pp_field_str)) ppf + (escape_name (Metrics.key f), f) + in + let pp_fields = Fmt.(list ~sep:(unit ",") pp_field) in + let pp_tag_str ppf s = Fmt.string ppf (escape_name s) in + let pp_tag ppf f = + Fmt.(pair ~sep:(unit "=") string (pp_value pp_tag_str)) ppf + (escape_name (Metrics.key f), f) + in + let pp_tags = Fmt.(list ~sep:(unit ",") pp_tag) in + Fmt.strf "%s,%a %a\n" (escape_measurement name) pp_tags tags pp_fields data_fields +end + +module R = struct + module SM = Map.Make(Metrics.Src) + + let store_reporter now () = + let m = ref SM.empty in + let report ~tags ~data ~over src k = + m := SM.add src (tags, data) !m; + over (); k () + in + (fun () -> !m), { Metrics.report ; now ; at_exit = (fun () -> ()) } +end + +module L = struct + open Lwt.Infix + + let gc_stat = Metrics.gc_stat ~tags:Metrics.Tags.[] + + let log_stats ~tags = + let open Metrics in + let doc = "Statistics of the Logs library" in + let data () = + let warnings, errors = Logs.warn_count (), Logs.err_count () in + Data.v + [ int "warnings" warnings ; int "errors" errors ] + in + Src.v ~doc ~tags ~data "logs" + + let log_stat = log_stats ~tags:Metrics.Tags.[] + + let malloc_stat = OS.MM.malloc_metrics ~tags:Metrics.Tags.[] + + let sleeper_stat = OS.Time.sleep_metrics ~tags:Metrics.Tags.[] + + let collect s () = + let f () = Metrics.add gc_stat (fun x -> x) (fun d -> d ()) + and g () = Metrics.add log_stat (fun x -> x) (fun d -> d ()) + and h () = Metrics.add malloc_stat (fun x -> x) (fun d -> d ()) + and i () = Metrics.add sleeper_stat (fun x -> x) (fun d -> d ()) + in + let one () = f (); g (); h (); i () ; Lwt.return_unit in + let rec loop () = Lwt.join [ one (); s () ] >>= loop in + loop () +end + +module M = struct + open Lwt.Infix + + let src = Logs.Src.create "influx" ~doc:"influx metrics reporter" + module Log = (val Logs.src_log src : Logs.LOG) + + let vmname = + Metrics.field + ~doc:"name of the virtual machine" + "vm" Metrics.String + + module Pull (T : Mirage_time_lwt.S) (C : Mirage_clock.MCLOCK) (F : Mirage_flow_lwt.S) = struct + + type t = { mutable flows : F.flow list } + + let add_flow t flow = + Log.debug (fun m -> m "registered new flow"); + t.flows <- flow :: t.flows + + let timer get host t = + let datas = + Metrics.SM.fold (fun src (tags, data) acc -> + let name = Metrics.Src.name src in + I.encode_line_protocol (host@tags) data name :: acc) + (get ()) [] + in + let datas = String.concat "" datas in + Log.debug (fun m -> m "sending measurements to %d flows %s" (List.length t.flows) datas); + let cs = Cstruct.of_string datas in + Lwt_list.fold_left_s (fun acc flow -> + F.write flow cs >|= function + | Ok () -> flow :: acc + | Error we -> + Log.warn (fun m -> m "writing to flow failed %a, dropping" F.pp_write_error we); + acc) + [] t.flows >|= fun flows -> + t.flows <- flows + + let timer_loop get host interval t () = + let rec one () = + Lwt.join [ + timer get host t; + T.sleep_ns (Duration.of_sec interval) + ] >>= fun () -> + (one[@tailcall]) () + in + one () + + let create ?(interval = 10) ?hostname () = + let get, reporter = Metrics.store_reporter C.elapsed_ns () in + Metrics.set_reporter reporter; + let host = match hostname with None -> [] | Some host -> [vmname host] in + let t = { flows = [] } in + Lwt.async (timer_loop get host interval t); + t + end + + module S (T : Mirage_time_lwt.S) (P : Mirage_clock.PCLOCK) (C : Mirage_clock.MCLOCK) (S : Mirage_stack_lwt.V4) = struct + module TLS = Tls_mirage.Make(S.TCPV4) + module I = Pull(T)(C)(TLS) + + let ca = + let data = Cstruct.of_string {| +-----BEGIN CERTIFICATE----- +MIIFFzCCAv+gAwIBAgIJAMWQyAG/b/OyMA0GCSqGSIb3DQEBCwUAMBgxFjAUBgNV +BAMMDW1vbml0b3JpbmctY2EwHhcNMTkwNTEyMjA1OTQ5WhcNMjkwNTA5MjA1OTQ5 +WjAYMRYwFAYDVQQDDA1tb25pdG9yaW5nLWNhMIICIjANBgkqhkiG9w0BAQEFAAOC +Ag8AMIICCgKCAgEAwczrhNy2F4sVdvo+YMWKtrvPUyRphxLvkFPmO9ewhQIXEdW/ +cq+DRKCennOH8F5/mb848MtJDV7plqap+QD3Pzn8t4TjlVd7l1sPjnwLcpeFvtWn +k9fxcsFQkl+D2nkCvQJIYlS9UWEq6d8BeVFt07aqN8k/Q3CqdBrJNVjX4g6Fo11h +4lG8BZLRBU+Sesq7+5TKw3nZPenMfcsLjmHGA0Xkxk687tb2ONkvH9OsT0pHnf2W +M9WczHxtBJKI1QtxMb4gOeSMFV3FnfPbswt9H+B5tzu2YjV2H/fXVtejJhgBMmlg +pIbccyVoKcjJXaQylLsCjPOxIiqtElTHl2rgOZxwacrewBvE/PX2ulDVCfLZGM+h +90tpLgO5JQ+nlfQUgtb8W4JP8gO2BFPBIOuf394J/jrGOfFL6gOAUmNvYoYm66I9 +OW+wE8FvLAR2AItkxTvwUQHspt7PrlFMqMAaWYpG7d4tpOAZTkCKiBMQovZzC0Qt +IFvw5Obo5BbSM7ZCgWseIy6nCeM1F4HDxWTWfYETAKN2p6iTjbQ9KHQfOB7v18iI +zUifJAgeRD09MW0uJa1/8MJDGniPV+dLTy3unnYyuq22tQzv4lr1e6sQsNcnVI8n ++RVkYKBrIQeC8Y+VAsKr/pjzEFiglbZ26mvyKXHdssVWePzHA2Wqc7SAfIkCAwEA +AaNkMGIwHwYDVR0jBBgwFoAUm1qoqdDxv45dJj8zxOX5aQSVSngwHQYDVR0OBBYE +FJtaqKnQ8b+OXSY/M8Tl+WkElUp4MA8GA1UdEwEB/wQFMAMBAf8wDwYDVR0PAQH/ +BAUDAwfGADANBgkqhkiG9w0BAQsFAAOCAgEAwZpZq7MAjO7f/aPxycEYzPrAMiO/ +dIak48w9fTVlrVT8AgKfIKtsygr6lpKLe2aJJwEVuq8TaVhPVVi8cfS1DVMB6ifx +SWA+yv//C1tdt0LiswD4nE55418+c2RvRdlH6lVjzeYEOis+qY/MhAj33iSQ071E +fPCPeiNa9P5vIu6Ds1FAZZ+3W0h7nUyhZGjUBngeT5FdBzjRZBPqloeBCkftYpz7 +HaNUZbRt+N/1m4bUPaUzm+jI3u3sxrglJnLgr+vXW4seCv1tzi9GTTl/DkTVXoiU +DEQOHwGC/NzqezguBey5M7M9fiqgIbEFaXBPMUvmDkYKrDX/L1z4TxOiNZupxEky +RavawY8Do6OISqkhmPTvDa1N82asnLmKHEnP8JoSZRiTORqpaJgdNySXf0FcK9Zv +dBgyeUfT3t+r1KR164HOzIc/vSCzpm9bgeCfR097/XFuCqOnoyip9MOGKdcQuvPL +GmjUcb8E3X6qXLZVMETMpGk4aFs6Kq+7zvNedD9sZUTIpjMgrQa4mEIpdfSSgiDE +/U8p/qZWmEv0tJGIvmGqDK0NF3VW4CNTmwqZzMLxnSfL09lcbMZ/kmDKIvYqTMhl +DDGH2wmML4sbbol/cJOucUrvAsQxRj39y+N737ojNUgybgtLiv1FyKzmBx24u0Bb +jfuLKkCfGcw9A8o= +-----END CERTIFICATE----- +|} in + X509.Encoding.Pem.Certificate.of_pem_cstruct1 data + + let create s ?(port = 8093) ?hostname ?(interval = 10) certificates = + Metrics.enable_all (); + let flows = I.create ~interval ?hostname () in + Lwt.async (L.collect (fun () -> T.sleep_ns (Duration.of_sec interval))); + let authenticator = + X509.Authenticator.chain_of_trust + ~time:(Ptime.v (P.now_d_ps ())) [ca] + in + let server = Tls.Config.server ~certificates ~authenticator () in + S.listen_tcpv4 s ~port (fun flow -> + TLS.server_of_flow server flow >|= function + | Ok tls -> I.add_flow flows tls + | Error e -> Log.err (fun m -> m "TLS error %a" TLS.pp_write_error e)) + end +end + diff --git a/src/monitoring_experiments.mli b/src/monitoring_experiments.mli new file mode 100644 index 0000000..ae43e4f --- /dev/null +++ b/src/monitoring_experiments.mli @@ -0,0 +1,43 @@ + +module I : sig + val encode_line_protocol : Metrics.tags -> Metrics.data -> string -> string +end + +module R : sig + module SM : Map.S with type key = Metrics.Src.t + + val store_reporter : (unit -> int64) -> unit -> + (unit -> (Metrics.tags * Metrics.data) SM.t) * Metrics.reporter +(** [store_reporter now ()] is a reporter that stores the last measurement from + each source in a map (which can be retrieved by the returned function). + This is an initial attempt to overcome the push vs pull interface. Each + measurement _event_ is sent at an arbitrary point in time, while reporting + over a communication channel may be rate-limited (i.e. report every 10 + seconds statistics, rather than whenever they appear). + + This is only a good idea for counters, histograms etc. may be useful for + other numbers (such as time consumed between receive and send - the + measurement should provide the information whether it's a counter or sth + else). *) +end + +module M : sig + val vmname : string -> Metrics.field + (** [vmname name] creates a [tag] with the virtual machine name. *) + + module Pull (T : Mirage_time_lwt.S) (C : Mirage_clock.MCLOCK) (F : Mirage_flow_lwt.S) : sig + type t + val create : ?interval:int -> ?hostname:string -> unit -> t + (** [create_pull ~interval ~hostname ()] registers a reporter, that will + each [interval] (in seconds, default is 10) send in an asynchronous task + gathered metrics to all registered flows. Each metrics source produces + one measurment in influx format. Flows where write fails are + deregistered. *) + + val add_flow : t -> F.flow -> unit + end + + module S (T : Mirage_time_lwt.S) (P : Mirage_clock.PCLOCK) (C : Mirage_clock.MCLOCK) (S : Mirage_stack_lwt.V4) : sig + val create : S.t -> ?port:int -> ?hostname:string -> ?interval:int -> Tls.Config.own_cert -> unit + end +end