Skip to content

jamie/metrics compression #264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions cmd/metricsfetcher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,20 @@ Usage of metricsfetcher:
Datadog API key [METRICSFETCHER_API_KEY]
-app-key string
Datadog app key [METRICSFETCHER_APP_KEY]
-broker-id-tag string
Datadog host tag for broker ID [METRICSFETCHER_BROKER_ID_TAG] (default "broker_id")
-broker-storage-query string
Datadog metric query to get storage free by broker_id [METRICSFETCHER_BROKER_STORAGE_QUERY] (default "avg:system.disk.free{service:kafka,device:/data} by {broker_id}")
Datadog metric query to get broker storage free [METRICSFETCHER_BROKER_STORAGE_QUERY] (default "avg:system.disk.free{service:kafka,device:/data}")
-compression
Whether to compress metrics data written to ZooKeeper [METRICSFETCHER_COMPRESSION] (default true)
-dry-run
Dry run mode (don't reach Zookeeper) [METRICSFETCHER_DRY_RUN]
-partition-size-query string
Datadog metric query to get partition size by topic, partition [METRICSFETCHER_PARTITION_SIZE_QUERY] (default "max:kafka.log.partition.size{service:kafka} by {topic,partition}")
-span int
Query range in seconds (now - span) [METRICSFETCHER_SPAN] (default 3600)
-verbose
Verbose output [METRICSFETCHER_VERBOSE]
-zk-addr string
ZooKeeper connect string [METRICSFETCHER_ZK_ADDR] (default "localhost:2181")
-zk-prefix string
Expand All @@ -59,7 +67,7 @@ Another detail to note regarding the partition size query is that `max` is being

# Data Structures

The topicmappr storage placement strategy expects metrics in the following znodes under the parent `-zk-prefix` path (both metricsfetcher and topicmappr default to `topicmappr`), along with the described structure:
The topicmappr rebalance sub-command or the rebuild sub-command with the storage placement strategy expects metrics in the following znodes under the parent `-zk-prefix` path (both metricsfetcher and topicmappr default to `topicmappr`), along with the described structure:

### /topicmappr/partitionmeta
`{"<topic name>": {"<partition number>": {"Size": <bytes>}}}`
Expand All @@ -78,3 +86,5 @@ Example:
[zk: localhost:2181(CONNECTED) 0] get /topicmappr/brokermetrics
{"1002":{"StorageFree":1280803388090.7295},"1003":{"StorageFree":1104897156296.092},"1004":{"StorageFree":1161254545714.023},"1005":{"StorageFree":1196051803924.5977},"1006":{"StorageFree":1103418346402.9092},"1007":{"StorageFree":1299083586345.6743}}
```

The znode data can be optionally compressed with gzip (metricsfetcher will do this by default, configurable with the `--compression` flag) in the case of a high number of partitions where the znode data size may exceed the configured limit. Topicmappr transparently supports reading gzip compressed metrics data.
23 changes: 19 additions & 4 deletions cmd/metricsfetcher/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"flag"
Expand All @@ -27,6 +29,7 @@ type Config struct {
ZKPrefix string
Verbose bool
DryRun bool
Compression bool
}

var config = &Config{} // :(
Expand All @@ -42,6 +45,7 @@ func init() {
flag.StringVar(&config.ZKPrefix, "zk-prefix", "topicmappr", "ZooKeeper namespace prefix")
flag.BoolVar(&config.Verbose, "verbose", false, "Verbose output")
flag.BoolVar(&config.DryRun, "dry-run", false, "Dry run mode (don't reach Zookeeper)")
flag.BoolVar(&config.Compression, "compression", true, "Whether to compress metrics data written to ZooKeeper")

envy.Parse("METRICSFETCHER")
flag.Parse()
Expand Down Expand Up @@ -112,11 +116,22 @@ func main() {
}

// Write to ZK.
err = zk.Set(paths[0], string(partnData))
exitOnErr(err)
for i, data := range [][]byte{partnData, brokerData} {
// Optionally compress the data.
if config.Compression {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)

err = zk.Set(paths[1], string(brokerData))
exitOnErr(err)
_, err := zw.Write(data)
exitOnErr(err)

zw.Close()
data = buf.Bytes()
}

err = zk.Set(paths[i], string(data))
exitOnErr(err)
}

fmt.Println("\nData written to ZooKeeper")
}
Expand Down
30 changes: 30 additions & 0 deletions kafkazk/zookeeper.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package kafkazk

import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"regexp"
"sort"
Expand Down Expand Up @@ -461,6 +464,11 @@ func (z *ZKHandler) getBrokerMetrics() (BrokerMetricsMap, error) {
return nil, fmt.Errorf("Error fetching broker metrics: %s", err.Error())
}

// Check if the data is compressed.
if out, compressed := uncompress(data); compressed {
data = out
}

bmm := BrokerMetricsMap{}
err = json.Unmarshal(data, &bmm)
if err != nil {
Expand Down Expand Up @@ -489,6 +497,11 @@ func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error) {
return nil, errors.New("No partition meta")
}

// Check if the data is compressed.
if out, compressed := uncompress(data); compressed {
data = out
}

pmm := NewPartitionMetaMap()
err = json.Unmarshal(data, &pmm)
if err != nil {
Expand Down Expand Up @@ -766,3 +779,20 @@ func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) (bool, error) {

return true, nil
}

// uncompress takes a []byte and attempts to uncompress it as gzip.
// The uncompressed data and a bool that indicates whether the data
// was compressed is returned.
func uncompress(b []byte) ([]byte, bool) {
zr, err := gzip.NewReader(bytes.NewReader(b))
if err == nil {
defer zr.Close()
var out bytes.Buffer

if _, err := io.Copy(&out, zr); err == nil {
return out.Bytes(), true
}
}

return nil, false
}
167 changes: 149 additions & 18 deletions kafkazk/zookeeper_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kafkazk

import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"regexp"
Expand Down Expand Up @@ -32,9 +34,9 @@ var (
zkprefix + "/config/brokers",
zkprefix + "/config/changes",
// Topicmappr specific.
"/topicmappr",
"/topicmappr/brokermetrics",
"/topicmappr/partitionmeta",
"/topicmappr_test",
"/topicmappr_test/brokermetrics",
"/topicmappr_test/partitionmeta",
}
)

Expand Down Expand Up @@ -96,7 +98,7 @@ func TestSetup(t *testing.T) {
zki, err = NewHandler(&Config{
Connect: zkaddr,
Prefix: configPrefix,
MetricsPrefix: "topicmappr",
MetricsPrefix: "topicmappr_test",
})
if err != nil {
t.Errorf("Error initializing ZooKeeper client: %s", err)
Expand Down Expand Up @@ -175,7 +177,7 @@ func TestSetup(t *testing.T) {

// Store partition meta.
data, _ = json.Marshal(partitionMeta)
_, err = zkc.Set("/topicmappr/partitionmeta", data, -1)
_, err = zkc.Set("/topicmappr_test/partitionmeta", data, -1)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -213,14 +215,7 @@ func TestSetup(t *testing.T) {
}

// Create broker metrics.
data = []byte(`{
"1001": {"StorageFree": 10000.00},
"1002": {"StorageFree": 20000.00},
"1003": {"StorageFree": 30000.00},
"1004": {"StorageFree": 40000.00},
"1005": {"StorageFree": 50000.00}}`)
_, err = zkc.Set("/topicmappr/brokermetrics", data, -1)
if err != nil {
if err := setBrokerMetrics(); err != nil {
t.Error(err)
}

Expand All @@ -229,6 +224,19 @@ func TestSetup(t *testing.T) {
}
}

// setBrokerMetrics seeds the test brokermetrics znode with a fixed
// set of per-broker StorageFree values.
func setBrokerMetrics() error {
	metrics := []byte(`{
	"1001": {"StorageFree": 10000.00},
	"1002": {"StorageFree": 20000.00},
	"1003": {"StorageFree": 30000.00},
	"1004": {"StorageFree": 40000.00},
	"1005": {"StorageFree": 50000.00}}`)

	_, err := zkc.Set("/topicmappr_test/brokermetrics", metrics, -1)
	return err
}

func TestCreateSetGetDelete(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -437,7 +445,6 @@ func TestGetAllBrokerMeta(t *testing.T) {
}
}

/* This test is useless.
func TestGetBrokerMetrics(t *testing.T) {
if testing.Short() {
t.Skip()
Expand All @@ -463,7 +470,63 @@ func TestGetBrokerMetrics(t *testing.T) {
}
}
}
*/

// TestGetBrokerMetricsCompressed verifies that broker metrics stored
// as gzip-compressed znode data are transparently decompressed when
// read back through the handler.
func TestGetBrokerMetricsCompressed(t *testing.T) {
	if testing.Short() {
		t.Skip()
	}

	// Create a compressed version of the metrics data.
	data := []byte(`{
	"1001": {"StorageFree": 10000.00},
	"1002": {"StorageFree": 20000.00},
	"1003": {"StorageFree": 30000.00},
	"1004": {"StorageFree": 40000.00},
	"1005": {"StorageFree": 50000.00}}`)

	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)

	// Fatal on compression errors: continuing would store corrupt
	// data and produce misleading downstream failures.
	if _, err := zw.Write(data); err != nil {
		t.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		t.Fatal(err)
	}

	// Store the compressed version.
	if _, err := zkc.Set("/topicmappr_test/brokermetrics", buf.Bytes(), -1); err != nil {
		t.Fatal(err)
	}

	// Test fetching the compressed version.
	bm, errs := zki.GetAllBrokerMeta(true)
	if errs != nil {
		// Log errs (previously this logged err, which was nil here,
		// masking the actual failure).
		t.Error(errs)
	}

	expected := map[int]float64{
		1001: 10000.00,
		1002: 20000.00,
		1003: 30000.00,
		1004: 40000.00,
		1005: 50000.00,
	}

	for b, v := range bm {
		if v.StorageFree != expected[b] {
			t.Errorf("Unexpected StorageFree metric for broker %d", b)
		}
	}

	// Rewrite the uncompressed version so later tests see the
	// original fixture data.
	if err := setBrokerMetrics(); err != nil {
		t.Error(err)
	}
}

func TestGetAllPartitionMeta(t *testing.T) {
if testing.Short() {
Expand Down Expand Up @@ -498,6 +561,74 @@ func TestGetAllPartitionMeta(t *testing.T) {

}

// TestGetAllPartitionMetaCompressed verifies that partition metadata
// stored as gzip-compressed znode data is transparently decompressed
// when read back through the handler.
func TestGetAllPartitionMetaCompressed(t *testing.T) {
	if testing.Short() {
		t.Skip()
	}

	// Fetch and hold the original partition meta so it can be
	// restored at the end of the test.
	pm, err := zki.GetAllPartitionMeta()
	if err != nil {
		t.Error(err)
	}

	origJSON, _ := json.Marshal(pm)

	// Build a gzip-compressed copy of the serialized meta.
	var compressed bytes.Buffer
	gz := gzip.NewWriter(&compressed)

	if _, err := gz.Write(origJSON); err != nil {
		t.Error(err)
	}

	if err := gz.Close(); err != nil {
		t.Error(err)
	}

	// Store the compressed copy.
	if _, err := zkc.Set("/topicmappr_test/partitionmeta", compressed.Bytes(), -1); err != nil {
		t.Error(err)
	}

	// Fetch again; the handler should decompress transparently.

	pm, err = zki.GetAllPartitionMeta()
	if err != nil {
		t.Error(err)
	}

	wantSizes := map[int]float64{
		0: 1000.00,
		1: 2000.00,
		2: 3000.00,
		3: 4000.00,
	}

	for n := 0; n < 5; n++ {
		topic := fmt.Sprintf("topic%d", n)
		meta, ok := pm[topic]
		if !ok {
			t.Errorf("Expected topic '%s' in partition meta", topic)
		}

		for partn, m := range meta {
			if m.Size != wantSizes[partn] {
				t.Errorf("Expected size %f for %s %d, got %f", wantSizes[partn], topic, partn, m.Size)
			}
		}
	}

	// Reset the znode to the original, uncompressed partition meta.
	if _, err := zkc.Set("/topicmappr_test/partitionmeta", origJSON, -1); err != nil {
		t.Error(err)
	}

}

func TestOldestMetaTs(t *testing.T) {
if testing.Short() {
t.Skip()
Expand All @@ -514,7 +645,7 @@ func TestOldestMetaTs(t *testing.T) {
zkr, err := rawHandler(&Config{
Connect: zkaddr,
Prefix: configPrefix,
MetricsPrefix: "topicmappr",
MetricsPrefix: "topicmappr_test",
})
if err != nil {
t.Errorf("Error initializing ZooKeeper client: %s", err)
Expand All @@ -524,14 +655,14 @@ func TestOldestMetaTs(t *testing.T) {

// Get the lowest Mtime value.

_, m, err = zkc.Get("/topicmappr/partitionmeta")
_, m, err = zkc.Get("/topicmappr_test/partitionmeta")
if err != nil {
t.Error(err)
}

ts1 := m.Mtime

_, m, err = zkc.Get("/topicmappr/brokermetrics")
_, m, err = zkc.Get("/topicmappr_test/brokermetrics")
if err != nil {
t.Error(err)
}
Expand Down