Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/grillkafka/assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func (gk *Kafka) AssertCount(topicName string, expectedCount int) grill.Assertion {
return grill.AssertionFunc(func() error {
group := fmt.Sprintf("%s_%d_%s", "oh_my_test_helper", rand.Intn(1000), time.Now())
consumer, err := gk.NewConsumer(group, topicName, time.Second*5)
consumer, err := gk.NewConsumer(group, topicName, time.Second)
if err != nil {
return err
}
Expand All @@ -33,7 +33,7 @@ func (gk *Kafka) AssertCount(topicName string, expectedCount int) grill.Assertio
func (gk *Kafka) AssertMessageCount(topic string, message Message, expectedCount int) grill.Assertion {
return grill.AssertionFunc(func() error {
group := fmt.Sprintf("%s_%d_%s", "oh_my_test_helper", rand.Intn(1000), time.Now())
consumer, err := gk.NewConsumer(group, topic, time.Second*5)
consumer, err := gk.NewConsumer(group, topic, time.Second)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/grillkafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_GrillKafka(t *testing.T) {
},
Assertions: []grill.Assertion{
grill.AssertOutput(true),
grill.Try(time.Second*30, 3, helper.AssertCount("test_topic", 1)),
grill.Try(time.Second*30, 3, helper.AssertCount("test_topic", 1), grill.WithCheckFrequency(1*time.Second)),
grill.Try(time.Second*30, 3, helper.AssertMessageCount("test_topic_error", testMessage, 1)),
},
Cleaners: []grill.Cleaner{
Expand Down
34 changes: 25 additions & 9 deletions try.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,38 @@ import (
"time"
)

func Try(deadline time.Duration, minSuccess int, assertion Assertion) Assertion {
return &tryAssertion{
assertion: assertion,
deadline: deadline,
minSuccess: minSuccess,
type TryOption func(*tryAssertion)

// WithCheckFrequency sets a custom frequency for assertion checks.
// If not set, frequency is derived from deadline and minSuccess.
func WithCheckFrequency(frequency time.Duration) TryOption {
return func(t *tryAssertion) {
t.checkFrequency = frequency
}
}

func Try(deadline time.Duration, minSuccess int, assertion Assertion, options ...TryOption) Assertion {
t := &tryAssertion{
assertion: assertion,
deadline: deadline,
minSuccess: minSuccess,
checkFrequency: deadline / time.Duration(minSuccess*3+3),
}
for _, opt := range options {
opt(t)
}
return t
}

type tryAssertion struct {
assertion Assertion
deadline time.Duration
minSuccess int
assertion Assertion
deadline time.Duration
minSuccess int
checkFrequency time.Duration
}

func (assert *tryAssertion) Assert() error {
checkC := time.Tick(assert.deadline / time.Duration(assert.minSuccess*3+3))
checkC := time.Tick(assert.checkFrequency)
quitC := time.Tick(assert.deadline)
var successCount = 0
var errors []string
Expand Down