Skip to content

Commit

Permalink
kfake: 885-test, new APIs
Browse files Browse the repository at this point in the history
(commit to be fixed up once 885 is merged and released in 1.18.1)
  • Loading branch information
twmb committed Jan 19, 2025
1 parent 847095b commit 64ca34f
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 7 deletions.
4 changes: 2 additions & 2 deletions pkg/kfake/01_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, er
if !ok || pd.createdAt.After(creq.at) {
continue
}
if pd.leader != creq.cc.b {
if pd.leader != creq.cc.b && !pd.followers.has(creq.cc.b) {
returnEarly = true // NotLeaderForPartition
break out
}
Expand Down Expand Up @@ -162,7 +162,7 @@ full:
}
continue
}
if pd.leader != creq.cc.b {
if pd.leader != creq.cc.b && !pd.followers.has(creq.cc.b) {
p := donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code)
p.CurrentLeader.LeaderID = pd.leader.node
p.CurrentLeader.LeaderEpoch = pd.epoch
Expand Down
27 changes: 27 additions & 0 deletions pkg/kfake/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,20 @@ func (c *Cluster) CoordinatorFor(key string) int32 {
return n
}

// LeaderFor returns the node ID of the topic partition. If the partition
// does not exist, this returns -1.
func (c *Cluster) LeaderFor(topic string, partition int32) int32 {
n := int32(-1)
c.admin(func() {
pd, ok := c.data.tps.getp(topic, partition)
if !ok {
return
}
n = pd.leader.node
})
return n
}

// RehashCoordinators simulates group and transacational ID coordinators moving
// around. All group and transactional IDs are rekeyed. This forces clients to
// reload coordinators.
Expand Down Expand Up @@ -1084,3 +1098,16 @@ func (c *Cluster) shufflePartitionsLocked() {
p.epoch++
})
}

// SetFollowers sets the node IDs of brokers that can also serve fetch requests
// for a partition. Setting followers to an empty or nil slice reverts to the
// default of only the leader being able to serve fetch requests.
func (c *Cluster) SetFollowers(topic string, partition int32, followers []int32) {
c.admin(func() {
pd, ok := c.data.tps.getp(topic, partition)
if !ok {
return
}
pd.followers = append([]int32(nil), followers...)
})
}
19 changes: 17 additions & 2 deletions pkg/kfake/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ type (
nbytes int64

// abortedTxns
rf int8
leader *broker
rf int8
leader *broker
followers followers

watch map[*watchFetch]struct{}

createdAt time.Time
}

followers []int32

partBatch struct {
kmsg.RecordBatch
nbytes int
Expand All @@ -68,6 +71,15 @@ type (
}
)

func (fs followers) has(b *broker) bool {
for _, f := range fs {
if f == b.node {
return true
}
}
return false
}

func (d *data) mkt(t string, nparts int, nreplicas int, configs map[string]*string) {
if d.tps != nil {
if _, exists := d.tps[t]; exists {
Expand All @@ -88,6 +100,9 @@ func (d *data) mkt(t string, nparts int, nreplicas int, configs map[string]*stri
}
if nreplicas < 0 {
nreplicas = 3 // cluster default
if nreplicas > len(d.c.bs) {
nreplicas = len(d.c.bs)
}
}
d.id2t[id] = t
d.t2id[t] = id
Expand Down
7 changes: 6 additions & 1 deletion pkg/kfake/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ toolchain go1.22.0

require (
github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/pkg/kmsg v1.8.0
github.com/twmb/franz-go/pkg/kmsg v1.9.0
golang.org/x/crypto v0.23.0
)

require (
github.com/klauspost/compress v1.17.8 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
)
8 changes: 6 additions & 2 deletions pkg/kfake/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
162 changes: 162 additions & 0 deletions pkg/kfake/issues_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package kfake

import (
"context"
"os"
"strconv"
"testing"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
)

func TestIssue885(t *testing.T) {
const (
testTopic = "foo"
producedMessages = 5
followerLogStart = 3
)

c, err := NewCluster(
NumBrokers(2),
SleepOutOfOrder(),
SeedTopics(1, testTopic),
)
if err != nil {
t.Fatal(err)
}
defer c.Close()

// Flow:
//
// * We always say one broker is the leader -- every Metadata response
// can be the same; we do not need to hijack Metadata
//
// * We produce 5 separate batches just to have some data
//
// * We hijack fetch: if to the leader, we say the other broker is the
// follower.
//
// * We hijack fetch 2: if to follower, we say "offset out of range".
//
// END SETUP STAGE.
//
// TEST
//
// * We return one batch at a time from the leader.
// * We expect the leader to receive 3 requests.
// * On the fourth, we redirect back to the follower.
// * Batch four and five are served from the follower.
// * We are done.
// * Any deviation is failure.
//
// We control the flow through the stages; any bug results in not continuing
// forward (i.e. looping through the stages and never finishing).

// Inline anonymous function so that we can defer and cleanup within scope.
func() {
cl, err := kgo.NewClient(
kgo.DefaultProduceTopic(testTopic),
kgo.SeedBrokers(c.ListenAddrs()...),
)
if err != nil {
t.Fatal(err)
}
defer cl.Close()

for i := 0; i < producedMessages; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
err := cl.ProduceSync(ctx, kgo.StringRecord(strconv.Itoa(i))).FirstErr()
cancel()
if err != nil {
t.Fatal(err)
}
}
}()

var followerOOOR bool

ti := c.TopicInfo(testTopic)
pi := c.PartitionInfo(testTopic, 0)
follower := (pi.Leader + 1) % 2
c.SetFollowers(testTopic, 0, []int32{follower})

c.ControlKey(1, func(kreq kmsg.Request) (kmsg.Response, error, bool) {
c.KeepControl()

req := kreq.(*kmsg.FetchRequest)
if req.Version < 11 {
t.Fatal("unable to run test with fetch requests < v11")
}

if len(req.Topics) != 1 || len(req.Topics[0].Partitions) != 1 {
t.Fatalf("unexpected malformed req topics or partitions: %v", req)
}

// If we *do* return a batch, we want to ensure we return only
// one batch. We modify the incoming request to ensure at most
// one batch is returned.
req.MaxBytes = 1

resp := req.ResponseKind().(*kmsg.FetchResponse)
rt := kmsg.NewFetchResponseTopic()
rt.Topic = testTopic
rt.TopicID = ti.TopicID
rp := kmsg.NewFetchResponseTopicPartition()

resp.Topics = append(resp.Topics, rt)
rtp := &resp.Topics[0]

rtp.Partitions = append(rtp.Partitions, rp)
rpp := &rtp.Partitions[0]

rpp.Partition = 0
rpp.ErrorCode = 0
rpp.HighWatermark = pi.HighWatermark
rpp.LastStableOffset = pi.LastStableOffset
rpp.LogStartOffset = 0

if c.CurrentNode() == pi.Leader {
if !followerOOOR || req.Topics[0].Partitions[0].FetchOffset >= followerLogStart {
rpp.PreferredReadReplica = (pi.Leader + 1) % 2
return resp, nil, true
}
return nil, nil, false
}

if req.Topics[0].Partitions[0].FetchOffset < followerLogStart {
rpp.ErrorCode = kerr.OffsetOutOfRange.Code
rpp.LogStartOffset = 2
followerOOOR = true
return resp, nil, true
}

return nil, nil, false
})

cl, err := kgo.NewClient(
kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
kgo.SeedBrokers(c.ListenAddrs()...),
kgo.ConsumeTopics(testTopic),
kgo.Rack("foo"),
kgo.DisableFetchSessions(),
)
if err != nil {
t.Fatal(err)
}
defer cl.Close()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

for consumed := 0; consumed != producedMessages; {
fs := cl.PollFetches(ctx)
if errs := fs.Errors(); errs != nil {
t.Errorf("consume error: %v", errs)
break
}
consumed += fs.NumRecords()
}
}
99 changes: 99 additions & 0 deletions pkg/kfake/topic_partition.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kfake

import "sort"

type tps[V any] map[string]map[int32]*V

func (tps *tps[V]) getp(t string, p int32) (*V, bool) {
Expand Down Expand Up @@ -73,3 +75,100 @@ func (tps *tps[V]) delp(t string, p int32) {
delete(*tps, t)
}
}

// TopicInfo contains snapshot-in-time metadata about an existing topic.
type TopicInfo struct {
TopicID [16]byte // TopicID is the UUID of the topic.
NumReplicas int // NumReplicas is the replication factor for all partitions in this topic.
Configs map[string]*string // Configs contains all configuration values specified for this topic.
}

// PartitionInfo contains snapshot-in-time metadata about an existing partition.
type PartitionInfo struct {
HighWatermark int64 // HighWatermark is the latest offset present in the partition.
LastStableOffset int64 // LastStableOffset is the last stable offset.
LogStartOffset int64 // LogStartOffsets is the first offset present in the partition.
Epoch int32 // Epoch is the current "epoch" of the partition -- how many times the partition transferred leaders.
MaxTimestamp int64 // MaxTimestamp is the current max timestamp across all batches.
NumBytes int64 // NumBytes is the current amount of data stored in the partition.
Leader int32 // Leader is the current leader of the partition.
}

func (pd *partData) info() *PartitionInfo {
return &PartitionInfo{
HighWatermark: pd.highWatermark,
LastStableOffset: pd.lastStableOffset,
LogStartOffset: pd.logStartOffset,
Epoch: pd.epoch,
MaxTimestamp: pd.maxTimestamp,
NumBytes: pd.nbytes,
Leader: pd.leader.node,
}
}

// TopicInfo returns information about a topic if it exists.
func (c *Cluster) TopicInfo(topic string) *TopicInfo {
var i *TopicInfo
c.admin(func() {
id, exists := c.data.t2id[topic]
if !exists {
return
}
clone := func(m map[string]*string) map[string]*string { // a deeper maps.Clone
m2 := make(map[string]*string, len(m))
for k, v := range m {
var v2 *string
if v != nil {
vv := *v
v2 = &vv
}
m2[k] = v2
}
return m2
}
i = &TopicInfo{
TopicID: id,
NumReplicas: c.data.treplicas[topic],
Configs: clone(c.data.tcfgs[topic]),
}
})
return i
}

// PartitionInfo returns information about a partition if it exists.
func (c *Cluster) PartitionInfo(topic string, partition int32) *PartitionInfo {
var i *PartitionInfo
c.admin(func() {
pd, ok := c.data.tps.getp(topic, partition)
if !ok {
return
}
i = pd.info()
})
return i
}

// PartitionInfos returns information about all partitions in a topic,
// if it exists. The partitions are returned in sorted partition order,
// with partition 0 at index 0, partition 1 at index 1, etc.
func (c *Cluster) PartitionInfos(topic string) []*PartitionInfo {
var is []*PartitionInfo
c.admin(func() {
t, ok := c.data.tps.gett(topic)
if !ok {
return
}
partitions := make([]int32, 0, len(t))
for p := range t {
partitions = append(partitions, p)
}
sort.Slice(partitions, func(i, j int) bool {
return partitions[i] < partitions[j]
})
for _, p := range partitions {
pd, _ := c.data.tps.getp(topic, p)
is = append(is, pd.info())
}
})
return is
}

0 comments on commit 64ca34f

Please sign in to comment.