Skip to content

Commit

Permalink
add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Dec 7, 2020
1 parent 8e1e963 commit 6ed3371
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 20 deletions.
10 changes: 10 additions & 0 deletions cmd/alter/alter-partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package alter

import (
"github.com/deviceinsight/kafkactl/cmd/validation"
"github.com/deviceinsight/kafkactl/operations"
"github.com/deviceinsight/kafkactl/operations/k8s"
"github.com/deviceinsight/kafkactl/operations/partitions"
"github.com/deviceinsight/kafkactl/output"
Expand Down Expand Up @@ -37,6 +38,15 @@ func newAlterPartitionCmd() *cobra.Command {
PreRunE: func(cmd *cobra.Command, args []string) error {
return validation.ValidateAtLeastOneRequiredFlag(cmd)
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
if len(args) == 0 {
return operations.CompleteTopicNames(cmd, args, toComplete)
} else if len(args) == 1 {
return partitions.CompletePartitionIds(cmd, args, toComplete)
} else {
return nil, cobra.ShellCompDirectiveNoFileComp
}
},
}

cmdAlterPartition.Flags().Int32SliceVarP(&flags.Replicas, "replicas", "r", nil, "set replicas for a partition")
Expand Down
98 changes: 98 additions & 0 deletions cmd/alter/alter-partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package alter_test

import (
"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/deviceinsight/kafkactl/operations"
"github.com/deviceinsight/kafkactl/test_util"
"gopkg.in/errgo.v2/fmt/errors"
"strings"
"testing"
"time"
)

func TestAlterPartitionAutoCompletionIntegration(t *testing.T) {

test_util.StartIntegrationTest(t)

prefix := "alter-p-complete-"

topicName1 := test_util.CreateTopic(t, prefix+"a", "--partitions", "2")
topicName2 := test_util.CreateTopic(t, prefix+"b")
topicName3 := test_util.CreateTopic(t, prefix+"c")

kafkaCtl := test_util.CreateKafkaCtlCommand()
kafkaCtl.Verbose = false

if _, err := kafkaCtl.Execute("__complete", "alter", "partition", ""); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

outputLines := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

test_util.AssertContains(t, topicName1, outputLines)
test_util.AssertContains(t, topicName2, outputLines)
test_util.AssertContains(t, topicName3, outputLines)

if _, err := kafkaCtl.Execute("__complete", "alter", "partition", topicName1, ""); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

outputLines = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

test_util.AssertContains(t, "0", outputLines)
test_util.AssertContains(t, "1", outputLines)
}

func TestAlterPartitionReplicationFactorIntegration(t *testing.T) {

test_util.StartIntegrationTest(t)

prefix := "alter-p-replicas-"

topicName := test_util.CreateTopic(t, prefix, "--partitions", "2", "--replication-factor", "3")

kafkaCtl := test_util.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("alter", "partition", topicName, "0", "--replicas", "101,102"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

checkReplicas := func(attempt uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")

if err != nil {
return err
} else {
topic, err := operations.TopicFromYaml(kafkaCtl.GetStdOut())
if err != nil {
return err
}

if len(topic.Partitions) != 2 {
return errors.Newf("expected 2 partitions, but was %d", len(topic.Partitions))
}

if len(topic.Partitions[0].Replicas) == 2 && len(topic.Partitions[1].Replicas) == 3 {
if topic.Partitions[0].Replicas[0] == 101 && topic.Partitions[0].Replicas[1] == 102 {
return nil
} else {
return errors.Newf("different brokers expected %v", topic.Partitions[0].Replicas)
}
} else {
return errors.Newf("replica count incorrect %v", topic.Partitions)
}
}
}

err := retry.Retry(
checkReplicas,
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("could not check Replicas for topic %s: %v", topicName, err)
}
}
142 changes: 142 additions & 0 deletions cmd/alter/alter-topic_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package alter_test

import (
"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/deviceinsight/kafkactl/operations"
"github.com/deviceinsight/kafkactl/test_util"
"gopkg.in/errgo.v2/fmt/errors"
"strings"
"testing"
"time"
)

func TestAlterTopicAutoCompletionIntegration(t *testing.T) {
Expand All @@ -29,3 +35,139 @@ func TestAlterTopicAutoCompletionIntegration(t *testing.T) {
test_util.AssertContains(t, topicName2, outputLines)
test_util.AssertContains(t, topicName3, outputLines)
}

func TestAlterTopicPartitionsIntegration(t *testing.T) {

test_util.StartIntegrationTest(t)

prefix := "alter-t-partition-"

topicName := test_util.CreateTopic(t, prefix)

kafkaCtl := test_util.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("alter", "topic", topicName, "--partitions", "32"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

getPartitions := func(attempt uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")

if err != nil {
return err
} else {
topic, err := operations.TopicFromYaml(kafkaCtl.GetStdOut())
if err != nil {
return err
}

if len(topic.Partitions) == 32 {
return nil
} else {
return errors.Newf("only the following partitions present: %v", topic.Partitions)
}
}
}

err := retry.Retry(
getPartitions,
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("could not get partitions %s: %v", topicName, err)
}
}

func TestAlterTopicIncreaseReplicationFactorIntegration(t *testing.T) {

test_util.StartIntegrationTest(t)

prefix := "alter-t-ireplicas-"

topicName := test_util.CreateTopic(t, prefix, "--partitions", "32")

kafkaCtl := test_util.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("alter", "topic", topicName, "--replication-factor", "3"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

checkReplicas := func(attempt uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")

if err != nil {
return err
} else {
topic, err := operations.TopicFromYaml(kafkaCtl.GetStdOut())
if err != nil {
return err
}

for _, p := range topic.Partitions {
if len(p.Replicas) != 3 {
return errors.Newf("partition %d has %d replicas != 3", p.Id, len(p.Replicas))
}
}

return nil
}
}

err := retry.Retry(
checkReplicas,
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("could not check Replicas for topic %s: %v", topicName, err)
}
}

func TestAlterTopicDecreaseReplicationFactorIntegration(t *testing.T) {

test_util.StartIntegrationTest(t)

prefix := "alter-t-dreplicas-"

topicName := test_util.CreateTopic(t, prefix, "--partitions", "32", "--replication-factor", "3")

kafkaCtl := test_util.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("alter", "topic", topicName, "--replication-factor", "1"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

checkReplicas := func(attempt uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")

if err != nil {
return err
} else {
topic, err := operations.TopicFromYaml(kafkaCtl.GetStdOut())
if err != nil {
return err
}

for _, p := range topic.Partitions {
if len(p.Replicas) != 1 {
return errors.Newf("partition %d has %d replicas != 1", p.Id, len(p.Replicas))
}
}

return nil
}
}

err := retry.Retry(
checkReplicas,
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("could not check Replicas for topic %s: %v", topicName, err)
}
}
12 changes: 11 additions & 1 deletion generate_completion.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
#!/bin/sh

SCRIPT_PATH="$( cd "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
SCRIPT_PATH=`readlink -m $0/..`

TARGET=$1
BIN_PATH=$2

ARCH=linux_amd64

if [ "${BIN_PATH}" == "" ]; then
BIN_PATH=${SCRIPT_PATH}/kafkactl
fi

if [ "$TARGET" == "" ]; then
TARGET=$ARCH
fi

if [ "$(uname)" == "Darwin" ]; then
ARCH=darwin_amd64
fi
Expand All @@ -16,4 +24,6 @@ if [ "$ARCH" == "$TARGET" ]; then

echo "" > /tmp/empty.yaml
${BIN_PATH} completion bash > ${SCRIPT_PATH}/kafkactl-completion.bash --config-file=/tmp/empty.yaml
else
echo "not generating: $ARCH $TARGET"
fi
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/net v0.0.0-20200923182212-328152dc79b1 // indirect
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d // indirect
gopkg.in/errgo.v2 v2.1.0
gopkg.in/ini.v1 v1.61.0 // indirect
gopkg.in/yaml.v2 v2.3.0
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
Expand Down
1 change: 1 addition & 0 deletions it-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ contexts:
- localhost:39092
avro:
schemaRegistry: localhost:18081
kafkaversion: 2.5.0
current-context: default
42 changes: 42 additions & 0 deletions kafkactl-completion.bash
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,39 @@ __kafkactl_handle_word()
__kafkactl_handle_word
}

_kafkactl_alter_partition()
{
last_command="kafkactl_alter_partition"

command_aliases=()

commands=()

flags=()
two_word_flags=()
local_nonpersistent_flags=()
flags_with_completion=()
flags_completion=()

flags+=("--replicas=")
two_word_flags+=("--replicas")
two_word_flags+=("-r")
local_nonpersistent_flags+=("--replicas=")
flags+=("--validate-only")
flags+=("-v")
local_nonpersistent_flags+=("--validate-only")
flags+=("--config-file=")
two_word_flags+=("--config-file")
two_word_flags+=("-C")
flags+=("--verbose")
flags+=("-V")

must_have_one_flag=()
must_have_one_noun=()
has_completion_function=1
noun_aliases=()
}

_kafkactl_alter_topic()
{
last_command="kafkactl_alter_topic"
Expand All @@ -375,6 +408,10 @@ _kafkactl_alter_topic()
two_word_flags+=("--partitions")
two_word_flags+=("-p")
local_nonpersistent_flags+=("--partitions=")
flags+=("--replication-factor=")
two_word_flags+=("--replication-factor")
two_word_flags+=("-r")
local_nonpersistent_flags+=("--replication-factor=")
flags+=("--validate-only")
flags+=("-v")
local_nonpersistent_flags+=("--validate-only")
Expand All @@ -397,6 +434,7 @@ _kafkactl_alter()
command_aliases=()

commands=()
commands+=("partition")
commands+=("topic")

flags=()
Expand Down Expand Up @@ -1275,6 +1313,10 @@ _kafkactl_root_command()

commands=()
commands+=("alter")
if [[ -z "${BASH_VERSION}" || "${BASH_VERSINFO[0]}" -gt 3 ]]; then
command_aliases+=("edit")
aliashash["edit"]="alter"
fi
commands+=("attach")
commands+=("completion")
commands+=("config")
Expand Down
Loading

0 comments on commit 6ed3371

Please sign in to comment.