From d572e5c532d8081604f7e07a6d4bd9d765999f6a Mon Sep 17 00:00:00 2001 From: sladkoff Date: Thu, 20 Dec 2018 12:11:55 +0100 Subject: [PATCH 1/2] Add `alter topic` command --- CHANGELOG.md | 3 ++ README.md | 13 ++++++ cmd/alter/alter-topic.go | 30 ++++++++++++ cmd/alter/alter.go | 14 ++++++ cmd/root.go | 2 + cmd/validation/cmd-validation.go | 44 ++++++++++++++++++ operations/topic-operation.go | 78 ++++++++++++++++++++++++++++++++ 7 files changed, 184 insertions(+) create mode 100644 cmd/alter/alter-topic.go create mode 100644 cmd/alter/alter.go create mode 100644 cmd/validation/cmd-validation.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d2234a..61f7282 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- Add `alter topic` command for increasing partition count and editing topic configs + ### Changed - Sort result of `kafkactl get topics` - `consume` now uses a simpler consumer without consumerGroup. diff --git a/README.md b/README.md index f788fd8..b73093d 100644 --- a/README.md +++ b/README.md @@ -138,3 +138,16 @@ kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random ``` +### altering topics + +Using the `alter topic` command allows you to change the partition count and topic-level configurations of an existing topic. + +The partition count can be increased with: +```bash +kafkactl alter topic my-topic --partitions 32 +``` + +The topic configs can be edited by supplying key value pairs as follows: +```bash +kafkactl alter topic my-topic --config retention.ms=3600 --config cleanup.policy=compact +``` \ No newline at end of file diff --git a/cmd/alter/alter-topic.go b/cmd/alter/alter-topic.go new file mode 100644 index 0000000..cf7006a --- /dev/null +++ b/cmd/alter/alter-topic.go @@ -0,0 +1,30 @@ +package alter + +import ( + "github.com/deviceinsight/kafkactl/cmd/validation" + "github.com/deviceinsight/kafkactl/operations" + "github.com/spf13/cobra" +) + +var flags operations.AlterTopicFlags + +var cmdAlterTopic = &cobra.Command{ + Use: "topic", + Short: "alter a topic", + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + (&operations.TopicOperation{}).AlterTopic(args[0], flags) + }, + PreRunE: func(cmd *cobra.Command, args []string) error { + return validation.ValidateAtLeastOneRequiredFlag(cmd) + }, +} + +func init() { + cmdAlterTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", flags.Partitions, "number of partitions") + cmdAlterTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`") + cmdAlterTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only") + + validation.MarkFlagAtLeastOneRequired(cmdAlterTopic.Flags(), "partitions") + validation.MarkFlagAtLeastOneRequired(cmdAlterTopic.Flags(), "config") +} diff --git a/cmd/alter/alter.go b/cmd/alter/alter.go new file mode 100644 index 0000000..73f5c5c --- /dev/null +++ b/cmd/alter/alter.go @@ -0,0 +1,14 @@ +package alter + +import ( + "github.com/spf13/cobra" +) + +var CmdAlter = &cobra.Command{ + Use: "alter", + Short: "alter topics", +} + +func init() { + CmdAlter.AddCommand(cmdAlterTopic) +} diff --git a/cmd/root.go b/cmd/root.go index d97aaad..308a3ff 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "github.com/deviceinsight/kafkactl/cmd/alter" "github.com/deviceinsight/kafkactl/cmd/config" "github.com/deviceinsight/kafkactl/cmd/consume" "github.com/deviceinsight/kafkactl/cmd/create" @@ -40,6 +41,7 @@ func init() { rootCmd.AddCommand(config.CmdConfig) rootCmd.AddCommand(consume.CmdConsume) rootCmd.AddCommand(create.CmdCreate) + rootCmd.AddCommand(alter.CmdAlter) rootCmd.AddCommand(deletion.CmdDelete) rootCmd.AddCommand(describe.CmdDescribe) rootCmd.AddCommand(get.CmdGet) diff --git a/cmd/validation/cmd-validation.go b/cmd/validation/cmd-validation.go new file mode 100644 index 0000000..5fc01dc --- /dev/null +++ b/cmd/validation/cmd-validation.go @@ -0,0 +1,44 @@ +package validation + +import ( + "errors" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "strings" +) + +const ( + BashCompAtLeastOneRequiredFlag = "cobra_annotation_bash_completion_at_least_one_required_flag" +) + +func MarkFlagAtLeastOneRequired(flags *pflag.FlagSet, name string) error { + return flags.SetAnnotation(name, BashCompAtLeastOneRequiredFlag, []string{"true"}) +} + +func ValidateAtLeastOneRequiredFlag(cmd *cobra.Command) error { + requiredError := true + atLeastRequiredFlags := []string{} + + cmd.Flags().VisitAll(func(flag *pflag.Flag) { + atLeastOneRequiredAnnotation := flag.Annotations[BashCompAtLeastOneRequiredFlag] + if len(atLeastOneRequiredAnnotation) == 0 { + return + } + + flagRequired := atLeastOneRequiredAnnotation[0] == "true" + + if flagRequired { + atLeastRequiredFlags = append(atLeastRequiredFlags, flag.Name) + } + + if flag.Changed { + requiredError = false + } + }) + + if requiredError { + return errors.New("At least one of the following flags must be set: " + strings.Join(atLeastRequiredFlags[:], ", ")) + } + + return nil +} diff --git a/operations/topic-operation.go b/operations/topic-operation.go index dfd2dae..d5d519c 100644 --- a/operations/topic-operation.go +++ b/operations/topic-operation.go @@ -40,6 +40,12 @@ type CreateTopicFlags struct { Configs []string } +type AlterTopicFlags struct { + Partitions int32 + ValidateOnly bool + Configs []string +} + type TopicOperation struct { } @@ -129,6 +135,78 @@ func (operation *TopicOperation) DescribeTopic(topic string) { output.PrintObject(t, "yaml") } +func (operation *TopicOperation) AlterTopic(topic string, flags AlterTopicFlags) { + + context := createClientContext() + + var ( + client sarama.Client + admin sarama.ClusterAdmin + err error + exists bool + ) + + if client, err = createClient(&context); err != nil { + output.Failf("failed to create client err=%v", err) + } + + if exists, err = topicExists(&client, topic); err != nil { + output.Failf("failed to read topics err=%v", err) + } + + if !exists { + output.Failf("topic '%s' does not exist", topic) + } + + if admin, err = createClusterAdmin(&context); err != nil { + output.Failf("failed to create cluster admin: %v", err) + } + + var t, _ = readTopic(&client, &admin, topic, true, false, false, true) + + if flags.Partitions != 0 { + if len(t.Partitions) > int(flags.Partitions) { + output.Failf("Decreasing the number of partitions is not supported") + } + + var emptyAssignment [][]int32 = make([][]int32, 0, 0) + + err = admin.CreatePartitions(topic, flags.Partitions, emptyAssignment, flags.ValidateOnly) + if err != nil { + output.Failf("Could not create partitions for topic '%s': %v", topic, err) + } + } + + if len(flags.Configs) == 0 { + operation.DescribeTopic(topic) + return + } + + mergedConfigEntries := make(map[string]*string) + + for i, config := range t.Configs { + mergedConfigEntries[config.Name] = &(t.Configs[i].Value) + } + + for _, config := range flags.Configs { + configParts := strings.Split(config, "=") + + if len(configParts) == 2 { + if len(configParts[1]) == 0 { + delete(mergedConfigEntries, configParts[0]) + } else { + mergedConfigEntries[configParts[0]] = &configParts[1] + } + } + } + + if err = admin.AlterConfig(sarama.TopicResource, topic, mergedConfigEntries, flags.ValidateOnly); err != nil { + output.Failf("Could not alter topic config '%s': %v", topic, err) + } + + operation.DescribeTopic(topic) +} + func (operation *TopicOperation) GetTopics(flags GetTopicsFlags) { context := createClientContext() From 575dcdb59840212ee00e1bd95ed5146bf77ff1ae Mon Sep 17 00:00:00 2001 From: sladkoff Date: Mon, 7 Jan 2019 09:08:39 +0100 Subject: [PATCH 2/2] Fix global config-file flag The shorthand `C` was actually used as default value for the flag --- cmd/root.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/root.go b/cmd/root.go index 308a3ff..f16e3b3 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -48,7 +48,7 @@ func init() { rootCmd.AddCommand(produce.CmdProduce) // use upper-case letters for shorthand params to avoid conflicts with local flags - rootCmd.PersistentFlags().StringVar(&cfgFile, "config-file", "C", "config file (default is $HOME/.kafkactl.yml)") + rootCmd.PersistentFlags().StringVarP(&cfgFile, "config-file", "C", "", "config file (default is $HOME/.kafkactl.yml)") rootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "V", false, "verbose output") }