diff --git a/cmd/metricsfetcher/README.md b/cmd/metricsfetcher/README.md
index 966bda9e..dd480e0b 100644
--- a/cmd/metricsfetcher/README.md
+++ b/cmd/metricsfetcher/README.md
@@ -35,12 +35,20 @@ Usage of metricsfetcher:
    	Datadog API key [METRICSFETCHER_API_KEY]
   -app-key string
    	Datadog app key [METRICSFETCHER_APP_KEY]
+  -broker-id-tag string
+    	Datadog host tag for broker ID [METRICSFETCHER_BROKER_ID_TAG] (default "broker_id")
   -broker-storage-query string
-    	Datadog metric query to get storage free by broker_id [METRICSFETCHER_BROKER_STORAGE_QUERY] (default "avg:system.disk.free{service:kafka,device:/data} by {broker_id}")
+    	Datadog metric query to get broker storage free [METRICSFETCHER_BROKER_STORAGE_QUERY] (default "avg:system.disk.free{service:kafka,device:/data}")
+  -compression
+    	Whether to compress metrics data written to ZooKeeper [METRICSFETCHER_COMPRESSION] (default true)
+  -dry-run
+    	Dry run mode (don't reach Zookeeper) [METRICSFETCHER_DRY_RUN]
   -partition-size-query string
    	Datadog metric query to get partition size by topic, partition [METRICSFETCHER_PARTITION_SIZE_QUERY] (default "max:kafka.log.partition.size{service:kafka} by {topic,partition}")
   -span int
    	Query range in seconds (now - span) [METRICSFETCHER_SPAN] (default 3600)
+  -verbose
+    	Verbose output [METRICSFETCHER_VERBOSE]
   -zk-addr string
    	ZooKeeper connect string [METRICSFETCHER_ZK_ADDR] (default "localhost:2181")
   -zk-prefix string
@@ -59,7 +67,7 @@ Another detail to note regarding the partition size query is that `max` is being

 # Data Structures

-The topicmappr storage placement strategy expects metrics in the following znodes under the parent `-zk-prefix` path (both metricsfetcher and topicmappr default to `topicmappr`), along with the described structure:
+The topicmappr rebalance sub-command, as well as the rebuild sub-command with the storage placement strategy, expects metrics in the following znodes under the parent `-zk-prefix` path (both metricsfetcher and topicmappr default to `topicmappr`), with the structure described below:

 ### /topicmappr/partitionmeta
 `{"<topic>": {"<partition>": {"Size": <bytes>}}}`
@@ -78,3 +86,5 @@ Example:
 ```
 [zk: localhost:2181(CONNECTED) 0] get /topicmappr/brokermetrics
 {"1002":{"StorageFree":1280803388090.7295},"1003":{"StorageFree":1104897156296.092},"1004":{"StorageFree":1161254545714.023},"1005":{"StorageFree":1196051803924.5977},"1006":{"StorageFree":1103418346402.9092},"1007":{"StorageFree":1299083586345.6743}}
 ```
+
+The znode data can optionally be gzip compressed (metricsfetcher does this by default; it's configurable with the `--compression` flag). This is useful for clusters with a high partition count, where the uncompressed metrics data could otherwise exceed ZooKeeper's configured znode size limit (1MB by default). Topicmappr transparently reads both compressed and uncompressed metrics data.
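The README paragraph above describes the whole mechanism, and it's compact enough to sketch end to end. Below is a minimal, self-contained Go illustration of the same round-trip the patch implements (gzip on write, header sniff on read); the names `compress` and `maybeUncompress` are illustrative only and do not exist in this codebase:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress gzips b, analogous to what metricsfetcher does before
// writing metrics data to ZooKeeper when -compression is enabled.
func compress(b []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(b); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed bytes; skipping this check
	// risks emitting a truncated stream.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// maybeUncompress returns the decoded data and true if b is gzip,
// otherwise nil and false. gzip.NewReader validates the gzip magic
// header up front, so plain JSON falls through untouched.
func maybeUncompress(b []byte) ([]byte, bool) {
	zr, err := gzip.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, false
	}
	defer zr.Close()

	var out bytes.Buffer
	if _, err := io.Copy(&out, zr); err != nil {
		return nil, false
	}
	return out.Bytes(), true
}

func main() {
	data := []byte(`{"1001":{"StorageFree":10000.00}}`)

	gz, err := compress(data)
	if err != nil {
		panic(err)
	}

	if out, ok := maybeUncompress(gz); ok {
		fmt.Printf("%d bytes -> %d compressed -> %q\n", len(data), len(gz), out)
	}
}
```

Because a gzip stream always begins with the magic bytes `0x1f 0x8b` while these JSON payloads always begin with `{`, the sniff is unambiguous; that is what lets topicmappr read both old uncompressed znodes and new compressed ones without any format marker.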
diff --git a/cmd/metricsfetcher/main.go b/cmd/metricsfetcher/main.go
index b5047a55..a95ff9f2 100644
--- a/cmd/metricsfetcher/main.go
+++ b/cmd/metricsfetcher/main.go
@@ -1,6 +1,8 @@
 package main

 import (
+	"bytes"
+	"compress/gzip"
 	"encoding/json"
 	"errors"
 	"flag"
@@ -27,6 +29,7 @@ type Config struct {
 	ZKPrefix    string
 	Verbose     bool
 	DryRun      bool
+	Compression bool
 }

 var config = &Config{} // :(
@@ -42,6 +45,7 @@ func init() {
 	flag.StringVar(&config.ZKPrefix, "zk-prefix", "topicmappr", "ZooKeeper namespace prefix")
 	flag.BoolVar(&config.Verbose, "verbose", false, "Verbose output")
 	flag.BoolVar(&config.DryRun, "dry-run", false, "Dry run mode (don't reach Zookeeper)")
+	flag.BoolVar(&config.Compression, "compression", true, "Whether to compress metrics data written to ZooKeeper")

 	envy.Parse("METRICSFETCHER")
 	flag.Parse()
@@ -112,11 +116,23 @@ func main() {
 	}

 	// Write to ZK.
-	err = zk.Set(paths[0], string(partnData))
-	exitOnErr(err)
+	for i, data := range [][]byte{partnData, brokerData} {
+		// Optionally compress the data.
+		if config.Compression {
+			var buf bytes.Buffer
+			zw := gzip.NewWriter(&buf)

-	err = zk.Set(paths[1], string(brokerData))
-	exitOnErr(err)
+			_, err := zw.Write(data)
+			exitOnErr(err)
+
+			// Close flushes the gzip stream; its error must be checked.
+			exitOnErr(zw.Close())
+			data = buf.Bytes()
+		}
+
+		err = zk.Set(paths[i], string(data))
+		exitOnErr(err)
+	}

 	fmt.Println("\nData written to ZooKeeper")
 }
diff --git a/kafkazk/zookeeper.go b/kafkazk/zookeeper.go
index 42e7bb60..99acba44 100644
--- a/kafkazk/zookeeper.go
+++ b/kafkazk/zookeeper.go
@@ -1,9 +1,12 @@
 package kafkazk

 import (
+	"bytes"
+	"compress/gzip"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"math"
 	"regexp"
 	"sort"
@@ -461,6 +464,11 @@ func (z *ZKHandler) getBrokerMetrics() (BrokerMetricsMap, error) {
 		return nil, fmt.Errorf("Error fetching broker metrics: %s", err.Error())
 	}

+	// Check if the data is compressed.
+	if out, compressed := uncompress(data); compressed {
+		data = out
+	}
+
 	bmm := BrokerMetricsMap{}
 	err = json.Unmarshal(data, &bmm)
 	if err != nil {
@@ -489,6 +497,11 @@ func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error) {
 		return nil, errors.New("No partition meta")
 	}

+	// Check if the data is compressed.
+	if out, compressed := uncompress(data); compressed {
+		data = out
+	}
+
 	pmm := NewPartitionMetaMap()
 	err = json.Unmarshal(data, &pmm)
 	if err != nil {
@@ -766,3 +779,20 @@ func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) (bool, error) {

 	return true, nil
 }
+
+// uncompress takes a []byte and attempts to uncompress it as gzip.
+// It returns the uncompressed data along with a bool indicating
+// whether the input was gzip compressed.
+func uncompress(b []byte) ([]byte, bool) {
+	zr, err := gzip.NewReader(bytes.NewReader(b))
+	if err == nil {
+		defer zr.Close()
+		var out bytes.Buffer
+
+		if _, err := io.Copy(&out, zr); err == nil {
+			return out.Bytes(), true
+		}
+	}
+
+	return nil, false
+}
diff --git a/kafkazk/zookeeper_test.go b/kafkazk/zookeeper_test.go
index 2415e6e3..03a6be26 100644
--- a/kafkazk/zookeeper_test.go
+++ b/kafkazk/zookeeper_test.go
@@ -1,6 +1,8 @@
 package kafkazk

 import (
+	"bytes"
+	"compress/gzip"
 	"encoding/json"
 	"fmt"
 	"regexp"
@@ -32,9 +34,9 @@ var (
 		zkprefix + "/config/brokers",
 		zkprefix + "/config/changes",
 		// Topicmappr specific.
- "/topicmappr", - "/topicmappr/brokermetrics", - "/topicmappr/partitionmeta", + "/topicmappr_test", + "/topicmappr_test/brokermetrics", + "/topicmappr_test/partitionmeta", } ) @@ -96,7 +98,7 @@ func TestSetup(t *testing.T) { zki, err = NewHandler(&Config{ Connect: zkaddr, Prefix: configPrefix, - MetricsPrefix: "topicmappr", + MetricsPrefix: "topicmappr_test", }) if err != nil { t.Errorf("Error initializing ZooKeeper client: %s", err) @@ -175,7 +177,7 @@ func TestSetup(t *testing.T) { // Store partition meta. data, _ = json.Marshal(partitionMeta) - _, err = zkc.Set("/topicmappr/partitionmeta", data, -1) + _, err = zkc.Set("/topicmappr_test/partitionmeta", data, -1) if err != nil { t.Error(err) } @@ -213,14 +215,7 @@ func TestSetup(t *testing.T) { } // Create broker metrics. - data = []byte(`{ - "1001": {"StorageFree": 10000.00}, - "1002": {"StorageFree": 20000.00}, - "1003": {"StorageFree": 30000.00}, - "1004": {"StorageFree": 40000.00}, - "1005": {"StorageFree": 50000.00}}`) - _, err = zkc.Set("/topicmappr/brokermetrics", data, -1) - if err != nil { + if err := setBrokerMetrics(); err != nil { t.Error(err) } @@ -229,6 +224,19 @@ func TestSetup(t *testing.T) { } } +func setBrokerMetrics() error { + data := []byte(`{ + "1001": {"StorageFree": 10000.00}, + "1002": {"StorageFree": 20000.00}, + "1003": {"StorageFree": 30000.00}, + "1004": {"StorageFree": 40000.00}, + "1005": {"StorageFree": 50000.00}}`) + + _, err := zkc.Set("/topicmappr_test/brokermetrics", data, -1) + + return err +} + func TestCreateSetGetDelete(t *testing.T) { if testing.Short() { t.Skip() @@ -437,7 +445,6 @@ func TestGetAllBrokerMeta(t *testing.T) { } } -/* This test is useless. func TestGetBrokerMetrics(t *testing.T) { if testing.Short() { t.Skip() @@ -463,7 +470,63 @@ func TestGetBrokerMetrics(t *testing.T) { } } } -*/ + +func TestGetBrokerMetricsCompressed(t *testing.T) { + if testing.Short() { + t.Skip() + } + + // Create a compressed version of the metrics data. + data := []byte(`{ + "1001": {"StorageFree": 10000.00}, + "1002": {"StorageFree": 20000.00}, + "1003": {"StorageFree": 30000.00}, + "1004": {"StorageFree": 40000.00}, + "1005": {"StorageFree": 50000.00}}`) + + var buf bytes.Buffer + zw := gzip.NewWriter(&buf) + + _, err := zw.Write(data) + if err != nil { + t.Error(err) + } + + if err = zw.Close(); err != nil { + t.Error(err) + } + + // Store the compressed version. + _, err = zkc.Set("/topicmappr_test/brokermetrics", buf.Bytes(), -1) + if err != nil { + t.Fatal(err) + } + + // Test fetching the compressed version. + bm, errs := zki.GetAllBrokerMeta(true) + if errs != nil { + t.Error(err) + } + + expected := map[int]float64{ + 1001: 10000.00, + 1002: 20000.00, + 1003: 30000.00, + 1004: 40000.00, + 1005: 50000.00, + } + + for b, v := range bm { + if v.StorageFree != expected[b] { + t.Errorf("Unexpected StorageFree metric for broker %d", b) + } + } + + // Rewrite the uncompressed version. + if err := setBrokerMetrics(); err != nil { + t.Error(err) + } +} func TestGetAllPartitionMeta(t *testing.T) { if testing.Short() { @@ -498,6 +561,74 @@ func TestGetAllPartitionMeta(t *testing.T) { } +func TestGetAllPartitionMetaCompressed(t *testing.T) { + if testing.Short() { + t.Skip() + } + + // Fetch and hold the original partition meta. + pm, err := zki.GetAllPartitionMeta() + if err != nil { + t.Error(err) + } + + pmOrig, _ := json.Marshal(pm) + + // Create a compressed copy. 
+	var buf bytes.Buffer
+	zw := gzip.NewWriter(&buf)
+
+	_, err = zw.Write(pmOrig)
+	if err != nil {
+		t.Error(err)
+	}
+
+	if err := zw.Close(); err != nil {
+		t.Error(err)
+	}
+
+	// Store the compressed copy.
+	_, err = zkc.Set("/topicmappr_test/partitionmeta", buf.Bytes(), -1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Test fetching the compressed copy.
+	pm, err = zki.GetAllPartitionMeta()
+	if err != nil {
+		t.Error(err)
+	}
+
+	expected := map[int]float64{
+		0: 1000.00,
+		1: 2000.00,
+		2: 3000.00,
+		3: 4000.00,
+	}
+
+	for i := 0; i < 5; i++ {
+		topic := fmt.Sprintf("topic%d", i)
+		meta, exists := pm[topic]
+		if !exists {
+			t.Errorf("Expected topic '%s' in partition meta", topic)
+		}
+
+		for partn, m := range meta {
+			if m.Size != expected[partn] {
+				t.Errorf("Expected size %f for %s %d, got %f", expected[partn], topic, partn, m.Size)
+			}
+		}
+	}
+
+	// Reset to the original partitionMeta.
+	_, err = zkc.Set("/topicmappr_test/partitionmeta", pmOrig, -1)
+	if err != nil {
+		t.Error(err)
+	}
+}

 func TestOldestMetaTs(t *testing.T) {
 	if testing.Short() {
 		t.Skip()
 	}
@@ -514,7 +643,7 @@ func TestOldestMetaTs(t *testing.T) {
 	zkr, err := rawHandler(&Config{
 		Connect:       zkaddr,
 		Prefix:        configPrefix,
-		MetricsPrefix: "topicmappr",
+		MetricsPrefix: "topicmappr_test",
 	})
 	if err != nil {
 		t.Errorf("Error initializing ZooKeeper client: %s", err)
@@ -524,14 +653,14 @@ func TestOldestMetaTs(t *testing.T) {

 	// Get the lowest Mtime value.

-	_, m, err = zkc.Get("/topicmappr/partitionmeta")
+	_, m, err = zkc.Get("/topicmappr_test/partitionmeta")
 	if err != nil {
 		t.Error(err)
 	}

 	ts1 := m.Mtime

-	_, m, err = zkc.Get("/topicmappr/brokermetrics")
+	_, m, err = zkc.Get("/topicmappr_test/brokermetrics")
 	if err != nil {
 		t.Error(err)
 	}
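One observation on the tests above: `TestGetBrokerMetricsCompressed` and `TestGetAllPartitionMetaCompressed` duplicate the `gzip.Writer` boilerplate when building their fixtures. If a third compressed fixture ever appears, a small test helper could absorb it. Below is a sketch under that assumption; `gzipBytes` is a hypothetical name, not something this patch adds, and it would live in the `_test` file alongside the existing helpers:

```go
package kafkazk

import (
	"bytes"
	"compress/gzip"
	"testing"
)

// gzipBytes compresses b for use as a test fixture, failing the test
// immediately on any compression error. Hypothetical helper; not part
// of this patch.
func gzipBytes(t *testing.T, b []byte) []byte {
	t.Helper()

	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)

	if _, err := zw.Write(b); err != nil {
		t.Fatal(err)
	}
	// Close flushes the stream; an unchecked error here could yield a
	// truncated fixture and a confusing downstream test failure.
	if err := zw.Close(); err != nil {
		t.Fatal(err)
	}

	return buf.Bytes()
}
```

With such a helper, the store steps in both tests would reduce to one-liners like `zkc.Set("/topicmappr_test/brokermetrics", gzipBytes(t, data), -1)`.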