diff --git a/pkg/kfake/01_fetch.go b/pkg/kfake/01_fetch.go
index 53f9a8e6..a674e7c6 100644
--- a/pkg/kfake/01_fetch.go
+++ b/pkg/kfake/01_fetch.go
@@ -51,7 +51,7 @@ func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, er
 			if !ok || pd.createdAt.After(creq.at) {
 				continue
 			}
-			if pd.leader != creq.cc.b {
+			if pd.leader != creq.cc.b && !pd.followers.has(creq.cc.b) {
 				returnEarly = true // NotLeaderForPartition
 				break out
 			}
@@ -162,7 +162,7 @@ full:
 				}
 				continue
 			}
-			if pd.leader != creq.cc.b {
+			if pd.leader != creq.cc.b && !pd.followers.has(creq.cc.b) {
 				p := donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code)
 				p.CurrentLeader.LeaderID = pd.leader.node
 				p.CurrentLeader.LeaderEpoch = pd.epoch
diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go
index 3720cf61..f9504dee 100644
--- a/pkg/kfake/cluster.go
+++ b/pkg/kfake/cluster.go
@@ -980,6 +980,20 @@ func (c *Cluster) CoordinatorFor(key string) int32 {
 	return n
 }
 
+// LeaderFor returns the node ID of the topic partition. If the partition
+// does not exist, this returns -1.
+func (c *Cluster) LeaderFor(topic string, partition int32) int32 {
+	n := int32(-1)
+	c.admin(func() {
+		pd, ok := c.data.tps.getp(topic, partition)
+		if !ok {
+			return
+		}
+		n = pd.leader.node
+	})
+	return n
+}
+
 // RehashCoordinators simulates group and transacational ID coordinators moving
 // around. All group and transactional IDs are rekeyed. This forces clients to
 // reload coordinators.
@@ -1084,3 +1098,16 @@ func (c *Cluster) shufflePartitionsLocked() {
 		p.epoch++
 	})
 }
+
+// SetFollowers sets the node IDs of brokers that can also serve fetch requests
+// for a partition. Setting followers to an empty or nil slice reverts to the
+// default of only the leader being able to serve fetch requests.
+func (c *Cluster) SetFollowers(topic string, partition int32, followers []int32) {
+	c.admin(func() {
+		pd, ok := c.data.tps.getp(topic, partition)
+		if !ok {
+			return
+		}
+		pd.followers = append([]int32(nil), followers...)
+	})
+}
diff --git a/pkg/kfake/data.go b/pkg/kfake/data.go
index 566d51b6..b2947b59 100644
--- a/pkg/kfake/data.go
+++ b/pkg/kfake/data.go
@@ -42,14 +42,17 @@ type (
 		nbytes           int64
 
 		// abortedTxns
-		rf     int8
-		leader *broker
+		rf        int8
+		leader    *broker
+		followers followers
 
 		watch map[*watchFetch]struct{}
 
 		createdAt time.Time
 	}
 
+	followers []int32
+
 	partBatch struct {
 		kmsg.RecordBatch
 		nbytes int
@@ -68,6 +71,15 @@ type (
 	}
 )
 
+func (fs followers) has(b *broker) bool {
+	for _, f := range fs {
+		if f == b.node {
+			return true
+		}
+	}
+	return false
+}
+
 func (d *data) mkt(t string, nparts int, nreplicas int, configs map[string]*string) {
 	if d.tps != nil {
 		if _, exists := d.tps[t]; exists {
@@ -88,6 +100,9 @@ func (d *data) mkt(t string, nparts int, nreplicas int, configs map[string]*stri
 	}
 	if nreplicas < 0 {
 		nreplicas = 3 // cluster default
+		if nreplicas > len(d.c.bs) {
+			nreplicas = len(d.c.bs)
+		}
 	}
 	d.id2t[id] = t
 	d.t2id[t] = id
diff --git a/pkg/kfake/go.mod b/pkg/kfake/go.mod
index 78cea4a0..e12238a7 100644
--- a/pkg/kfake/go.mod
+++ b/pkg/kfake/go.mod
@@ -6,6 +6,11 @@ toolchain go1.22.0
 
 require (
 	github.com/twmb/franz-go v1.16.1
-	github.com/twmb/franz-go/pkg/kmsg v1.8.0
+	github.com/twmb/franz-go/pkg/kmsg v1.9.0
 	golang.org/x/crypto v0.23.0
 )
+
+require (
+	github.com/klauspost/compress v1.17.8 // indirect
+	github.com/pierrec/lz4/v4 v4.1.21 // indirect
+)
diff --git a/pkg/kfake/go.sum b/pkg/kfake/go.sum
index 9c3afb49..07d64e30 100644
--- a/pkg/kfake/go.sum
+++ b/pkg/kfake/go.sum
@@ -1,6 +1,10 @@
+github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
+github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
+github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
 github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
-github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
-github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
+github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
+github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
 golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
 golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
diff --git a/pkg/kfake/issues_test.go b/pkg/kfake/issues_test.go
new file mode 100644
index 00000000..1eadb9a8
--- /dev/null
+++ b/pkg/kfake/issues_test.go
@@ -0,0 +1,162 @@
+package kfake
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/twmb/franz-go/pkg/kerr"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/kmsg"
+)
+
+func TestIssue885(t *testing.T) {
+	const (
+		testTopic        = "foo"
+		producedMessages = 5
+		followerLogStart = 3
+	)
+
+	c, err := NewCluster(
+		NumBrokers(2),
+		SleepOutOfOrder(),
+		SeedTopics(1, testTopic),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer c.Close()
+
+	// Flow:
+	//
+	// * We always say one broker is the leader -- every Metadata response
+	// can be the same; we do not need to hijack Metadata
+	//
+	// * We produce 5 separate batches just to have some data
+	//
+	// * We hijack fetch: if to the leader, we say the other broker is the
+	// follower.
+	//
+	// * We hijack fetch 2: if to follower, we say "offset out of range".
+	//
+	// END SETUP STAGE.
+	//
+	// TEST
+	//
+	// * We return one batch at a time from the leader.
+	// * We expect the leader to receive 3 requests.
+	// * On the fourth, we redirect back to the follower.
+	// * Batch four and five are served from the follower.
+	// * We are done.
+	// * Any deviation is failure.
+	//
+	// We control the flow through the stages; any bug results in not continuing
+	// forward (i.e. looping through the stages and never finishing).
+
+	// Inline anonymous function so that we can defer and cleanup within scope.
+	func() {
+		cl, err := kgo.NewClient(
+			kgo.DefaultProduceTopic(testTopic),
+			kgo.SeedBrokers(c.ListenAddrs()...),
+		)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer cl.Close()
+
+		for i := 0; i < producedMessages; i++ {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+			err := cl.ProduceSync(ctx, kgo.StringRecord(strconv.Itoa(i))).FirstErr()
+			cancel()
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+	}()
+
+	var followerOOOR bool
+
+	ti := c.TopicInfo(testTopic)
+	pi := c.PartitionInfo(testTopic, 0)
+	follower := (pi.Leader + 1) % 2
+	c.SetFollowers(testTopic, 0, []int32{follower})
+
+	c.ControlKey(1, func(kreq kmsg.Request) (kmsg.Response, error, bool) {
+		c.KeepControl()
+
+		req := kreq.(*kmsg.FetchRequest)
+		if req.Version < 11 {
+			t.Fatal("unable to run test with fetch requests < v11")
+		}
+
+		if len(req.Topics) != 1 || len(req.Topics[0].Partitions) != 1 {
+			t.Fatalf("unexpected malformed req topics or partitions: %v", req)
+		}
+
+		// If we *do* return a batch, we want to ensure we return only
+		// one batch. We modify the incoming request to ensure at most
+		// one batch is returned.
+		req.MaxBytes = 1
+
+		resp := req.ResponseKind().(*kmsg.FetchResponse)
+		rt := kmsg.NewFetchResponseTopic()
+		rt.Topic = testTopic
+		rt.TopicID = ti.TopicID
+		rp := kmsg.NewFetchResponseTopicPartition()
+
+		resp.Topics = append(resp.Topics, rt)
+		rtp := &resp.Topics[0]
+
+		rtp.Partitions = append(rtp.Partitions, rp)
+		rpp := &rtp.Partitions[0]
+
+		rpp.Partition = 0
+		rpp.ErrorCode = 0
+		rpp.HighWatermark = pi.HighWatermark
+		rpp.LastStableOffset = pi.LastStableOffset
+		rpp.LogStartOffset = 0
+
+		if c.CurrentNode() == pi.Leader {
+			if !followerOOOR || req.Topics[0].Partitions[0].FetchOffset >= followerLogStart {
+				rpp.PreferredReadReplica = (pi.Leader + 1) % 2
+				return resp, nil, true
+			}
+			return nil, nil, false
+		}
+
+		if req.Topics[0].Partitions[0].FetchOffset < followerLogStart {
+			rpp.ErrorCode = kerr.OffsetOutOfRange.Code
+			rpp.LogStartOffset = 2
+			followerOOOR = true
+			return resp, nil, true
+		}
+
+		return nil, nil, false
+	})
+
+	cl, err := kgo.NewClient(
+		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
+		kgo.SeedBrokers(c.ListenAddrs()...),
+		kgo.ConsumeTopics(testTopic),
+		kgo.Rack("foo"),
+		kgo.DisableFetchSessions(),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cl.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+	defer cancel()
+
+	for consumed := 0; consumed != producedMessages; {
+		fs := cl.PollFetches(ctx)
+		if errs := fs.Errors(); errs != nil {
+			t.Errorf("consume error: %v", errs)
+			break
+		}
+		consumed += fs.NumRecords()
+	}
+}
diff --git a/pkg/kfake/topic_partition.go b/pkg/kfake/topic_partition.go
index ac409c53..f7e90150 100644
--- a/pkg/kfake/topic_partition.go
+++ b/pkg/kfake/topic_partition.go
@@ -1,5 +1,7 @@
 package kfake
 
+import "sort"
+
 type tps[V any] map[string]map[int32]*V
 
 func (tps *tps[V]) getp(t string, p int32) (*V, bool) {
@@ -73,3 +75,100 @@ func (tps *tps[V]) delp(t string, p int32) {
 		delete(*tps, t)
 	}
 }
+
+// TopicInfo contains snapshot-in-time metadata about an existing topic.
+type TopicInfo struct {
+	TopicID     [16]byte           // TopicID is the UUID of the topic.
+	NumReplicas int                // NumReplicas is the replication factor for all partitions in this topic.
+	Configs     map[string]*string // Configs contains all configuration values specified for this topic.
+}
+
+// PartitionInfo contains snapshot-in-time metadata about an existing partition.
+type PartitionInfo struct {
+	HighWatermark    int64 // HighWatermark is the latest offset present in the partition.
+	LastStableOffset int64 // LastStableOffset is the last stable offset.
+	LogStartOffset   int64 // LogStartOffsets is the first offset present in the partition.
+	Epoch            int32 // Epoch is the current "epoch" of the partition -- how many times the partition transferred leaders.
+	MaxTimestamp     int64 // MaxTimestamp is the current max timestamp across all batches.
+	NumBytes         int64 // NumBytes is the current amount of data stored in the partition.
+	Leader           int32 // Leader is the current leader of the partition.
+}
+
+func (pd *partData) info() *PartitionInfo {
+	return &PartitionInfo{
+		HighWatermark:    pd.highWatermark,
+		LastStableOffset: pd.lastStableOffset,
+		LogStartOffset:   pd.logStartOffset,
+		Epoch:            pd.epoch,
+		MaxTimestamp:     pd.maxTimestamp,
+		NumBytes:         pd.nbytes,
+		Leader:           pd.leader.node,
+	}
+}
+
+// TopicInfo returns information about a topic if it exists.
+func (c *Cluster) TopicInfo(topic string) *TopicInfo {
+	var i *TopicInfo
+	c.admin(func() {
+		id, exists := c.data.t2id[topic]
+		if !exists {
+			return
+		}
+		clone := func(m map[string]*string) map[string]*string { // a deeper maps.Clone
+			m2 := make(map[string]*string, len(m))
+			for k, v := range m {
+				var v2 *string
+				if v != nil {
+					vv := *v
+					v2 = &vv
+				}
+				m2[k] = v2
+			}
+			return m2
+		}
+		i = &TopicInfo{
+			TopicID:     id,
+			NumReplicas: c.data.treplicas[topic],
+			Configs:     clone(c.data.tcfgs[topic]),
+		}
+	})
+	return i
+}
+
+// PartitionInfo returns information about a partition if it exists.
+func (c *Cluster) PartitionInfo(topic string, partition int32) *PartitionInfo {
+	var i *PartitionInfo
+	c.admin(func() {
+		pd, ok := c.data.tps.getp(topic, partition)
+		if !ok {
+			return
+		}
+		i = pd.info()
+	})
+	return i
+}
+
+// PartitionInfos returns information about all partitions in a topic,
+// if it exists. The partitions are returned in sorted partition order,
+// with partition 0 at index 0, partition 1 at index 1, etc.
+func (c *Cluster) PartitionInfos(topic string) []*PartitionInfo {
+	var is []*PartitionInfo
+	c.admin(func() {
+		t, ok := c.data.tps.gett(topic)
+		if !ok {
+			return
+		}
+		partitions := make([]int32, 0, len(t))
+		for p := range t {
+			partitions = append(partitions, p)
+		}
+		sort.Slice(partitions, func(i, j int) bool {
+			return partitions[i] < partitions[j]
+		})
+		for _, p := range partitions {
+			pd, _ := c.data.tps.getp(topic, p)
+			is = append(is, pd.info())
+		}
+	})
+	return is
+}
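
For reference, a minimal usage sketch (not part of the diff) of the new Cluster helpers this change introduces: SetFollowers, LeaderFor, and PartitionInfo. It assumes kfake numbers broker nodes 0 through NumBrokers-1, which TestIssue885 above also relies on with its (pi.Leader + 1) % 2 arithmetic; the topic name "example" is arbitrary.

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kfake"
)

func main() {
	// Start a small fake cluster with one single-partition topic.
	c, err := kfake.NewCluster(
		kfake.NumBrokers(3),
		kfake.SeedTopics(1, "example"),
	)
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// LeaderFor reports which node currently leads the partition,
	// or -1 if the partition does not exist.
	leader := c.LeaderFor("example", 0)

	// Allow every other broker to serve fetches for the partition.
	// Assumes node IDs 0..2 for a 3-broker kfake cluster.
	var followers []int32
	for n := int32(0); n < 3; n++ {
		if n != leader {
			followers = append(followers, n)
		}
	}
	c.SetFollowers("example", 0, followers)

	// PartitionInfo is a point-in-time snapshot of the partition.
	pi := c.PartitionInfo("example", 0)
	fmt.Printf("leader=%d epoch=%d hwm=%d followers=%v\n",
		pi.Leader, pi.Epoch, pi.HighWatermark, followers)
}

Note that SetFollowers copies the slice it is given (append([]int32(nil), followers...)), so the caller may reuse its slice afterwards, and passing nil reverts to leader-only serving.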