Skip to content

Commit

Permalink
feat: Add a metric ingestion time SM sanitization (#15222)
Browse files Browse the repository at this point in the history
Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan authored Dec 4, 2024
1 parent 658fb24 commit e9d0c3e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
22 changes: 17 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,11 @@ type Distributor struct {
RequestParserWrapper push.RequestParserWrapper

// metrics
ingesterAppends *prometheus.CounterVec
ingesterAppendTimeouts *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter
ingesterAppends *prometheus.CounterVec
ingesterAppendTimeouts *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter
tenantPushSanitizedStructuredMetadata *prometheus.CounterVec

usageTracker push.UsageTracker
ingesterTasks chan pushIngesterTask
Expand Down Expand Up @@ -284,6 +285,11 @@ func New(
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
tenantPushSanitizedStructuredMetadata: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_push_structured_metadata_sanitized_total",
Help: "The total number of times we've had to sanitize structured metadata (names or values) at ingestion time per tenant.",
}, []string{"tenant"}),
kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_kafka_appends_total",
Expand Down Expand Up @@ -527,11 +533,17 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

var normalized string
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
for i := range entry.StructuredMetadata {
structuredMetadata[i].Name = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
normalized = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
if normalized != structuredMetadata[i].Name {
structuredMetadata[i].Name = normalized
d.tenantPushSanitizedStructuredMetadata.WithLabelValues(tenantID).Inc()
}
if strings.ContainsRune(structuredMetadata[i].Value, utf8.RuneError) {
structuredMetadata[i].Value = strings.Map(removeInvalidUtf, structuredMetadata[i].Value)
d.tenantPushSanitizedStructuredMetadata.WithLabelValues(tenantID).Inc()
}
}
if shouldDiscoverLevels {
Expand Down
8 changes: 8 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"
"unicode/utf8"

"github.com/prometheus/client_golang/prometheus/testutil"

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"
Expand Down Expand Up @@ -1961,22 +1963,27 @@ func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
for _, tc := range []struct {
req *logproto.PushRequest
expectedResponse *logproto.PushResponse
numSanitizations float64
}{
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, false),
success,
0,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, false),
success,
10,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, true),
success,
10,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, true),
success,
20,
},
} {
distributors, _ := prepare(t, 1, 5, limits, nil)
Expand All @@ -1988,5 +1995,6 @@ func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
response, err := distributors[0].Push(ctx, &request)
require.NoError(t, err)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.numSanitizations, testutil.ToFloat64(distributors[0].tenantPushSanitizedStructuredMetadata.WithLabelValues("test")))
}
}

0 comments on commit e9d0c3e

Please sign in to comment.