feat(blockbuilder): consolidate on record counting planner (#15247)
owen-d authored Dec 4, 2024
1 parent 0d67831 commit 532bdbc
Showing 5 changed files with 247 additions and 164 deletions.
14 changes: 9 additions & 5 deletions docs/sources/shared/configuration.md
@@ -212,17 +212,21 @@ block_scheduler:
# CLI flag: -block-scheduler.interval
[interval: <duration> | default = 5m]

# Period used by the planner to calculate the start and end offset such that
# each job consumes records spanning the target period.
# CLI flag: -block-scheduler.target-records-spanning-period
[target_records_spanning_period: <duration> | default = 1h]

# Lookback period in milliseconds used by the scheduler to plan jobs when the
# consumer group has no commits. -1 consumes from the latest offset. -2
# consumes from the start of the partition.
# CLI flag: -block-scheduler.lookback-period
[lookback_period: <int> | default = -2]

# Strategy used by the planner to plan jobs. One of record-count
# CLI flag: -block-scheduler.strategy
[strategy: <string> | default = "record-count"]

# Target record count used by the planner to plan jobs. Only used when
# strategy is record-count
# CLI flag: -block-scheduler.target-record-count
[target_record_count: <int> | default = 1000]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
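
For reference, a hedged sketch of how the new options from this file might appear in a Loki configuration, assuming the defaults documented above (the surrounding settings are unchanged by this commit):

block_scheduler:
  # How often the scheduler plans jobs.
  interval: 5m
  # -2 starts from the beginning of the partition when the group has no commits.
  lookback_period: -2
  # record-count is the only valid strategy after this change.
  strategy: record-count
  # Upper bound on records per planned job.
  target_record_count: 1000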
44 changes: 37 additions & 7 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -4,7 +4,9 @@ import (
"context"
"errors"
"flag"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
@@ -22,17 +24,36 @@ var (
)

type Config struct {
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
TargetRecordConsumptionPeriod time.Duration `yaml:"target_records_spanning_period"`
LookbackPeriod int64 `yaml:"lookback_period"`
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod int64 `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
planner Planner `yaml:"-"` // validated planner
TargetRecordCount int64 `yaml:"target_record_count"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.")
f.DurationVar(&cfg.TargetRecordConsumptionPeriod, prefix+"target-records-spanning-period", time.Hour, "Period used by the planner to calculate the start and end offset such that each job consumes records spanning the target period.")
f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
f.Int64Var(&cfg.LookbackPeriod, prefix+"lookback-period", -2, "Lookback period in milliseconds used by the scheduler to plan jobs when the consumer group has no commits. -1 consumes from the latest offset. -2 consumes from the start of the partition.")
f.StringVar(
&cfg.Strategy,
prefix+"strategy",
RecordCountStrategy,
fmt.Sprintf(
"Strategy used by the planner to plan jobs. One of %s",
strings.Join(validStrategies, ", "),
),
)
f.Int64Var(
&cfg.TargetRecordCount,
prefix+"target-record-count",
1000,
fmt.Sprintf(
"Target record count used by the planner to plan jobs. Only used when strategy is %s",
RecordCountStrategy,
),
)
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -48,6 +69,16 @@ func (cfg *Config) Validate() error {
return errors.New("only -1(latest) and -2(earliest) are valid as negative values for lookback_period")
}

switch cfg.Strategy {
case RecordCountStrategy:
if cfg.TargetRecordCount <= 0 {
return errors.New("target record count must be a non-zero value")
}
cfg.planner = NewRecordCountPlanner(cfg.TargetRecordCount)
default:
return fmt.Errorf("invalid strategy: %s", cfg.Strategy)
}

return nil
}

@@ -66,10 +97,9 @@ type BlockScheduler struct {

// NewScheduler creates a new scheduler instance
func NewScheduler(cfg Config, queue *JobQueue, offsetReader OffsetReader, logger log.Logger, r prometheus.Registerer) *BlockScheduler {
planner := NewTimeRangePlanner(cfg.TargetRecordConsumptionPeriod, offsetReader, func() time.Time { return time.Now().UTC() }, logger)
s := &BlockScheduler{
cfg: cfg,
planner: planner,
planner: cfg.planner,
offsetReader: offsetReader,
logger: logger,
metrics: NewMetrics(r),
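
To illustrate the new wiring, a minimal sketch of how a caller might build the scheduler after this change: Validate() now selects and stores the planner on the config, and NewScheduler reads cfg.planner instead of constructing a TimeRangePlanner itself. The buildScheduler helper and the import path are assumptions for illustration, not part of this commit.

package example

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
)

// buildScheduler is a hypothetical helper showing the validate-then-construct flow.
func buildScheduler(queue *scheduler.JobQueue, reader scheduler.OffsetReader, logger log.Logger, reg prometheus.Registerer) (*scheduler.BlockScheduler, error) {
	cfg := scheduler.Config{
		ConsumerGroup:     "block-scheduler",
		Interval:          5 * time.Minute,
		LookbackPeriod:    -2,
		Strategy:          scheduler.RecordCountStrategy,
		TargetRecordCount: 1000,
	}
	// Validate selects the planner for the configured strategy and stores it on
	// the config; unknown strategies and non-positive record counts return errors.
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	// NewScheduler consumes the planner chosen during validation.
	return scheduler.NewScheduler(cfg, queue, reader, logger, reg), nil
}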
101 changes: 101 additions & 0 deletions pkg/blockbuilder/scheduler/scheduler_test.go
@@ -149,3 +149,104 @@ func TestMultipleBuilders(t *testing.T) {
t.Error("builder1 got unexpected second job")
}
}

func TestConfig_Validate(t *testing.T) {
tests := []struct {
name string
cfg Config
wantErr string
}{
{
name: "valid config with record count strategy",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
},
{
name: "zero interval",
cfg: Config{
Interval: 0,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
wantErr: "interval must be a non-zero value",
},
{
name: "negative interval",
cfg: Config{
Interval: -time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
wantErr: "interval must be a non-zero value",
},
{
name: "invalid lookback period",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -3,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
wantErr: "only -1(latest) and -2(earliest) are valid as negative values for lookback_period",
},
{
name: "invalid strategy",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: "invalid",
TargetRecordCount: 1000,
},
wantErr: "invalid strategy: invalid",
},
{
name: "zero target record count",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 0,
},
wantErr: "target record count must be a non-zero value",
},
{
name: "negative target record count",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: -1000,
},
wantErr: "target record count must be a non-zero value",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.wantErr != "" {
if err == nil {
t.Errorf("Validate() error = nil, wantErr %v", tt.wantErr)
return
}
if err.Error() != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
if err != nil {
t.Errorf("Validate() error = %v, wantErr nil", err)
}
// Check that planner is set for valid configs
if tt.cfg.planner == nil {
t.Error("Validate() did not set planner for valid config")
}
})
}
}
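
These table-driven cases can be run on their own with the standard Go tooling, e.g. go test ./pkg/blockbuilder/scheduler/ -run TestConfig_Validate -v from the repository root.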
112 changes: 26 additions & 86 deletions pkg/blockbuilder/scheduler/strategy.go
@@ -3,7 +3,6 @@ package scheduler
import (
"context"
"sort"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -14,7 +13,6 @@ import (

// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
type OffsetReader interface {
ListOffsetsAfterMilli(context.Context, int64) (map[int32]kadm.ListedOffset, error)
GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error)
}

@@ -24,10 +22,13 @@ type Planner interface {
}

const (
RecordCountStrategy = "record_count"
TimeRangeStrategy = "time_range"
RecordCountStrategy = "record-count"
)

var validStrategies = []string{
RecordCountStrategy,
}

// tries to consume up to targetRecordCount records per partition
type RecordCountPlanner struct {
targetRecordCount int64
@@ -52,101 +53,40 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
return nil, err
}

var jobs []*JobWithPriority[int]
jobs := make([]*JobWithPriority[int], 0, len(offsets))
for _, partitionOffset := range offsets {
// kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
// no additional validation is needed here
startOffset := partitionOffset.Commit.At + 1
endOffset := min(startOffset+p.targetRecordCount, partitionOffset.End.Offset)

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
Min: startOffset,
Max: endOffset,
}), int(partitionOffset.End.Offset-startOffset),
)

jobs = append(jobs, job)
}

// Sort jobs by partition number to ensure consistent ordering
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].Job.Partition < jobs[j].Job.Partition
})

return jobs, nil
}

// Targets consuming records spanning a configured period.
// This is a stateless planner, it is upto the caller to deduplicate or update jobs that are already in queue or progress.
type TimeRangePlanner struct {
offsetReader OffsetReader

buffer time.Duration
targetPeriod time.Duration
now func() time.Time

logger log.Logger
}

func NewTimeRangePlanner(interval time.Duration, offsetReader OffsetReader, now func() time.Time, logger log.Logger) *TimeRangePlanner {
return &TimeRangePlanner{
targetPeriod: interval,
buffer: interval,
offsetReader: offsetReader,
now: now,
logger: logger,
}
}

func (p *TimeRangePlanner) Name() string {
return TimeRangeStrategy
}

func (p *TimeRangePlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) {
// truncate to the nearest Interval
consumeUptoTS := p.now().Add(-p.buffer).Truncate(p.targetPeriod)

// this will return the latest offset in the partition if no records are produced after this ts.
consumeUptoOffsets, err := p.offsetReader.ListOffsetsAfterMilli(ctx, consumeUptoTS.UnixMilli())
if err != nil {
level.Error(p.logger).Log("msg", "failed to list offsets after timestamp", "err", err)
return nil, err
}

offsets, err := p.offsetReader.GroupLag(ctx)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
return nil, err
}

var jobs []*JobWithPriority[int]
for _, partitionOffset := range offsets {
startOffset := partitionOffset.Commit.At + 1
// TODO: we could further break down the work into Interval sized chunks if this partition has pending records spanning a long time range
// or have the builder consume in chunks and commit the job status back to scheduler.
endOffset := consumeUptoOffsets[partitionOffset.Partition].Offset
endOffset := partitionOffset.End.Offset

// Skip if there's no lag
if startOffset >= endOffset {
level.Info(p.logger).Log("msg", "no pending records to process", "partition", partitionOffset.Partition,
"commitOffset", partitionOffset.Commit.At,
"consumeUptoOffset", consumeUptoOffsets[partitionOffset.Partition].Offset)
continue
}

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
Min: startOffset,
Max: endOffset,
}), int(endOffset-startOffset),
)
// Create jobs of size targetRecordCount until we reach endOffset
for currentStart := startOffset; currentStart < endOffset; {
currentEnd := min(currentStart+p.targetRecordCount, endOffset)

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
Min: currentStart,
Max: currentEnd,
}), int(endOffset-currentStart), // priority is remaining records to process
)
jobs = append(jobs, job)

jobs = append(jobs, job)
currentStart = currentEnd
}
}

// Sort jobs by partition number to ensure consistent ordering
// Sort jobs by partition then priority
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].Job.Partition < jobs[j].Job.Partition
if jobs[i].Job.Partition != jobs[j].Job.Partition {
return jobs[i].Job.Partition < jobs[j].Job.Partition
}
return jobs[i].Priority > jobs[j].Priority
})

return jobs, nil
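
To make the new planning loop concrete, a standalone sketch of the same chunking arithmetic (an illustration, not the planner code itself): with a target of 1000 records, a partition whose last commit is at offset 99 and whose end offset is 2500 yields three jobs covering offsets 100-1100, 1100-2100 and 2100-2500, with priorities 2400, 1400 and 400, i.e. the records still pending when each job is cut. The final sort then orders jobs by partition and, within a partition, by descending priority, so the earliest chunk of each partition is handed out first.

package main

import "fmt"

// chunkJobs mirrors the splitting in RecordCountPlanner.Plan: jobs of at most
// target records each, prioritized by the records still pending at their start.
func chunkJobs(committedAt, endOffset, target int64) [][3]int64 {
	var out [][3]int64 // {min, max, priority}
	start := committedAt + 1
	for cur := start; cur < endOffset; {
		next := min(cur+target, endOffset)
		out = append(out, [3]int64{cur, next, endOffset - cur})
		cur = next
	}
	return out
}

func main() {
	for _, j := range chunkJobs(99, 2500, 1000) {
		fmt.Printf("offsets [%d, %d) priority %d\n", j[0], j[1], j[2])
	}
}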
