Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the kvictoria plug-in to provide metrics #881

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Each of these plugins has a corresponding example in [examples](../examples/hook
<a href="./">plugin</a> — you are here
├── <a href="./kgmetrics">kgmetrics</a> — plug-in go-metrics to use with `kgo.WithHooks`
├── <a href="./kprom">kprom</a> — plug-in prometheus metrics to use with `kgo.WithHooks`
├── <a href="./kvictoria">kvictoria</a> — plug-in victoria metrics to use with `kgo.WithHooks`
├── <a href="./klogrus">klogrus</a> — plug-in sirupsen/logrus to use with `kgo.WithLogger`
├── <a href="./kzap">kzap</a> — plug-in uber-go/zap to use with `kgo.WithLogger`
└── <a href="./kzerolog">kzerolog</a> — plug-in rs/zerolog to use with `kgo.WithLogger`
Expand Down
71 changes: 71 additions & 0 deletions plugin/kvictoria/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# kvictoria

kvictoria is a plug-in package to provide metrics for [VictoriaMetrics](https://victoriametrics.com/) using the [VictoriaMetrics/metrics](https://github.com/VictoriaMetrics/metrics) library through a
[`kgo.Hook`](https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Hook).

__Note__: this plug-in is intended to be used by users of the VictoriaMetrics database: due to the non-standard [implementation of histograms](https://pkg.go.dev/github.com/VictoriaMetrics/metrics#Histogram) in the `metrics` library using any other timeseries database will mean you get no usable histograms which would cripple your observability. If you need standard histograms you should use the [kprom](../kprom/README.md) plugin.

# Usage

To use this plug-in, do this:

```go
metrics := kvictoria.NewMetrics("namespace")
cl, err := kgo.NewClient(
kgo.WithHooks(metrics),
// ...other opts
)
```

The metrics will be automatically exposed when you use the [WritePrometheus](https://pkg.go.dev/github.com/VictoriaMetrics/metrics#WritePrometheus) function in your handler:
```go
http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
metrics.WritePrometheus(w, false)
})
```

# Details

This package provides the following metrics.

```
{namespace}_{subsystem}_connects_total{node_id="#{node}"}
{namespace}_{subsystem}_connect_errors_total{node_id="#{node}"}
{namespace}_{subsystem}_connect_seconds{node_id="#{node}"}
{namespace}_{subsystem}_disconnects_total{node_id="#{node}"}

{namespace}_{subsystem}_write_errors_total{node_id="#{node}"}
{namespace}_{subsystem}_write_bytes_total{node_id="#{node}"}
{namespace}_{subsystem}_write_wait_seconds{node_id="#{node}"}
{namespace}_{subsystem}_write_time_seconds{node_id="#{node}"}
{namespace}_{subsystem}_read_errors_total{node_id="#{node}"}
{namespace}_{subsystem}_read_bytes_total{node_id="#{node}"}
{namespace}_{subsystem}_read_wait_seconds{node_id="#{node}"}
{namespace}_{subsystem}_read_time_seconds{node_id="#{node}"}

{namespace}_{subsystem}_request_duration_e2e_seconds{node_id="#{node}"}
{namespace}_{subsystem}_request_throttled_seconds{node_id="#{node}"}

{namespace}_{subsystem}_group_manage_error{node_id="#{node}",error_message="#{error_message}"}

{namespace}_{subsystem}_produce_uncompressed_bytes_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}
{namespace}_{subsystem}_produce_compressed_bytes_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}
{namespace}_{subsystem}_produce_batches_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}
{namespace}_{subsystem}_produce_records_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}

{namespace}_{subsystem}_fetch_uncompressed_bytes_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}
{namespace}_{subsystem}_fetch_compressed_bytes_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}
{namespace}_{subsystem}_fetch_batches_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}
{namespace}_{subsystem}_fetch_records_total{node_id="#{node}",topic="#{topic}",partition="#{partition}"}

{namespace}_{subsystem}_buffered_produce_records_total{client_id="#{client_id}"}
{namespace}_{subsystem}_buffered_produce_bytes_total{client_id="#{client_id}"}
{namespace}_{subsystem}_buffered_fetch_records_total{client_id="#{client_id}"}
{namespace}_{subsystem}_buffered_fetch_bytes_total{client_id="#{client_id}"}
```

Some notes:
* the `subsystem` is optional, if you want to use it you can pass the option `kvictoria.Subsystem("mysubsystem")` when calling `NewMetrics`.
* metrics that are suffixed `_total` are either a counter or a gauge
* metrics that are suffixed `_seconds` are histograms
* the `group_manage_error` metric is a counter incremented any time there's an error that caused the client, operating as a group member, to break out of the group managing loop
32 changes: 32 additions & 0 deletions plugin/kvictoria/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kvictoria

type cfg struct {
namespace string
subsystem string
}

func newCfg(namespace string, opts ...Opt) cfg {
cfg := cfg{
namespace: namespace,
}

for _, opt := range opts {
opt.apply(&cfg)
}

return cfg
}

// Opt is an option to configure Metrics.
type Opt interface {
apply(*cfg)
}

type opt struct{ fn func(*cfg) }

func (o opt) apply(c *cfg) { o.fn(c) }

// Subsystem sets the subsystem for the metrics, overriding the default empty string.
func Subsystem(ss string) Opt {
return opt{func(c *cfg) { c.subsystem = ss }}
}
19 changes: 19 additions & 0 deletions plugin/kvictoria/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module github.com/twmb/franz-go/plugin/kvictoria

go 1.21.13

toolchain go1.23.4

require (
github.com/VictoriaMetrics/metrics v1.35.1
github.com/twmb/franz-go v1.18.0
)

require (
github.com/klauspost/compress v1.17.9 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/sys v0.22.0 // indirect
)
18 changes: 18 additions & 0 deletions plugin/kvictoria/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
github.com/VictoriaMetrics/metrics v1.35.1 h1:o84wtBKQbzLdDy14XeskkCZih6anG+veZ1SwJHFGwrU=
github.com/VictoriaMetrics/metrics v1.35.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
243 changes: 243 additions & 0 deletions plugin/kvictoria/kvictoria.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package kvictoria

import (
"errors"
"net"
"sort"
"strconv"
"strings"
"time"

vm "github.com/VictoriaMetrics/metrics"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
)

var (
// interface checks to ensure we implement the hooks properly
_ kgo.HookNewClient = new(Metrics)
_ kgo.HookClientClosed = new(Metrics)
_ kgo.HookBrokerConnect = new(Metrics)
_ kgo.HookBrokerDisconnect = new(Metrics)
_ kgo.HookBrokerWrite = new(Metrics)
_ kgo.HookBrokerRead = new(Metrics)
_ kgo.HookBrokerE2E = new(Metrics)
_ kgo.HookBrokerThrottle = new(Metrics)
_ kgo.HookGroupManageError = new(Metrics)
_ kgo.HookProduceBatchWritten = new(Metrics)
_ kgo.HookFetchBatchRead = new(Metrics)
)

// Metrics provides metrics using the [VictoriaMetrics/metrics] library.
//
// [VictoriaMetrics/metrics]: https://github.com/VictoriaMetrics/metrics
type Metrics struct {
cfg cfg

sets map[string]*vm.Set
}

// NewMetrics returns a new Metrics that tracks metrics under the given namespace.
//
// You can pass options to configure the metrics reporting. See [Opt] for all existing options.
func NewMetrics(namespace string, opts ...Opt) *Metrics {
return &Metrics{
cfg: newCfg(namespace, opts...),
sets: make(map[string]*vm.Set),
}
}

// OnNewClient implements the [kgo.HookNewClient] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user.
func (m *Metrics) OnNewClient(client *kgo.Client) {
clientID := client.OptValue(kgo.ClientID).(string)

set, ok := m.sets[clientID]
if !ok {
set = vm.NewSet()
m.sets[clientID] = set
}

labels := map[string]string{"client_id": clientID}

set.GetOrCreateGauge(m.buildName("buffered_fetch_bytes_total", labels), func() float64 {
return float64(client.BufferedFetchBytes())
})
set.GetOrCreateGauge(m.buildName("buffered_fetch_records_total", labels), func() float64 {
return float64(client.BufferedFetchRecords())
})
set.GetOrCreateGauge(m.buildName("buffered_produce_bytes_total", labels), func() float64 {
return float64(client.BufferedProduceBytes())
})
set.GetOrCreateGauge(m.buildName("buffered_produce_records_total", labels), func() float64 {
return float64(client.BufferedProduceRecords())
})

vm.RegisterSet(set)
}

// OnClientClosed implements the [kgo.HookClientClosed] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user.
//
// This will unregister all metrics that are scoped to the client id of the client provided.
func (m *Metrics) OnClientClosed(client *kgo.Client) {
clientID := client.OptValue(kgo.ClientID).(string)

set, ok := m.sets[clientID]
if !ok {
return
}

vm.UnregisterSet(set, true)
}

// OnBrokerConnect implements the [kgo.HookBrokerConnect] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user.
func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, dialTime time.Duration, _ net.Conn, err error) {
labels := map[string]string{"node_id": kgo.NodeName(meta.NodeID)}

if err != nil {
vm.GetOrCreateCounter(m.buildName("connect_errors_total", labels)).Inc()
return
}

vm.GetOrCreateCounter(m.buildName("connects_total", labels)).Inc()
vm.GetOrCreateHistogram(m.buildName("connect_seconds", labels)).Update(dialTime.Seconds())
}

// OnBrokerDisconnect implements the [kgo.HookBrokerDisconnect] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user
func (m *Metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
labels := map[string]string{"node_id": kgo.NodeName(meta.NodeID)}

vm.GetOrCreateCounter(m.buildName("disconnects_total", labels)).Inc()
}

// OnBrokerWrite is a noop implementation of [kgo.HookBrokerWrite], logic moved to OnBrokerE2E
func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, _, _ time.Duration, err error) {
}

// OnBrokerRead is a noop implementation of [kgo.HookBrokerRead], logic moved to OnBrokerE2E
func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, _, _ time.Duration, err error) {
}

// OnBrokerE2E implements the [kgo.HookBrokerE2E] interface for metrics gathering
// This method is meant to be called by the hook system and not by the user.
func (m *Metrics) OnBrokerE2E(meta kgo.BrokerMetadata, _ int16, e2e kgo.BrokerE2E) {
labels := map[string]string{"node_id": kgo.NodeName(meta.NodeID)}

if e2e.WriteErr != nil {
vm.GetOrCreateCounter(m.buildName("write_errors_total", labels)).Inc()
return
}

vm.GetOrCreateCounter(m.buildName("write_bytes_total", labels)).Add(e2e.BytesWritten)
vm.GetOrCreateHistogram(m.buildName("write_wait_seconds", labels)).Update(e2e.WriteWait.Seconds())
vm.GetOrCreateHistogram(m.buildName("write_time_seconds", labels)).Update(e2e.TimeToWrite.Seconds())

if e2e.ReadErr != nil {
vm.GetOrCreateCounter(m.buildName("read_errors_total", labels)).Inc()
return
}

vm.GetOrCreateCounter(m.buildName("read_bytes_total", labels)).Add(e2e.BytesRead)
vm.GetOrCreateHistogram(m.buildName("read_wait_seconds", labels)).Update(e2e.ReadWait.Seconds())
vm.GetOrCreateHistogram(m.buildName("read_time_seconds", labels)).Update(e2e.TimeToRead.Seconds())

vm.GetOrCreateHistogram(m.buildName("request_duration_e2e_seconds", labels)).Update(e2e.DurationE2E().Seconds())
}

// OnBrokerThrottle implements the [kgo.HookBrokerThrottle] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user.
func (m *Metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
labels := map[string]string{"node_id": kgo.NodeName(meta.NodeID)}

vm.GetOrCreateHistogram(m.buildName("request_throttled_seconds", labels)).Update(throttleInterval.Seconds())
}

// OnGroupManageError implements the [kgo.HookBrokerThrottle] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user.
func (m *Metrics) OnGroupManageError(err error) {
labels := make(map[string]string)

var kerr *kerr.Error
if errors.As(err, &kerr) {
labels["error_message"] = kerr.Message
}

vm.GetOrCreateCounter(m.buildName("group_manage_error", labels)).Inc()
}

// OnProduceBatchWritten implements the [kgo.HookProduceBatchWritten] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user.
func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, partition int32, metrics kgo.ProduceBatchMetrics) {
labels := map[string]string{
"node_id": kgo.NodeName(meta.NodeID),
"topic": topic,
"partition": strconv.FormatInt(int64(partition), 10),
}

vm.GetOrCreateCounter(m.buildName("produce_uncompressed_bytes_total", labels)).Add(metrics.UncompressedBytes)
vm.GetOrCreateCounter(m.buildName("produce_compressed_bytes_total", labels)).Add(metrics.CompressedBytes)
vm.GetOrCreateCounter(m.buildName("produce_batches_total", labels)).Inc()
vm.GetOrCreateCounter(m.buildName("produce_records_total", labels)).Add(metrics.NumRecords)
}

// OnFetchBatchRead implements the [kgo.HookFetchBatchRead] interface for metrics gathering.
// This method is meant to be called by the hook system and not by the user.
func (m *Metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, partition int32, metrics kgo.FetchBatchMetrics) {
labels := map[string]string{
"node_id": kgo.NodeName(meta.NodeID),
"topic": topic,
"partition": strconv.FormatInt(int64(partition), 10),
}

vm.GetOrCreateCounter(m.buildName("fetch_uncompressed_bytes_total", labels)).Add(metrics.UncompressedBytes)
vm.GetOrCreateCounter(m.buildName("fetch_compressed_bytes_total", labels)).Add(metrics.CompressedBytes)
vm.GetOrCreateCounter(m.buildName("fetch_batches_total", labels)).Inc()
vm.GetOrCreateCounter(m.buildName("fetch_records_total", labels)).Add(metrics.NumRecords)
}

// buildName constructs a metric name for the VictoriaMetrics metrics library.
//
// The library expects the user to create a metric for each and every variation of a metric
// by providing the full name, including labels: there is no equivalent to the *Vec variants
// in the official Prometheus client.
//
// This function is a helper to build such a name, taking care of properly adding
// the namespace if present, subsystem if present and labels if present.
func (m *Metrics) buildName(name string, labels map[string]string) string {
var builder strings.Builder

if m.cfg.namespace != "" {
builder.WriteString(m.cfg.namespace + "_")
}
if m.cfg.subsystem != "" {
builder.WriteString(m.cfg.subsystem + "_")
}
builder.WriteString(name)

// Note: can't use maps.Keys yet because it needs Go 1.23+
labelNames := make([]string, 0, len(labels)/2)
for name := range labels {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)

if len(labels) > 0 {
builder.WriteRune('{')
for i, name := range labelNames {
value := labels[name]

builder.WriteString(name)
builder.WriteRune('=')
builder.WriteString(strconv.Quote(value))
if i+1 < len(labelNames) {
builder.WriteRune(',')
}
}
builder.WriteRune('}')
}

return builder.String()
}
Loading