Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into block-scheduler-que…
Browse files Browse the repository at this point in the history
…ue-priority
  • Loading branch information
owen-d committed Dec 4, 2024
2 parents 2ecc114 + e0cf6da commit 4218c7c
Show file tree
Hide file tree
Showing 21 changed files with 421 additions and 275 deletions.
1 change: 1 addition & 0 deletions docs/sources/send-data/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ By adding our output plugin you can quickly try Loki without doing big configura
These third-party clients also enable sending logs to Loki:

- [Cribl Loki Destination](https://docs.cribl.io/stream/destinations-loki)
- [GrafanaLokiLogger](https://github.com/antoniojmsjr/GrafanaLokiLogger) (Delphi/Lazarus)
- [ilogtail](https://github.com/alibaba/ilogtail) (Go)
- [Log4j2 appender for Loki](https://github.com/tkowalcz/tjahzi) (Java)
- [loki-logback-appender](https://github.com/loki4j/loki-logback-appender) (Java)
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/send-data/otel/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ For ingesting logs to Loki using the OpenTelemetry Collector, you must use the [

When logs are ingested by Loki using an OpenTelemetry protocol (OTLP) ingestion endpoint, some of the data is stored as [Structured Metadata]({{< relref "../../get-started/labels/structured-metadata" >}}).

You must set `allow_structured_metadata` to `true` within your Loki config file. Otherwise, Loki will reject the log payload as malformed.
You must set `allow_structured_metadata` to `true` within your Loki config file. Otherwise, Loki will reject the log payload as malformed. Note that Structured Metadata is enabled by default in Loki 3.0 and later.

```yaml
limits_config:
Expand Down
8 changes: 4 additions & 4 deletions docs/sources/send-data/promtail/stages/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type: Counter
[max_idle_duration: <string>]

config:
# If present and true all log lines will be counted without
# attempting to match the source to the extract map.
# If present and true all log lines will be counted without attempting
# to match the `value` to the field specified by `source` in the extracted map.
# It is an error to specify `match_all: true` and also specify a `value`
[match_all: <bool>]

Expand Down Expand Up @@ -231,7 +231,7 @@ This pipeline first tries to find text in the format `order_status=<value>` in
the log line, pulling out the `<value>` into the extracted map with the key
`order_status`.

The metric stages creates `successful_orders_total` and `failed_orders_total`
The metrics stage creates `successful_orders_total` and `failed_orders_total`
metrics that only increment when the value of `order_status` in the extracted
map is `success` or `fail` respectively.

Expand Down Expand Up @@ -265,7 +265,7 @@ number in the `retries` field from the extracted map.
- metrics:
http_response_time_seconds:
type: Histogram
description: "length of each log line"
description: "distribution of log response time"
source: response_time
config:
buckets: [0.001,0.0025,0.005,0.010,0.025,0.050]
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go v1.55.5
github.com/baidubce/bce-sdk-go v0.9.202
github.com/baidubce/bce-sdk-go v0.9.203
github.com/bmatcuk/doublestar/v4 v4.7.1
github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500
github.com/cespare/xxhash/v2 v2.3.0
Expand Down Expand Up @@ -76,7 +76,7 @@ require (
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing-contrib/go-grpc v0.1.0
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing-contrib/go-stdlib v1.1.0
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b
github.com/oschwald/geoip2-golang v1.11.0
// github.com/pierrec/lz4 v2.0.5+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1008,8 +1008,8 @@ github.com/aws/smithy-go v1.11.1 h1:IQ+lPZVkSM3FRtyaDox41R8YS6iwPMYIreejOgPW49g=
github.com/aws/smithy-go v1.11.1/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM=
github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo=
github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM=
github.com/baidubce/bce-sdk-go v0.9.202 h1:TGRdO4g4CtiI2IZ6MxeUmkbKe6l8kq+mYH6SbxczO3g=
github.com/baidubce/bce-sdk-go v0.9.202/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/baidubce/bce-sdk-go v0.9.203 h1:D4YBk4prtlIjrnwrh5nvsSSjLjataApDmeL0fxvI/KU=
github.com/baidubce/bce-sdk-go v0.9.203/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
Expand Down Expand Up @@ -2309,8 +2309,8 @@ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2sz
github.com/opentracing-contrib/go-grpc v0.1.0 h1:9JHDtQXv6UL0tFF8KJB/4ApJgeOcaHp1h07d0PJjESc=
github.com/opentracing-contrib/go-grpc v0.1.0/go.mod h1:i3/jx/TvJZ/HKidtT4XGIi/NosUEpzS9xjVJctbKZzI=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing-contrib/go-stdlib v1.0.0 h1:TBS7YuVotp8myLon4Pv7BtCBzOTo1DeZCld0Z63mW2w=
github.com/opentracing-contrib/go-stdlib v1.0.0/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0=
github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
Expand Down
39 changes: 21 additions & 18 deletions pkg/blockbuilder/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ type PartitionController interface {
//
// containing log data and "committed" is the consumer group
type PartitionJobController struct {
stepLen int64
part partition.Reader
backoff backoff.Config
decoder *kafka.Decoder
logger log.Logger
stepLen int64
reader partition.Reader
offsetManager partition.OffsetManager
backoff backoff.Config
decoder *kafka.Decoder
logger log.Logger
}

func NewPartitionJobController(
controller partition.Reader,
reader partition.Reader,
offsetManager partition.OffsetManager,
backoff backoff.Config,
logger log.Logger,
) (*PartitionJobController, error) {
Expand All @@ -73,14 +75,15 @@ func NewPartitionJobController(
return nil, err
}
return &PartitionJobController{
stepLen: 1000, // Default step length of 1000 offsets per job
part: controller,
backoff: backoff,
decoder: decoder,
stepLen: 1000, // Default step length of 1000 offsets per job
reader: reader,
offsetManager: offsetManager,
backoff: backoff,
decoder: decoder,
logger: log.With(logger,
"component", "job-controller",
"topic", controller.Topic(),
"partition", controller.Partition(),
"topic", offsetManager.Topic(),
"partition", offsetManager.Partition(),
),
}, nil
}
Expand All @@ -90,7 +93,7 @@ func (l *PartitionJobController) HighestCommittedOffset(ctx context.Context) (in
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchLastCommittedOffset(ctx)
return l.offsetManager.FetchLastCommittedOffset(ctx)
},
)
}
Expand All @@ -100,7 +103,7 @@ func (l *PartitionJobController) HighestPartitionOffset(ctx context.Context) (in
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchPartitionOffset(ctx, partition.KafkaEndOffset)
return l.offsetManager.FetchPartitionOffset(ctx, partition.KafkaEndOffset)
},
)
}
Expand All @@ -110,13 +113,13 @@ func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (i
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchPartitionOffset(ctx, partition.KafkaStartOffset)
return l.offsetManager.FetchPartitionOffset(ctx, partition.KafkaStartOffset)
},
)
}

func (l *PartitionJobController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
l.part.SetOffsetForConsumption(offsets.Min)
l.reader.SetOffsetForConsumption(offsets.Min)

var (
lastOffset = offsets.Min - 1
Expand All @@ -126,7 +129,7 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets types.Offs

for lastOffset < offsets.Max && boff.Ongoing() {
var records []partition.Record
records, err = l.part.Poll(ctx, int(offsets.Max-lastOffset))
records, err = l.reader.Poll(ctx, int(offsets.Max-lastOffset))
if err != nil {
boff.Wait()
continue
Expand Down Expand Up @@ -217,7 +220,7 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *types.Job,
}

// Convert partition from int32 to int
job := types.NewJob(int(l.part.Partition()), offsets)
job := types.NewJob(int(l.reader.Partition()), offsets)
return true, job, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/blockbuilder/builder/slimgester.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
return false, nil
}

if err = i.jobController.part.Commit(ctx, lastOffset); err != nil {
if err = i.jobController.offsetManager.Commit(ctx, lastOffset); err != nil {
level.Error(logger).Log(
"msg", "failed to commit offset",
"last_offset", lastOffset,
Expand Down
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestTimeRangePlanner_Plan(t *testing.T) {
require.NoError(t, err)

require.Equal(t, len(tc.expectedJobs), len(jobs))
require.Equal(t, tc.expectedJobs, jobs)
require.ElementsMatch(t, tc.expectedJobs, jobs)
})
}
}
Expand Down
22 changes: 17 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,11 @@ type Distributor struct {
RequestParserWrapper push.RequestParserWrapper

// metrics
ingesterAppends *prometheus.CounterVec
ingesterAppendTimeouts *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter
ingesterAppends *prometheus.CounterVec
ingesterAppendTimeouts *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter
tenantPushSanitizedStructuredMetadata *prometheus.CounterVec

usageTracker push.UsageTracker
ingesterTasks chan pushIngesterTask
Expand Down Expand Up @@ -284,6 +285,11 @@ func New(
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
tenantPushSanitizedStructuredMetadata: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_push_structured_metadata_sanitized_total",
Help: "The total number of times we've had to sanitize structured metadata (names or values) at ingestion time per tenant.",
}, []string{"tenant"}),
kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_kafka_appends_total",
Expand Down Expand Up @@ -527,11 +533,17 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

var normalized string
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
for i := range entry.StructuredMetadata {
structuredMetadata[i].Name = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
normalized = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
if normalized != structuredMetadata[i].Name {
structuredMetadata[i].Name = normalized
d.tenantPushSanitizedStructuredMetadata.WithLabelValues(tenantID).Inc()
}
if strings.ContainsRune(structuredMetadata[i].Value, utf8.RuneError) {
structuredMetadata[i].Value = strings.Map(removeInvalidUtf, structuredMetadata[i].Value)
d.tenantPushSanitizedStructuredMetadata.WithLabelValues(tenantID).Inc()
}
}
if shouldDiscoverLevels {
Expand Down
8 changes: 8 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"
"unicode/utf8"

"github.com/prometheus/client_golang/prometheus/testutil"

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"
Expand Down Expand Up @@ -1961,22 +1963,27 @@ func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
for _, tc := range []struct {
req *logproto.PushRequest
expectedResponse *logproto.PushResponse
numSanitizations float64
}{
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, false),
success,
0,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, false),
success,
10,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, true),
success,
10,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, true),
success,
20,
},
} {
distributors, _ := prepare(t, 1, 5, limits, nil)
Expand All @@ -1988,5 +1995,6 @@ func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
response, err := distributors[0].Push(ctx, &request)
require.NoError(t, err)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.numSanitizations, testutil.ToFloat64(distributors[0].tenantPushSanitizedStructuredMetadata.WithLabelValues("test")))
}
}
24 changes: 12 additions & 12 deletions pkg/kafka/partition/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,34 @@ type partitionCommitter struct {
commitFailuresTotal prometheus.Counter
lastCommittedOffset prometheus.Gauge

logger log.Logger
reader Reader
commitFreq time.Duration
logger log.Logger
offsetManager OffsetManager
commitFreq time.Duration

toCommit *atomic.Int64
wg sync.WaitGroup
cancel context.CancelFunc
}

func newCommitter(reader Reader, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
func newCommitter(offsetManager OffsetManager, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
c := &partitionCommitter{
logger: logger,
reader: reader,
commitFreq: commitFreq,
logger: logger,
offsetManager: offsetManager,
commitFreq: commitFreq,
commitRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "loki_ingest_storage_reader_offset_commit_requests_total",
Help: "Total number of requests issued to commit the last consumed offset (includes both successful and failed requests).",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
}),
commitFailuresTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "loki_ingest_storage_reader_offset_commit_failures_total",
Help: "Total number of failed requests to commit the last consumed offset.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
}),
commitRequestsLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingest_storage_reader_offset_commit_request_duration_seconds",
Help: "The duration of requests to commit the last consumed offset.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: time.Hour,
Expand All @@ -61,7 +61,7 @@ func newCommitter(reader Reader, commitFreq time.Duration, logger log.Logger, re
lastCommittedOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingest_storage_reader_last_committed_offset",
Help: "The last consumed offset successfully committed by the partition reader. Set to -1 if not offset has been committed yet.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
}),
toCommit: atomic.NewInt64(-1),
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (c *partitionCommitter) Commit(ctx context.Context, offset int64) error {
startTime := time.Now()
c.commitRequestsTotal.Inc()

if err := c.reader.Commit(ctx, offset); err != nil {
if err := c.offsetManager.Commit(ctx, offset); err != nil {
level.Error(c.logger).Log("msg", "failed to commit offset", "err", err, "offset", offset)
c.commitFailuresTotal.Inc()
c.commitRequestsLatency.Observe(time.Since(startTime).Seconds())
Expand Down
3 changes: 1 addition & 2 deletions pkg/kafka/partition/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ func TestPartitionCommitter(t *testing.T) {
reg := prometheus.NewRegistry()
partitionID := int32(1)
consumerGroup := "test-consumer-group"
reader := newKafkaReader(
reader := newKafkaOffsetManager(
client,
kafkaCfg.Topic,
partitionID,
consumerGroup,
logger,
reg,
)
committer := newCommitter(reader, kafkaCfg.ConsumerGroupOffsetCommitInterval, logger, reg)

Expand Down
Loading

0 comments on commit 4218c7c

Please sign in to comment.