Skip to content

Commit

Permalink
fix parsing array params in kubernetes mode
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Sep 25, 2023
1 parent 15d5fe3 commit 2dbefb8
Show file tree
Hide file tree
Showing 33 changed files with 229 additions and 76 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Fixed
- [#165](https://github.com/deviceinsight/kafkactl/issues/165) Kubernetes pod name now always includes random suffix
- [#165](https://github.com/deviceinsight/kafkactl/issues/165) Kubernetes pod name now always includes random suffix
- [#165](https://github.com/deviceinsight/kafkactl/issues/158) fix parsing array params in kubernetes mode

## 3.3.0 - 2023-09-05
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion cmd/alter/alter-partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func newAlterPartitionCmd() *cobra.Command {
Short: "alter a partition",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {

var partitionID int32

Expand Down
2 changes: 1 addition & 1 deletion cmd/alter/alter-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func newAlterTopicCmd() *cobra.Command {
Short: "alter a topic",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&topic.Operation{}).AlterTopic(args[0], flags); err != nil {
output.Fail(err)
}
Expand Down
45 changes: 45 additions & 0 deletions cmd/alter/alter-topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,48 @@ func TestAlterTopicDecreaseReplicationFactorIntegration(t *testing.T) {
t.Fatalf("could not check Replicas for topic %s: %v", topicName, err)
}
}

func TestAlterTopicConfigK8sIntegration(t *testing.T) {

testutil.StartIntegrationTestWithContext(t, "k8s-mock")

kafkaCtl := testutil.CreateKafkaCtlCommand()

type testCases struct {
description string
args []string
wantInKubectlCmd []string
}

for _, test := range []testCases{
{
description: "single_config_defined_with_space",
args: []string{"alter", "topic", "fake-topic", "--config", "retention.ms=86400000"},
wantInKubectlCmd: []string{"--config=retention.ms=86400000"},
},
{
description: "single_config_defined_with_equal",
args: []string{"alter", "topic", "fake-topic", "--config=retention.ms=86400000"},
wantInKubectlCmd: []string{"--config=retention.ms=86400000"},
},
{
description: "multiple_configs",
args: []string{"alter", "topic", "fake-topic", "--config", "retention.ms=86400000",
"--config", "cleanup.policy=compact"},
wantInKubectlCmd: []string{"--config=retention.ms=86400000", "--config=cleanup.policy=compact"},
},
} {
t.Run(test.description, func(t *testing.T) {

if _, err := kafkaCtl.Execute(test.args...); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

output := kafkaCtl.GetStdOut()

for _, wanted := range test.wantInKubectlCmd {
testutil.AssertContainSubstring(t, wanted, output)
}
})
}
}
2 changes: 1 addition & 1 deletion cmd/attach/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func NewAttachCmd() *cobra.Command {
Short: "run kafkactl pod in kubernetes and attach to it",
Args: cobra.NoArgs,
Run: func(cobraCmd *cobra.Command, args []string) {
if err := (&k8s.Operation{}).Attach(); err != nil {
if err := k8s.NewOperation().Attach(); err != nil {
output.Fail(err)
}
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/clone/clone-consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newCloneConsumerGroupCmd() *cobra.Command {
Short: "clone existing consumerGroup with all offsets",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consumergroupoffsets.ConsumerGroupOffsetOperation{}).CloneConsumerGroup(args[0], args[1]); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/clone/clone-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func newCloneTopicCmd() *cobra.Command {
Short: "clone existing topic (number of partitions, replication factor, config entries) to new one",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&topic.Operation{}).CloneTopic(args[0], args[1]); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/config/useContext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestUseContextAutoCompletionIntegration(t *testing.T) {

outputLines := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

expectedContexts := []string{"default", "no-avro", "sasl-admin", "sasl-user"}
expectedContexts := []string{"default", "k8s-mock", "no-avro", "sasl-admin", "sasl-user"}

if len(outputLines) != len(expectedContexts)+1 {
t.Fatalf("unexpected output. expected %d lines got %d: %s", len(expectedContexts)+1, len(outputLines), kafkaCtl.GetStdOut())
Expand Down
2 changes: 1 addition & 1 deletion cmd/consume/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewConsumeCmd() *cobra.Command {
Short: "consume messages from a topic",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consume.Operation{}).Consume(args[0], flags); err != nil {
output.Fail(err)
}
Expand Down
44 changes: 44 additions & 0 deletions cmd/consume/consume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,47 @@ func TestConsumeGroupCompletionIntegration(t *testing.T) {
testutil.AssertContains(t, group2, outputLines)
testutil.AssertContains(t, group3, outputLines)
}

func TestConsumePartitionsK8sIntegration(t *testing.T) {

testutil.StartIntegrationTestWithContext(t, "k8s-mock")

kafkaCtl := testutil.CreateKafkaCtlCommand()

type testCases struct {
description string
args []string
wantInKubectlCmd []string
}

for _, test := range []testCases{
{
description: "single_partition_defined_with_space",
args: []string{"consume", "fake-topic", "--partitions", "5"},
wantInKubectlCmd: []string{"--partitions=5"},
},
{
description: "single_partition_defined_with_equal",
args: []string{"consume", "fake-topic", "--partitions=5"},
wantInKubectlCmd: []string{"--partitions=5"},
},
{
description: "multiple_partitions",
args: []string{"consume", "fake-topic", "--partitions", "5", "--partitions", "6"},
wantInKubectlCmd: []string{"--partitions=5", "--partitions=6"},
},
} {
t.Run(test.description, func(t *testing.T) {

if _, err := kafkaCtl.Execute(test.args...); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

output := kafkaCtl.GetStdOut()

for _, wanted := range test.wantInKubectlCmd {
testutil.AssertContainSubstring(t, wanted, output)
}
})
}
}
2 changes: 1 addition & 1 deletion cmd/create/create-acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func newCreateACLCmd() *cobra.Command {
Short: "create an acl",
Args: cobra.MaximumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&acl.Operation{}).CreateACL(flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/create/create-consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newCreateConsumerGroupCmd() *cobra.Command {
Short: "create a consumerGroup",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consumergroupoffsets.ConsumerGroupOffsetOperation{}).CreateConsumerGroup(cgFlags, args[0]); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/create/create-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newCreateTopicCmd() *cobra.Command {
Short: "create a topic",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&topic.Operation{}).CreateTopics(args, flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/deletion/delete-acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func newDeleteACLCmd() *cobra.Command {
Short: "delete an acl",
Args: cobra.MaximumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&acl.Operation{}).DeleteACL(flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/deletion/delete-consumer-group-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newDeleteConsumerGroupOffsetCmd() *cobra.Command {
Short: "delete a consumer-group-offset",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consumergroupoffsets.ConsumerGroupOffsetOperation{}).DeleteConsumerGroupOffset(args[0], offsetFlags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/deletion/delete-consumer-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func newDeleteConsumerGroupCmd() *cobra.Command {
Short: "delete a consumer-group",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consumergroups.ConsumerGroupOperation{}).DeleteConsumerGroups(args); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/deletion/delete-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func newDeleteTopicCmd() *cobra.Command {
Short: "delete a topic",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&topic.Operation{}).DeleteTopics(args); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/describe/describe-broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newDescribeBrokerCmd() *cobra.Command {
Short: "describe a broker",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {

id, err := strconv.ParseInt(args[0], 10, 32)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/describe/describe-consumer-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newDescribeConsumerGroupCmd() *cobra.Command {
Short: "describe a consumerGroup",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consumergroups.ConsumerGroupOperation{}).DescribeConsumerGroup(flags, args[0]); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/describe/describe-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newDescribeTopicCmd() *cobra.Command {
Short: "describe a topic",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&topic.Operation{}).DescribeTopic(args[0], flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/get/get-acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func newGetACLCmd() *cobra.Command {
Short: "list available acls",
Args: cobra.MaximumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&acl.Operation{}).GetACL(flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/get/get-brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func newGetBrokersCmd() *cobra.Command {
Use: "brokers",
Short: "list brokers",
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&broker.Operation{}).GetBrokers(flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/get/get-consumer-groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newGetConsumerGroupsCmd() *cobra.Command {
Short: "list available consumerGroups",
Args: cobra.MaximumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consumergroups.ConsumerGroupOperation{}).GetConsumerGroups(flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/get/get-topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func newGetTopicsCmd() *cobra.Command {
Use: "topics",
Short: "list available topics",
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&topic.Operation{}).GetTopics(flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/produce/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewProduceCmd() *cobra.Command {
Short: "produce messages to a topic",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&producer.Operation{}).Produce(args[0], flags); err != nil {
output.Fail(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/reset/reset-consumer-group-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newResetOffsetCmd() *cobra.Command {
Short: "reset a consumer group offset",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.Operation{}).TryRun(cmd, args) {
if !k8s.NewOperation().TryRun(cmd, args) {
if err := (&consumergroupoffsets.ConsumerGroupOffsetOperation{}).ResetConsumerGroupOffset(offsetFlags, args[0]); err != nil {
output.Fail(err)
}
Expand Down
10 changes: 10 additions & 0 deletions docker/kubectl-mock.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env bash

if [ "$1" = "version" ]; then
# mock version command
echo '{"clientVersion": {"major": "9", "minor": "8", "gitVersion": "v9.8.7"}}'
exit 0
fi

# just print the actual command that was executed
echo "kubectl $@"
6 changes: 3 additions & 3 deletions internal/k8s/executer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestExecWithImageAndImagePullSecretProvided(t *testing.T) {
var testRunner = TestRunner{}
var runner k8s.Runner = &testRunner

exec := k8s.NewExecutor(context, &runner)
exec := k8s.NewExecutor(context, runner)

err := exec.Run("scratch", "/kafkactl", []string{"version"}, []string{"ENV_A=1"})
if err != nil {
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestExecWithImageAndTagFails(t *testing.T) {
var testRunner = TestRunner{}
var runner k8s.Runner = &testRunner

exec := k8s.NewExecutor(context, &runner)
exec := k8s.NewExecutor(context, runner)

err := exec.Run("scratch", "/kafkactl", []string{"version"}, []string{"ENV_A=1"})
testutil.AssertErrorContains(t, "image must not contain a tag", err)
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestParseKubectlVersion(t *testing.T) {

testRunner.response = []byte(test.kubectlOutput)

version := k8s.GetKubectlVersion("kubectl", &runner)
version := k8s.GetKubectlVersion("kubectl", runner)

if test.wantErr != "" {
if err == nil {
Expand Down
11 changes: 6 additions & 5 deletions internal/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type executor struct {
image string
imagePullSecret string
version Version
runner *Runner
runner Runner
clientID string
kubeConfig string
kubeContext string
Expand All @@ -46,8 +46,9 @@ func randomString(n int) string {
return string(b)
}

func getKubectlVersion(kubectlBinary string, runner *Runner) Version {
bytes, err := (*runner).ExecuteAndReturn(kubectlBinary, []string{"version", "--client", "-o", "json"})
func getKubectlVersion(kubectlBinary string, runner Runner) Version {

bytes, err := runner.ExecuteAndReturn(kubectlBinary, []string{"version", "--client", "-o", "json"})
if err != nil {
output.Fail(err)
return Version{}
Expand Down Expand Up @@ -89,7 +90,7 @@ func getKubectlVersion(kubectlBinary string, runner *Runner) Version {
}
}

func newExecutor(context internal.ClientContext, runner *Runner) *executor {
func newExecutor(context internal.ClientContext, runner Runner) *executor {
return &executor{
kubectlBinary: context.Kubernetes.Binary,
version: getKubectlVersion(context.Kubernetes.Binary, runner),
Expand Down Expand Up @@ -192,7 +193,7 @@ func (kubectl *executor) exec(args []string) error {
cmd := fmt.Sprintf("exec: %s %s", kubectl.kubectlBinary, join(args))
output.Debugf("kubectl version: %s", kubectl.version.GitVersion)
output.Debugf(cmd)
err := (*kubectl.runner).Execute(kubectl.kubectlBinary, args)
err := kubectl.runner.Execute(kubectl.kubectlBinary, args)
return err
}

Expand Down
Loading

0 comments on commit 2dbefb8

Please sign in to comment.