Compute start and end offsets from timestamps #147


Merged

Changes from 14 commits (24 commits total)
225d5c0
Add integration for --from-timestamp and --to-timestamp parameters
faillefer Feb 25, 2023
009d23c
rename --end-timestamp to --to-timestamp (for consistency with --from…
faillefer Feb 26, 2023
5681153
Clarify offsets bounds computation
faillefer Feb 26, 2023
d8062f0
Improve variables naming. Do not use -1 as offset when there is an er…
faillefer Feb 26, 2023
7ea4c68
Rename EndTimestamp to ToTimestamp
faillefer Feb 26, 2023
48e2c33
Rename EndTimestamp to ToTimestamp
faillefer Feb 26, 2023
6f46f16
Add unit tests for --from-timestamp and --end-timestamp parameters
faillefer Feb 26, 2023
f834a37
Changes for consume op based on ts
May 11, 2022
893fe01
Changes for consume op based on ts
May 11, 2022
8370eef
Amend README with usage details for the change
May 11, 2022
4f6ecc5
Add usage examples for --from-timestamp and --to-timestamp parameters
faillefer Feb 25, 2023
14ec8fb
Fix linting issues. Clean '.gitignore'. Update CHANGELOG.md
faillefer Feb 27, 2023
eeda909
Fix indent-error-flow error
faillefer Feb 27, 2023
3a23b99
Fix README
faillefer Feb 27, 2023
971140f
Update CHANGELOG.md
faillefer Mar 2, 2023
cbfb30c
Update README.md / Fix typo
faillefer Mar 2, 2023
2328ebd
Use sarama.OffsetNewest instead of -1
faillefer Mar 2, 2023
8768512
Use ErrOffset constant
faillefer Mar 2, 2023
dc6a694
Use ErrOffset constant
faillefer Mar 2, 2023
ccd27d3
Update README.md / Fix typo
faillefer Mar 2, 2023
8d682e1
Update README.md / Fix typo
faillefer Mar 2, 2023
de1a0d1
Allow formatted date in --from-timestamp and --to-timestamp
faillefer Mar 2, 2023
ab6c8e0
Fix linting issues
faillefer Mar 2, 2023
33c36a4
update readme
d-rk Mar 6, 2023
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- [#121](https://github.com/deviceinsight/kafkactl/issues/121) Support for from-timestamp message consumption


## 3.0.3 - 2023-02-01
### Changed
12 changes: 12 additions & 0 deletions README.md
@@ -332,6 +332,18 @@ The consumer can be stopped when the latest offset is reached using `--exit` par
kafkactl consume my-topic --from-beginning --exit
```

The consumer can compute the offset it starts from using a timestamp:
```bash
kafkactl consume my-topic --from-timestamp <timestamp-with-milliseconds>
```

A collaborator commented on this parameter:

> When reading `<timestamp-with-milliseconds>` it is unclear what the parameter really expects. It could be an ISO timestamp (e.g. `2015-08-29T11:22:09.815Z`) or epoch milliseconds.
>
> I would like to have it as intuitive as possible, so I would suggest changing the parameter from an `int64` to a `string` and parsing different timestamp formats. The parsing could be done with a small helper function like `parseTimestamp` below:

```go
var dateFormats = []string{
	// fractional seconds in Go layouts are written with zeros (".000"),
	// not arbitrary digits
	"2006-01-02T15:04:05.000Z",
	"2006-01-02T15:04:05.000",
	"2006-01-02T15:04:05Z",
	"2006-01-02T15:04:05",
	"2006-01-02T15:04",
	"2006-01-02",
}

func parseTimestamp(timestamp string) (time.Time, error) {

	// epoch milliseconds take precedence over formatted dates
	if timeMs, err := strconv.ParseInt(timestamp, 10, 64); err == nil {
		return time.UnixMilli(timeMs), nil
	}

	for _, format := range dateFormats {
		if val, e := time.Parse(format, timestamp); e == nil {
			return val, nil
		}
	}
	return time.Time{}, errors.Newf("unable to parse timestamp: %s", timestamp)
}

func TestTimestamp(t *testing.T) {

	var examples = []string{
		"2014-04-26T17:24:37.123Z",
		"2014-04-26T17:24:37.123",
		"2009-08-12T22:15:09Z",
		"2017-07-19T03:21:51",
		"2013-04-01T22:43",
		"2014-04-26",
		"1384216367189",
	}

	for _, example := range examples {

		if val, err := parseTimestamp(example); err != nil {
			fmt.Printf("Error converting %s - err %v\n", example, err)
		} else {
			fmt.Printf("converted %s to: %v\n", example, val)
		}
	}
}
```
**NOTE:** `--from-timestamp` is not designed to schedule the beginning of the consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process, so if you set it to a date in the future, the consumer will start from the latest offset.

The consumer can be stopped when the offset corresponding to a particular timestamp is reached:
```bash
kafkactl consume my-topic --from-timestamp <timestamp-with-milliseconds> --to-timestamp <timestamp-with-milliseconds>
```
**NOTE:** `--to-timestamp` is not designed to schedule the end of the consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process, so if you set it to a date in the future, the consumer will stop at the current latest offset.
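
Both flags resolve a timestamp to a per-partition offset through the broker's time-based lookup (exposed in sarama as `Client.GetOffset`). The following is a minimal, self-contained sketch of that lookup's semantics — a hypothetical `offsetForTimestamp` over in-memory data, assuming dense offsets starting at 0, not kafkactl's actual code:

```go
package main

import (
	"fmt"
	"sort"
)

// offsetForTimestamp mimics the broker-side lookup behind
// Client.GetOffset(topic, partition, timestamp): given the (sorted)
// record timestamps of a partition, return the earliest offset whose
// timestamp is >= ts. If every record is older than ts, the log-end
// offset is returned — which is why a --from-timestamp in the future
// makes the consumer start at the latest offset.
func offsetForTimestamp(recordTimestamps []int64, ts int64) int64 {
	idx := sort.Search(len(recordTimestamps), func(i int) bool {
		return recordTimestamps[i] >= ts
	})
	return int64(idx) // offsets assumed dense, starting at 0
}

func main() {
	timestamps := []int64{1000, 1000, 2000, 3000}
	fmt.Println(offsetForTimestamp(timestamps, 1500)) // first record at or after ts=1500
	fmt.Println(offsetForTimestamp(timestamps, 9999)) // future ts: log-end offset
}
```

This also explains why the integration tests below sleep for a millisecond between produces: two records with the same timestamp are indistinguishable to the lookup.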

The following example prints keys in hex and values in base64:
```bash
kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64
2 changes: 2 additions & 0 deletions cmd/consume/consume.go
@@ -33,6 +33,8 @@ func NewConsumeCmd() *cobra.Command {
cmdConsume.Flags().BoolVarP(&flags.PrintAvroSchema, "print-schema", "a", false, "print details about avro schema used for decoding")
cmdConsume.Flags().BoolVarP(&flags.PrintHeaders, "print-headers", "", false, "print message headers")
cmdConsume.Flags().IntVarP(&flags.Tail, "tail", "", -1, "show only the last n messages on the topic")
cmdConsume.Flags().Int64VarP(&flags.FromTimestamp, "from-timestamp", "", -1, "consume data from offset of given timestamp")
cmdConsume.Flags().Int64VarP(&flags.ToTimestamp, "to-timestamp", "", -1, "consume data till offset of given timestamp")
cmdConsume.Flags().Int64VarP(&flags.MaxMessages, "max-messages", "", -1, "stop consuming after n messages have been read")
cmdConsume.Flags().BoolVarP(&flags.Exit, "exit", "e", flags.Exit, "stop consuming when latest offset is reached")
cmdConsume.Flags().IntSliceVarP(&flags.Partitions, "partitions", "p", flags.Partitions, "partitions to consume. The default is to consume from all partitions.")
91 changes: 90 additions & 1 deletion cmd/consume/consume_test.go
@@ -3,6 +3,7 @@ package consume_test
import (
"encoding/hex"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
@@ -85,14 +86,102 @@ func TestConsumeTailIntegration(t *testing.T) {
messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

if len(messages) != 3 {
t.Fatalf("expected 3 messages")
t.Fatalf("expected 3 messages, got %d", len(messages))
}

testutil.AssertEquals(t, "test-key-2#test-value-2a", messages[0])
testutil.AssertEquals(t, "test-key-3#test-value-3", messages[1])
testutil.AssertEquals(t, "test-key-2#test-value-2b", messages[2])
}

func TestConsumeFromTimestamp(t *testing.T) {

testutil.StartIntegrationTest(t)

topicName := testutil.CreateTopic(t, "consume-topic", "--partitions", "2")

testutil.ProduceMessageOnPartition(t, topicName, "key-1", "a", 0, 0)
testutil.ProduceMessageOnPartition(t, topicName, "key-1", "b", 0, 1)

time.Sleep(1 * time.Millisecond) // need to have messages produced at different milliseconds to have reproducible test
t1 := time.Now().UnixMilli()

testutil.ProduceMessageOnPartition(t, topicName, "key-2", "c", 1, 0)
testutil.ProduceMessageOnPartition(t, topicName, "key-2", "d", 1, 1)
testutil.ProduceMessageOnPartition(t, topicName, "key-1", "e", 0, 2)
testutil.ProduceMessageOnPartition(t, topicName, "key-2", "f", 1, 2)

time.Sleep(1 * time.Millisecond)
t2 := time.Now().UnixMilli()

testutil.ProduceMessageOnPartition(t, topicName, "key-2", "g", 1, 3)
testutil.ProduceMessageOnPartition(t, topicName, "key-1", "h", 0, 3)

//test --from-timestamp with --to-timestamp
kafkaCtl := testutil.CreateKafkaCtlCommand()
if _, err := kafkaCtl.Execute("consume", topicName, "--from-timestamp", strconv.FormatInt(t1, 10), "--to-timestamp", strconv.FormatInt(t2, 10)); err != nil {
t.Fatalf("failed to execute command: %v", err)
}
messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")
testutil.AssertArraysEquals(t, []string{"c", "d", "e", "f"}, messages)

//test --from-timestamp with --max-messages (--partitions present for reproducibility)
kafkaCtl = testutil.CreateKafkaCtlCommand()
if _, err := kafkaCtl.Execute("consume", topicName, "--from-timestamp", strconv.FormatInt(t1, 10), "--max-messages", strconv.Itoa(2), "--partitions", strconv.Itoa(1)); err != nil {
t.Fatalf("failed to execute command: %v", err)
}
messages = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")
testutil.AssertArraysEquals(t, []string{"c", "d"}, messages)

//test --from-timestamp with --exit
kafkaCtl = testutil.CreateKafkaCtlCommand()
if _, err := kafkaCtl.Execute("consume", topicName, "--from-timestamp", strconv.FormatInt(t2, 10), "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}
messages = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")
testutil.AssertArraysEquals(t, []string{"g", "h"}, messages)
}

func TestConsumeToTimestamp(t *testing.T) {

testutil.StartIntegrationTest(t)

topicName := testutil.CreateTopic(t, "consume-topic", "--partitions", "2")

testutil.ProduceMessageOnPartition(t, topicName, "key-1", "a", 0, 0)
testutil.ProduceMessageOnPartition(t, topicName, "key-1", "b", 0, 1)

time.Sleep(1 * time.Millisecond) // need to have messages produced at different milliseconds to have reproducible test
t1 := time.Now().UnixMilli()

testutil.ProduceMessageOnPartition(t, topicName, "key-2", "c", 1, 0)
testutil.ProduceMessageOnPartition(t, topicName, "key-2", "d", 1, 1)
testutil.ProduceMessageOnPartition(t, topicName, "key-1", "e", 0, 2)
testutil.ProduceMessageOnPartition(t, topicName, "key-2", "f", 1, 2)

time.Sleep(1 * time.Millisecond)
t2 := time.Now().UnixMilli()

testutil.ProduceMessageOnPartition(t, topicName, "key-2", "g", 1, 3)
testutil.ProduceMessageOnPartition(t, topicName, "key-1", "h", 0, 3)

//test --from-beginning with --to-timestamp
kafkaCtl := testutil.CreateKafkaCtlCommand()
if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--to-timestamp", strconv.FormatInt(t1, 10)); err != nil {
t.Fatalf("failed to execute command: %v", err)
}
messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")
testutil.AssertArraysEquals(t, []string{"a", "b"}, messages)

//test --to-timestamp with --tail
kafkaCtl = testutil.CreateKafkaCtlCommand()
if _, err := kafkaCtl.Execute("consume", topicName, "--to-timestamp", strconv.FormatInt(t2, 10), "--tail", strconv.Itoa(4)); err != nil {
t.Fatalf("failed to execute command: %v", err)
}
messages = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")
testutil.AssertArraysEquals(t, []string{"c", "d", "e", "f"}, messages)
}

func TestConsumeWithKeyAndValueAsBase64Integration(t *testing.T) {

testutil.StartIntegrationTest(t)
109 changes: 66 additions & 43 deletions internal/consume/PartitionConsumer.go
@@ -2,6 +2,7 @@ package consume

import (
"context"
"math"
"strconv"
"strings"
"time"
@@ -12,6 +13,10 @@ import (
"golang.org/x/sync/errgroup"
)

const (
ErrOffset = math.MinInt64
)

type PartitionConsumer struct {
topic string
partitions []int32
@@ -69,7 +74,7 @@ func (c *PartitionConsumer) Start(ctx context.Context, flags Flags, messages cha
return errors.Errorf("Failed to start consumer for partition %d: %s", partitionID, err)
}

if lastOffset == -1 && (flags.Exit || flags.Tail > 0) {
if lastOffset == -1 && (flags.Exit || flags.Tail > 0 || flags.ToTimestamp > -1) {
output.Debugf("Skipping empty partition %d", partitionID)
return nil
} else if lastOffset == -1 || initialOffset <= lastOffset {
@@ -133,46 +138,70 @@ func (c *PartitionConsumer) Close() error {
}

func getOffsetBounds(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, int64, error) {

if flags.Exit && len(flags.Offsets) == 0 && !flags.FromBeginning {
return -1, -1, errors.Errorf("parameter --exit has to be used in combination with --from-beginning or --offset")
} else if flags.Tail > 0 && len(flags.Offsets) > 0 {
return -1, -1, errors.Errorf("parameters --offset and --tail cannot be used together")
} else if flags.Tail > 0 {

newestOffset, oldestOffset, err := getBoundaryOffsets(client, topic, currentPartition)
if err != nil {
return -1, -1, err
}

minOffset := newestOffset - int64(flags.Tail)
maxOffset := newestOffset - 1
if minOffset < oldestOffset {
minOffset = oldestOffset
var startOffset int64
var endOffset int64
var err error
if startOffset, err = getStartOffset(client, topic, flags, currentPartition); err != nil {
return ErrOffset, ErrOffset, err
}
if endOffset, err = getEndOffset(client, topic, flags, currentPartition); err != nil {
return ErrOffset, ErrOffset, err
} else if startOffset == endOffset {
endOffset = -1 //nothing to consume on this partition
} else if endOffset != sarama.OffsetNewest {
endOffset = endOffset - 1
}
if flags.Tail > 0 && startOffset == sarama.OffsetNewest {
//When --tail is used compute startOffset so that it minimizes the number of messages consumed
if endOffset-int64(flags.Tail) > 0 {
startOffset = endOffset - int64(flags.Tail)
} else {
startOffset = sarama.OffsetOldest
}
return minOffset, maxOffset, nil
}
output.Debugf("consumer will consume offset %d to %d on partition %d", startOffset, endOffset, currentPartition)
return startOffset, endOffset, nil
}

lastOffset := int64(-1)
oldestOffset := sarama.OffsetOldest
func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) {
if hasExclusiveConditions(flags.FromTimestamp > -1, flags.FromBeginning, len(flags.Offsets) > 0) {
return -1, errors.Errorf("parameters '--from-timestamp', '--offset' and '--from-beginning' are exclusive")
}
if flags.FromTimestamp != -1 {
return (*client).GetOffset(topic, currentPartition, flags.FromTimestamp)
} else if flags.FromBeginning {
return (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest)
} else if len(flags.Offsets) > 0 {
return extractOffsetForPartition(flags, currentPartition)
} else {
return sarama.OffsetNewest, nil
}
}

if flags.Exit {
newestOffset, oldestOff, err := getBoundaryOffsets(client, topic, currentPartition)
if err != nil {
return -1, -1, err
func getEndOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) {
if flags.ToTimestamp > -1 {
return (*client).GetOffset(topic, currentPartition, flags.ToTimestamp)
} else if flags.Exit || flags.Tail > 0 {
var newestOffset int64
var err error
if newestOffset, err = (*client).GetOffset(topic, currentPartition, sarama.OffsetNewest); err != nil {
return ErrOffset, err
}
lastOffset = newestOffset - 1
oldestOffset = oldestOff
return newestOffset, nil
} else {
return sarama.OffsetNewest, nil
}
}

func extractOffsetForPartition(flags Flags, currentPartition int32) (int64, error) {
for _, offsetFlag := range flags.Offsets {
offsetParts := strings.Split(offsetFlag, "=")

if len(offsetParts) == 2 {

partition, err := strconv.Atoi(offsetParts[0])
if err != nil {
return -1, -1, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
return -1, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

if int32(partition) != currentPartition {
@@ -181,27 +210,21 @@ func getOffsetBounds(client *sarama.Client, topic string, flags Flags, currentPa

offset, err := strconv.ParseInt(offsetParts[1], 10, 64)
if err != nil {
return -1, -1, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
return ErrOffset, errors.Errorf("unable to parse offset parameter: %s (%v)", offsetFlag, err)
}

return offset, lastOffset, nil
return offset, nil
}
}

if flags.FromBeginning {
return oldestOffset, lastOffset, nil
}
return sarama.OffsetNewest, -1, nil
return ErrOffset, errors.Errorf("unable to find offset parameter for partition %d: %s", currentPartition, flags.Offsets)
}

func getBoundaryOffsets(client *sarama.Client, topic string, partition int32) (newestOffset int64, oldestOffset int64, err error) {

if newestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetNewest); err != nil {
return -1, -1, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err)
}

if oldestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetOldest); err != nil {
return -1, -1, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err)
func hasExclusiveConditions(flags ...bool) bool {
value := 0
for _, flag := range flags {
if flag {
value++
}
}
return newestOffset, oldestOffset, nil
return value > 1
}
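
The off-by-one handling in `getOffsetBounds` above is subtle: `GetOffset`-style bounds are half-open, while the consumer loop wants an inclusive last offset. A condensed standalone sketch of just that arithmetic — a hypothetical `boundOffsets` helper over plain `int64` offsets, not the actual kafkactl code:

```go
package main

import "fmt"

const offsetNewest = -1 // mirrors sarama.OffsetNewest

// boundOffsets condenses the end-offset rules of getOffsetBounds:
// equal bounds mean the partition has nothing to consume in the range;
// otherwise the exclusive end bound is turned into an inclusive last
// offset (end-1), unless it is the "newest" sentinel.
func boundOffsets(start, end int64) (int64, int64) {
	if start == end {
		return start, -1 // nothing to consume on this partition
	}
	if end != offsetNewest {
		end = end - 1 // exclusive end bound -> inclusive last offset
	}
	return start, end
}

func main() {
	fmt.Println(boundOffsets(3, 7)) // consume offsets 3..6 inclusive
	fmt.Println(boundOffsets(5, 5)) // empty range
}
```

Note that the empty-range result reuses -1 as a sentinel, which is exactly why the real code introduces the distinct `ErrOffset` constant for error returns.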
2 changes: 2 additions & 0 deletions internal/consume/consume-operation.go
@@ -27,6 +27,8 @@ type Flags struct {
Partitions []int
Offsets []string
FromBeginning bool
FromTimestamp int64
ToTimestamp int64
Tail int
Exit bool
MaxMessages int64
12 changes: 12 additions & 0 deletions testutil/helpers.go
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"testing"
"time"
@@ -123,6 +124,17 @@ func ProduceMessage(t *testing.T, topic, key, value string, expectedPartition, e
AssertEquals(t, fmt.Sprintf("message produced (partition=%d\toffset=%d)", expectedPartition, expectedOffset), kafkaCtl.GetStdOut())
}

func ProduceMessageOnPartition(t *testing.T, topic, key, value string, partition int32, expectedOffset int64) {

kafkaCtl := CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("produce", topic, "--key", key, "--value", value, "--partition", strconv.FormatInt(int64(partition), 10)); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

AssertEquals(t, fmt.Sprintf("message produced (partition=%d\toffset=%d)", partition, expectedOffset), kafkaCtl.GetStdOut())
}

func VerifyGroupExists(t *testing.T, group string) {

kafkaCtl := CreateKafkaCtlCommand()
10 changes: 10 additions & 0 deletions testutil/test_util.go
@@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"testing"
"time"
@@ -174,6 +175,15 @@ func AssertEquals(t *testing.T, expected, actual string) {
}
}

func AssertArraysEquals(t *testing.T, expected, actual []string) {
sort.Strings(expected)
sort.Strings(actual)

if !util.StrArrayEqual(actual, expected) {
t.Fatalf("unexpected values:\nexpected:\n--\n%s\n--\nactual:\n--\n%s\n--", expected, actual)
}
}

func AssertErrorContainsOneOf(t *testing.T, expected []string, err error) {

if err == nil {
12 changes: 12 additions & 0 deletions util/util.go
@@ -17,3 +17,15 @@ func ContainsInt32(list []int32, element int32) bool {
}
return false
}

func StrArrayEqual(a, b []string) bool {
(A collaborator suggested renaming `StrArrayEqual` to `StringArraysEqual`.)

if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
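
Messages consumed from different partitions interleave nondeterministically, which is why `AssertArraysEquals` sorts both slices before the element-wise comparison. A standalone sketch of that comparison — a hypothetical `equalIgnoringOrder` mirroring `util.StrArrayEqual`, not the actual test helper:

```go
package main

import (
	"fmt"
	"sort"
)

// strArrayEqual is an element-wise comparison, as in util.StrArrayEqual.
func strArrayEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i, v := range a {
		if v != b[i] {
			return false
		}
	}
	return true
}

// equalIgnoringOrder mirrors AssertArraysEquals: sort copies of both
// slices, then compare element-wise, so the assertion is insensitive
// to the interleaving of partitions.
func equalIgnoringOrder(expected, actual []string) bool {
	e := append([]string(nil), expected...)
	a := append([]string(nil), actual...)
	sort.Strings(e)
	sort.Strings(a)
	return strArrayEqual(e, a)
}

func main() {
	fmt.Println(equalIgnoringOrder([]string{"c", "d", "e", "f"}, []string{"c", "e", "d", "f"})) // same values, different order
	fmt.Println(equalIgnoringOrder([]string{"a"}, []string{"a", "b"}))                          // different lengths
}
```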