Skip to content

Commit

Permalink
Tags support (#272)
Browse files Browse the repository at this point in the history
* Refactor metric parsing

* Labels parsing

* Update carbonapi

* Use ParsedMetric.RawMetric for MatchedMetric.Metric

* Not include name to labels

* Refactor metric parsing tests

* ScanBytes -> NewBytesScanner, ByteSliceSplitScanner -> BytesScanner

* Move unsafe to unsafe.go
  • Loading branch information
Pliner authored Mar 6, 2019
1 parent 683fa52 commit 94139ce
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 688 deletions.
118 changes: 118 additions & 0 deletions filter/metrics_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package filter

import (
"fmt"
"strconv"

"github.com/moira-alert/moira"
)

//ParsedMetric represents a result of ParseMetric.
type ParsedMetric struct {
Metric string
Name string
Labels map[string]string
Value float64
Timestamp int64
}

// ParseMetric parses metric from string
// supported format: "<metricString> <valueFloat64> <timestampInt64>"
func ParseMetric(input []byte) (*ParsedMetric, error) {
if !isPrintableASCII(input) {
return nil, fmt.Errorf("non-ascii or non-printable chars in metric name: '%s'", input)
}

var metricBytes, valueBytes, timestampBytes []byte
inputScanner := moira.NewBytesScanner(input, ' ')
if !inputScanner.HasNext() {
return nil, fmt.Errorf("too few space-separated items: '%s'", input)
}
metricBytes = inputScanner.Next()
if !inputScanner.HasNext() {
return nil, fmt.Errorf("too few space-separated items: '%s'", input)
}
valueBytes = inputScanner.Next()
if !inputScanner.HasNext() {
return nil, fmt.Errorf("too few space-separated items: '%s'", input)
}
timestampBytes = inputScanner.Next()
if inputScanner.HasNext() {
return nil, fmt.Errorf("too many space-separated items: '%s'", input)
}

name, labels, err := parseNameAndLabels(metricBytes)
if err != nil {
return nil, fmt.Errorf("cannot parse metric: '%s' (%s)", input, err)
}

value, err := parseFloat(valueBytes)
if err != nil {
return nil, fmt.Errorf("cannot parse value: '%s' (%s)", input, err)
}

timestamp, err := parseFloat(timestampBytes)
if err != nil {
return nil, fmt.Errorf("cannot parse timestamp: '%s' (%s)", input, err)
}

parsedMetric := &ParsedMetric{
moira.UnsafeBytesToString(metricBytes),
name,
labels,
value,
int64(timestamp),
}
return parsedMetric, nil
}

func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) {
metricBytesScanner := moira.NewBytesScanner(metricBytes, ';')
if !metricBytesScanner.HasNext() {
return "", nil, fmt.Errorf("too few colon-separated items: '%s'", metricBytes)
}
nameBytes := metricBytesScanner.Next()
if len(nameBytes) == 0 {
return "", nil, fmt.Errorf("empty metric name: '%s'", metricBytes)
}
name := moira.UnsafeBytesToString(nameBytes)
labels := make(map[string]string)
for metricBytesScanner.HasNext() {
labelBytes := metricBytesScanner.Next()
labelBytesScanner := moira.NewBytesScanner(labelBytes, '=')

var labelNameBytes, labelValueBytes []byte
if !labelBytesScanner.HasNext() {
return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes)
}
labelNameBytes = labelBytesScanner.Next()
if !labelBytesScanner.HasNext() {
return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes)
}
labelValueBytes = labelBytesScanner.Next()
if labelBytesScanner.HasNext() {
return "", nil, fmt.Errorf("too many equal-separated items: '%s'", labelBytes)
}
if len(labelNameBytes) == 0 {
return "", nil, fmt.Errorf("empty label name: '%s'", labelBytes)
}
labelName := moira.UnsafeBytesToString(labelNameBytes)
labelValue := moira.UnsafeBytesToString(labelValueBytes)
labels[labelName] = labelValue
}
return name, labels, nil
}

func parseFloat(input []byte) (float64, error) {
return strconv.ParseFloat(moira.UnsafeBytesToString(input), 64)
}

func isPrintableASCII(b []byte) bool {
for i := 0; i < len(b); i++ {
if b[i] < 0x20 || b[i] > 0x7E {
return false
}
}

return true
}
103 changes: 103 additions & 0 deletions filter/metrics_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package filter

import (
"math/rand"
"strconv"
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func TestParseMetric(t *testing.T) {
type ValidMetricCase struct {
input string
metric string
name string
labels map[string]string
value float64
timestamp int64
}

Convey("Given invalid metric strings, should return errors", t, func() {
invalidMetrics := []string{
"Invalid.value 12g5 1234567890",
"No.value.two.spaces 1234567890",
"No.timestamp.space.in.the.end 12 ",
"No.timestamp 12",
" 12 1234567890",
"Non-ascii.こんにちは 12 1234567890",
"Non-printable.\000 12 1234567890",
"",
"\n",
"Too.many.parts 1 2 3 4 12 1234567890",
"Space.in.the.end 12 1234567890 ",
" Space.in.the.beginning 12 1234567890",
"\tNon-printable.in.the.beginning 12 1234567890",
"\rNon-printable.in.the.beginning 12 1234567890",
"Newline.in.the.end 12 1234567890\n",
"Newline.in.the.end 12 1234567890\r",
"Newline.in.the.end 12 1234567890\r\n",
";empty.name.but.with.label= 1 2",
"no.labels.but.delimiter.in.the.end; 1 2",
"empty.label.name;= 1 2",
}

for _, invalidMetric := range invalidMetrics {
_, err := ParseMetric([]byte(invalidMetric))
So(err, ShouldBeError)
}
})

Convey("Given valid metric strings, should return parsed values", t, func() {
validMetrics := []ValidMetricCase{
{"One.two.three 123 1234567890", "One.two.three", "One.two.three", map[string]string{}, 123, 1234567890},
{"One.two.three 1.23e2 1234567890", "One.two.three", "One.two.three", map[string]string{}, 123, 1234567890},
{"One.two.three -123 1234567890", "One.two.three", "One.two.three", map[string]string{}, -123, 1234567890},
{"One.two.three +123 1234567890", "One.two.three", "One.two.three", map[string]string{}, 123, 1234567890},
{"One.two.three 123. 1234567890", "One.two.three", "One.two.three", map[string]string{}, 123, 1234567890},
{"One.two.three 123.0 1234567890", "One.two.three", "One.two.three", map[string]string{}, 123, 1234567890},
{"One.two.three .123 1234567890", "One.two.three", "One.two.three", map[string]string{}, 0.123, 1234567890},
{"One.two.three;four=five 123 1234567890", "One.two.three;four=five", "One.two.three", map[string]string{"four": "five"}, 123, 1234567890},
{"One.two.three;four= 123 1234567890", "One.two.three;four=", "One.two.three", map[string]string{"four": ""}, 123, 1234567890},
{"One.two.three;four=five;six=seven 123 1234567890", "One.two.three;four=five;six=seven", "One.two.three", map[string]string{"four": "five", "six": "seven"}, 123, 1234567890},
}

for _, validMetric := range validMetrics {
parsedMetric, err := ParseMetric([]byte(validMetric.input))
So(err, ShouldBeEmpty)
So(parsedMetric.Metric, ShouldEqual, validMetric.metric)
So(parsedMetric.Name, ShouldEqual, validMetric.name)
So(parsedMetric.Labels, ShouldResemble, validMetric.labels)
So(parsedMetric.Value, ShouldEqual, validMetric.value)
So(parsedMetric.Timestamp, ShouldEqual, validMetric.timestamp)
}
})

Convey("Given valid metric strings with float64 timestamp, should return parsed values", t, func() {
var testTimestamp int64 = 1234567890

// Create and test n metrics with float64 timestamp with fractional part of length n (n=19)
//
// For example:
//
// [n=1] One.two.three 123 1234567890.6
// [n=2] One.two.three 123 1234567890.94
// [n=3] One.two.three 123 1234567890.665
// [n=4] One.two.three 123 1234567890.4377
// ...
// [n=19] One.two.three 123 1234567890.6790847778320312500

for i := 1; i < 20; i++ {
rawTimestamp := strconv.FormatFloat(float64(testTimestamp)+rand.Float64(), 'f', i, 64)
rawMetric := "One.two.three 123 " + rawTimestamp
validMetric := ValidMetricCase{rawMetric, "One.two.three", "One.two.three", map[string]string{}, 123, testTimestamp}
parsedMetric, err := ParseMetric([]byte(validMetric.input))
So(err, ShouldBeEmpty)
So(parsedMetric.Metric, ShouldResemble, validMetric.metric)
So(parsedMetric.Name, ShouldResemble, validMetric.name)
So(parsedMetric.Labels, ShouldResemble, validMetric.labels)
So(parsedMetric.Value, ShouldEqual, validMetric.value)
So(parsedMetric.Timestamp, ShouldEqual, validMetric.timestamp)
}
})
}
80 changes: 13 additions & 67 deletions filter/patterns_storage.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package filter

import (
"bytes"
"fmt"
"path"
"strconv"
"strings"
"sync/atomic"
"time"
"unsafe"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics/graphite"
Expand Down Expand Up @@ -59,7 +56,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte) *moira.Ma
storage.metrics.TotalMetricsReceived.Inc(1)
count := storage.metrics.TotalMetricsReceived.Count()

metric, value, timestamp, err := storage.parseMetric(lineBytes)
parsedMetric, err := ParseMetric(lineBytes)
if err != nil {
storage.logger.Infof("cannot parse input: %v", err)
return nil
Expand All @@ -68,26 +65,26 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte) *moira.Ma
storage.metrics.ValidMetricsReceived.Inc(1)

matchingStart := time.Now()
matched := storage.matchPattern(metric)
matchedPatterns := storage.matchPatterns(parsedMetric.Name)
if count%10 == 0 {
storage.metrics.MatchingTimer.UpdateSince(matchingStart)
}
if len(matched) > 0 {
if len(matchedPatterns) > 0 {
storage.metrics.MatchingMetricsReceived.Inc(1)
return &moira.MatchedMetric{
Metric: string(metric),
Patterns: matched,
Value: value,
Timestamp: timestamp,
RetentionTimestamp: timestamp,
Metric: parsedMetric.Metric,
Patterns: matchedPatterns,
Value: parsedMetric.Value,
Timestamp: parsedMetric.Timestamp,
RetentionTimestamp: parsedMetric.Timestamp,
Retention: 60,
}
}
return nil
}

// matchPattern returns array of matched patterns
func (storage *PatternStorage) matchPattern(metric []byte) []string {
// matchPatterns returns array of matched patterns
func (storage *PatternStorage) matchPatterns(metric string) []string {
currentLevel := []*PatternNode{storage.PatternTree.Load().(*PatternNode)}
var found, index int
for i, c := range metric {
Expand Down Expand Up @@ -123,38 +120,6 @@ func (storage *PatternStorage) matchPattern(metric []byte) []string {
return matched
}

// parseMetric parses metric from string
// supported format: "<metricString> <valueFloat64> <timestampInt64>"
func (*PatternStorage) parseMetric(input []byte) ([]byte, float64, int64, error) {
firstSpaceIndex := bytes.IndexByte(input, ' ')
if firstSpaceIndex < 1 {
return nil, 0, 0, fmt.Errorf("too few space-separated items: '%s'", input)
}

secondSpaceIndex := bytes.IndexByte(input[firstSpaceIndex+1:], ' ')
if secondSpaceIndex < 1 {
return nil, 0, 0, fmt.Errorf("too few space-separated items: '%s'", input)
}
secondSpaceIndex += firstSpaceIndex + 1

metric := input[:firstSpaceIndex]
if !isPrintableASCII(metric) {
return nil, 0, 0, fmt.Errorf("non-ascii or non-printable chars in metric name: '%s'", input)
}

value, err := strconv.ParseFloat(unsafeString(input[firstSpaceIndex+1:secondSpaceIndex]), 64)
if err != nil {
return nil, 0, 0, fmt.Errorf("cannot parse value: '%s' (%s)", input, err)
}

timestamp, err := parseTimestamp(unsafeString(input[secondSpaceIndex+1:]))
if err != nil || timestamp == 0 {
return nil, 0, 0, fmt.Errorf("cannot parse timestamp: '%s' (%s)", input, err)
}

return metric, value, timestamp, nil
}

func (storage *PatternStorage) buildTree(patterns []string) error {
newTree := &PatternNode{}

Expand Down Expand Up @@ -209,11 +174,6 @@ func (storage *PatternStorage) buildTree(patterns []string) error {
return nil
}

func parseTimestamp(unixTimestamp string) (int64, error) {
timestamp, err := strconv.ParseFloat(unixTimestamp, 64)
return int64(timestamp), err
}

func hasEmptyParts(parts []string) bool {
for _, part := range parts {
if part == "" {
Expand All @@ -223,9 +183,9 @@ func hasEmptyParts(parts []string) bool {
return false
}

func findPart(part []byte, currentLevel []*PatternNode) ([]*PatternNode, int) {
func findPart(part string, currentLevel []*PatternNode) ([]*PatternNode, int) {
nextLevel := make([]*PatternNode, 0, 64)
hash := xxhash.Checksum32(part)
hash := xxhash.Checksum32(moira.UnsafeStringToBytes(part))
for _, node := range currentLevel {
for _, child := range node.Children {
match := false
Expand All @@ -234,7 +194,7 @@ func findPart(part []byte, currentLevel []*PatternNode) ([]*PatternNode, int) {
match = true
} else if len(child.InnerParts) > 0 {
for _, innerPart := range child.InnerParts {
innerMatch, _ := path.Match(innerPart, string(part))
innerMatch, _ := path.Match(innerPart, part)
if innerMatch {
match = true
break
Expand All @@ -257,17 +217,3 @@ func split2(s, sep string) (string, string) {
}
return splitResult[0], splitResult[1]
}

func unsafeString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

func isPrintableASCII(b []byte) bool {
for i := 0; i < len(b); i++ {
if b[i] < 0x20 || b[i] > 0x7E {
return false
}
}

return true
}
Loading

0 comments on commit 94139ce

Please sign in to comment.