Skip to content

Commit

Permalink
chore: pass down reader metrics to avoid duplicate registration (#15246)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli authored Dec 4, 2024
1 parent e0cf6da commit d59a5e2
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 17 deletions.
26 changes: 11 additions & 15 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/loki/v3/pkg/kafka"

Expand All @@ -25,12 +26,6 @@ const (
KafkaEndOffset SpecialOffset = -1
)

var rm *readerMetrics

func init() {
rm = newReaderMetrics(prometheus.DefaultRegisterer)
}

type Record struct {
// Context holds the tracing (and potentially other) info, that the record was enriched with on fetch from Kafka.
Ctx context.Context
Expand All @@ -47,18 +42,19 @@ type Reader interface {
SetOffsetForConsumption(offset int64)
}

// readerMetrics contains metrics specific to Kafka reading operations
type readerMetrics struct {
// ReaderMetrics contains metrics specific to Kafka reading operations
type ReaderMetrics struct {
recordsPerFetch prometheus.Histogram
fetchesErrors prometheus.Counter
fetchesTotal prometheus.Counter
fetchWaitDuration prometheus.Histogram
receiveDelay prometheus.Histogram
lastCommittedOffset prometheus.Gauge
kprom *kprom.Metrics
}

func newReaderMetrics(r prometheus.Registerer) *readerMetrics {
return &readerMetrics{
func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics {
return &ReaderMetrics{
fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_kafka_reader_fetch_wait_duration_seconds",
Help: "How long the reader spent waiting for a batch of records from Kafka.",
Expand Down Expand Up @@ -86,6 +82,7 @@ func newReaderMetrics(r prometheus.Registerer) *readerMetrics {
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.ExponentialBuckets(0.125, 2, 18),
}),
kprom: client.NewReaderClientMetrics("partition-reader", r),
}
}

Expand All @@ -95,21 +92,20 @@ type KafkaReader struct {
topic string
partitionID int32
consumerGroup string
metrics *readerMetrics
metrics *ReaderMetrics
logger log.Logger
}

func NewKafkaReader(
cfg kafka.Config,
partitionID int32,
logger log.Logger,
reg prometheus.Registerer,
metrics *ReaderMetrics,
) (*KafkaReader, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
cfg,
clientMetrics,
metrics.kprom,
log.With(logger, "component", "kafka-client"),
)
if err != nil {
Expand All @@ -120,7 +116,7 @@ func NewKafkaReader(
client: c,
topic: cfg.Topic,
partitionID: partitionID,
metrics: rm,
metrics: metrics,
logger: logger,
}, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ func NewReaderService(
logger log.Logger,
reg prometheus.Registerer,
) (*ReaderService, error) {
readerMetrics := NewReaderMetrics(reg)
reader, err := NewKafkaReader(
kafkaCfg,
partitionID,
logger,
reg,
readerMetrics,
)
if err != nil {
return nil, fmt.Errorf("creating kafka reader: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1821,11 +1821,12 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
return nil, fmt.Errorf("calculating block builder partition ID: %w", err)
}

readerMetrics := partition.NewReaderMetrics(prometheus.DefaultRegisterer)
reader, err := partition.NewKafkaReader(
t.Cfg.KafkaConfig,
ingestPartitionID,
logger,
prometheus.DefaultRegisterer,
readerMetrics,
)
if err != nil {
return nil, err
Expand Down

0 comments on commit d59a5e2

Please sign in to comment.