Skip to content

Commit

Permalink
Merge pull request deviceinsight#7 from sladkoff/feature/alter-topics
Browse files Browse the repository at this point in the history
Add `alter topic` command
  • Loading branch information
d-rk authored Jan 7, 2019
2 parents 916f0e1 + 575dcdb commit 0c80319
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Add `alter topic` command for increasing partition count and editing topic configs

### Changed
- Sort result of `kafkactl get topics`
- `consume` now uses a simpler consumer without consumerGroup.
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,16 @@ kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
```

### altering topics

Using the `alter topic` command allows you to change the partition count and topic-level configurations of an existing topic.

The partition count can be increased with:
```bash
kafkactl alter topic my-topic --partitions 32
```

The topic configs can be edited by supplying key value pairs as follows:
```bash
kafkactl alter topic my-topic --config retention.ms=3600 --config cleanup.policy=compact
```
30 changes: 30 additions & 0 deletions cmd/alter/alter-topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package alter

import (
"github.com/deviceinsight/kafkactl/cmd/validation"
"github.com/deviceinsight/kafkactl/operations"
"github.com/spf13/cobra"
)

var flags operations.AlterTopicFlags

var cmdAlterTopic = &cobra.Command{
Use: "topic",
Short: "alter a topic",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
(&operations.TopicOperation{}).AlterTopic(args[0], flags)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return validation.ValidateAtLeastOneRequiredFlag(cmd)
},
}

func init() {
cmdAlterTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", flags.Partitions, "number of partitions")
cmdAlterTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`")
cmdAlterTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only")

validation.MarkFlagAtLeastOneRequired(cmdAlterTopic.Flags(), "partitions")
validation.MarkFlagAtLeastOneRequired(cmdAlterTopic.Flags(), "config")
}
14 changes: 14 additions & 0 deletions cmd/alter/alter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package alter

import (
"github.com/spf13/cobra"
)

var CmdAlter = &cobra.Command{
Use: "alter",
Short: "alter topics",
}

func init() {
CmdAlter.AddCommand(cmdAlterTopic)
}
4 changes: 3 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"fmt"
"github.com/deviceinsight/kafkactl/cmd/alter"
"github.com/deviceinsight/kafkactl/cmd/config"
"github.com/deviceinsight/kafkactl/cmd/consume"
"github.com/deviceinsight/kafkactl/cmd/create"
Expand Down Expand Up @@ -40,13 +41,14 @@ func init() {
rootCmd.AddCommand(config.CmdConfig)
rootCmd.AddCommand(consume.CmdConsume)
rootCmd.AddCommand(create.CmdCreate)
rootCmd.AddCommand(alter.CmdAlter)
rootCmd.AddCommand(deletion.CmdDelete)
rootCmd.AddCommand(describe.CmdDescribe)
rootCmd.AddCommand(get.CmdGet)
rootCmd.AddCommand(produce.CmdProduce)

// use upper-case letters for shorthand params to avoid conflicts with local flags
rootCmd.PersistentFlags().StringVar(&cfgFile, "config-file", "C", "config file (default is $HOME/.kafkactl.yml)")
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config-file", "C", "", "config file (default is $HOME/.kafkactl.yml)")
rootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "V", false, "verbose output")
}

Expand Down
44 changes: 44 additions & 0 deletions cmd/validation/cmd-validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package validation

import (
"errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"strings"
)

const (
BashCompAtLeastOneRequiredFlag = "cobra_annotation_bash_completion_at_least_one_required_flag"
)

func MarkFlagAtLeastOneRequired(flags *pflag.FlagSet, name string) error {
return flags.SetAnnotation(name, BashCompAtLeastOneRequiredFlag, []string{"true"})
}

func ValidateAtLeastOneRequiredFlag(cmd *cobra.Command) error {
requiredError := true
atLeastRequiredFlags := []string{}

cmd.Flags().VisitAll(func(flag *pflag.Flag) {
atLeastOneRequiredAnnotation := flag.Annotations[BashCompAtLeastOneRequiredFlag]
if len(atLeastOneRequiredAnnotation) == 0 {
return
}

flagRequired := atLeastOneRequiredAnnotation[0] == "true"

if flagRequired {
atLeastRequiredFlags = append(atLeastRequiredFlags, flag.Name)
}

if flag.Changed {
requiredError = false
}
})

if requiredError {
return errors.New("At least one of the following flags must be set: " + strings.Join(atLeastRequiredFlags[:], ", "))
}

return nil
}
78 changes: 78 additions & 0 deletions operations/topic-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type CreateTopicFlags struct {
Configs []string
}

type AlterTopicFlags struct {
Partitions int32
ValidateOnly bool
Configs []string
}

type TopicOperation struct {
}

Expand Down Expand Up @@ -129,6 +135,78 @@ func (operation *TopicOperation) DescribeTopic(topic string) {
output.PrintObject(t, "yaml")
}

func (operation *TopicOperation) AlterTopic(topic string, flags AlterTopicFlags) {

context := createClientContext()

var (
client sarama.Client
admin sarama.ClusterAdmin
err error
exists bool
)

if client, err = createClient(&context); err != nil {
output.Failf("failed to create client err=%v", err)
}

if exists, err = topicExists(&client, topic); err != nil {
output.Failf("failed to read topics err=%v", err)
}

if !exists {
output.Failf("topic '%s' does not exist", topic)
}

if admin, err = createClusterAdmin(&context); err != nil {
output.Failf("failed to create cluster admin: %v", err)
}

var t, _ = readTopic(&client, &admin, topic, true, false, false, true)

if flags.Partitions != 0 {
if len(t.Partitions) > int(flags.Partitions) {
output.Failf("Decreasing the number of partitions is not supported")
}

var emptyAssignment [][]int32 = make([][]int32, 0, 0)

err = admin.CreatePartitions(topic, flags.Partitions, emptyAssignment, flags.ValidateOnly)
if err != nil {
output.Failf("Could not create partitions for topic '%s': %v", topic, err)
}
}

if len(flags.Configs) == 0 {
operation.DescribeTopic(topic)
return
}

mergedConfigEntries := make(map[string]*string)

for i, config := range t.Configs {
mergedConfigEntries[config.Name] = &(t.Configs[i].Value)
}

for _, config := range flags.Configs {
configParts := strings.Split(config, "=")

if len(configParts) == 2 {
if len(configParts[1]) == 0 {
delete(mergedConfigEntries, configParts[0])
} else {
mergedConfigEntries[configParts[0]] = &configParts[1]
}
}
}

if err = admin.AlterConfig(sarama.TopicResource, topic, mergedConfigEntries, flags.ValidateOnly); err != nil {
output.Failf("Could not alter topic config '%s': %v", topic, err)
}

operation.DescribeTopic(topic)
}

func (operation *TopicOperation) GetTopics(flags GetTopicsFlags) {

context := createClientContext()
Expand Down

0 comments on commit 0c80319

Please sign in to comment.