-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Compute start and end offsets from timestamps #147
Compute start and end offsets from timestamps #147
Conversation
…ror because it is a meaningful value for sarama (sarama.OffsetNewest)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally the PR looks good 👍
I found mainly minor issues. The only thing that I like to be added is the parsing of different timestamp formats.
README.md
Outdated
@@ -332,6 +332,18 @@ The consumer can be stopped when the latest offset is reached using `--exit` par | |||
kafkactl consume my-topic --from-beginning --exit | |||
``` | |||
|
|||
The consumer can compute the offset it starts from using a timestamp : | |||
```bash | |||
kafkactl consume my-topic --from-timestamp <timestamp-with-milliseconds> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when reading <timestamp-with-milliseconds>
it is unclear what the parameter really expects. It could be an iso timestamp (e.g. 2015-08-29T11:22:09.815Z
) or epoch milliseconds.
I would like to have it as intuitive as possible. So I would suggest to change the parameter from an int64
to a string
and parse different timestamp formats.
The parsing could be done with a small helper function like parseTimestamp
below:
var dateFormats = []string{
"2006-01-02T15:04:05.123Z",
"2006-01-02T15:04:05.123",
"2006-01-02T15:04:05Z",
"2006-01-02T15:04:05",
"2006-01-02T15:04",
"2006-01-02",
}
func parseTimestamp(timestamp string) (time.Time, error) {
if timeMs, err := strconv.ParseInt(timestamp, 10, 64); err == nil {
return time.UnixMilli(timeMs), nil
}
for _, format := range dateFormats {
if val, e := time.Parse(format, timestamp); e == nil {
return val, nil
}
}
return time.Time{}, errors.Newf("unable to parse timestamp: %s", timestamp)
}
func TestTimestamp(t *testing.T) {
var examples = []string{
"2014-04-26T17:24:37.123Z",
"2014-04-26T17:24:37.123",
"2009-08-12T22:15:09Z",
"2017-07-19T03:21:51",
"2013-04-01T22:43",
"2014-04-26",
"1384216367189",
}
for _, example := range examples {
if val, err := parseTimestamp(example); err != nil {
fmt.Printf("Error converting %s - err %v\n", example, err)
} else {
fmt.Printf("converted %s to: %v\n", example, val)
}
}
}
util/util.go
Outdated
@@ -17,3 +17,15 @@ func ContainsInt32(list []int32, element int32) bool { | |||
} | |||
return false | |||
} | |||
|
|||
func StrArrayEqual(a, b []string) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func StrArrayEqual(a, b []string) bool { | |
func StringArraysEqual(a, b []string) bool { |
Co-authored-by: d-rk <[email protected]>
Co-authored-by: d-rk <[email protected]>
Co-authored-by: d-rk <[email protected]>
Co-authored-by: d-rk <[email protected]>
Co-authored-by: d-rk <[email protected]>
Co-authored-by: d-rk <[email protected]>
Co-authored-by: d-rk <[email protected]>
Thanks for this very complete review and for the proposed improvements 😃. I think they are all implemented in the last commits. |
thank you for the PR @faillefer 👍 |
Description
This PR is based on PR #126 by @nanic (Thanks for the work !)
Implements #121
I've added integration tests and updated the code so that '--exit' is not mandatory when using --from-timestamp
I've had to rework getOffsetBounds method to better manage possible options
Also I removed the usage of '-1' value when returning offset when there's an error, because '-1' is a meaningful value for kafka (OffsetNewest)
Type of change
Documentation
## [Unreleased]
section ofCHANGELOG.md
README.md
was updatedREADME.md