Skip to content

Commit

Permalink
Merge pull request #891 from twmb/kadm-auth-ops
Browse files Browse the repository at this point in the history
kadm: always request authorized operations
  • Loading branch information
twmb authored Jan 20, 2025
2 parents 080214e + 13584b5 commit 237dc07
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
16 changes: 10 additions & 6 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type DescribedGroup struct {
Protocol string // Protocol is the partition assignor strategy this group is using.
Members []DescribedGroupMember // Members contains the members of this group sorted first by InstanceID, or if nil, by MemberID.

AuthorizedOperations []ACLOperation // AuthorizedOperations contains operations the requesting client is allowed to perform on this group.

Err error // Err is non-nil if the group could not be described.
}

Expand Down Expand Up @@ -275,6 +277,7 @@ func (cl *Client) DescribeGroups(ctx context.Context, groups ...string) (Describ

req := kmsg.NewPtrDescribeGroupsRequest()
req.Groups = groups
req.IncludeAuthorizedOperations = true

shards := cl.cl.RequestSharded(ctx, req)
described := make(DescribedGroups)
Expand All @@ -285,12 +288,13 @@ func (cl *Client) DescribeGroups(ctx context.Context, groups ...string) (Describ
return err
}
g := DescribedGroup{
Group: rg.Group,
Coordinator: b,
State: rg.State,
ProtocolType: rg.ProtocolType,
Protocol: rg.Protocol,
Err: kerr.ErrorForCode(rg.ErrorCode),
Group: rg.Group,
Coordinator: b,
State: rg.State,
ProtocolType: rg.ProtocolType,
Protocol: rg.Protocol,
AuthorizedOperations: DecodeACLOperations(rg.AuthorizedOperations),
Err: kerr.ErrorForCode(rg.ErrorCode),
}
for _, rm := range rg.Members {
gm := DescribedGroupMember{
Expand Down
27 changes: 17 additions & 10 deletions pkg/kadm/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type TopicDetail struct {
IsInternal bool // IsInternal is whether the topic is an internal topic.
Partitions PartitionDetails // Partitions contains details about the topic's partitions.

AuthorizedOperations []ACLOperation // AuthorizedOperations contains operations the requesting client is allowed to perform on this topic.

Err error // Err is non-nil if the topic could not be loaded.
}

Expand Down Expand Up @@ -183,10 +185,11 @@ func (ds TopicDetails) TopicsList() TopicsList {

// Metadata is the data from a metadata response.
type Metadata struct {
Cluster string // Cluster is the cluster name, if any.
Controller int32 // Controller is the node ID of the controller broker, if available, otherwise -1.
Brokers BrokerDetails // Brokers contains broker details, sorted by default.
Topics TopicDetails // Topics contains topic details.
Cluster string // Cluster is the cluster name, if any.
Controller int32 // Controller is the node ID of the controller broker, if available, otherwise -1.
Brokers BrokerDetails // Brokers contains broker details, sorted by default.
Topics TopicDetails // Topics contains topic details.
AuthorizedOperations []ACLOperation // AuthorizedOperations contains operations the requesting client is allowed to perform, if the client has Describe permissions on the cluster.
}

func int32s(is []int32) []int32 {
Expand Down Expand Up @@ -226,6 +229,8 @@ func (cl *Client) Metadata(

func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string) (Metadata, error) {
req := kmsg.NewPtrMetadataRequest()
req.IncludeClusterAuthorizedOperations = true
req.IncludeTopicAuthorizedOperations = true
for _, t := range topics {
rt := kmsg.NewMetadataRequestTopic()
rt.Topic = kmsg.StringPtr(t)
Expand All @@ -245,10 +250,11 @@ func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string)
return Metadata{}, err
}
td := TopicDetail{
ID: t.TopicID,
Partitions: make(map[int32]PartitionDetail),
IsInternal: t.IsInternal,
Err: kerr.ErrorForCode(t.ErrorCode),
ID: t.TopicID,
Partitions: make(map[int32]PartitionDetail),
IsInternal: t.IsInternal,
AuthorizedOperations: DecodeACLOperations(t.AuthorizedOperations),
Err: kerr.ErrorForCode(t.ErrorCode),
}
if t.Topic != nil {
td.Topic = *t.Topic
Expand All @@ -271,8 +277,9 @@ func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string)
}

m := Metadata{
Controller: resp.ControllerID,
Topics: tds,
Controller: resp.ControllerID,
Topics: tds,
AuthorizedOperations: DecodeACLOperations(resp.AuthorizedOperations),
}
if resp.ClusterID != nil {
m.Cluster = *resp.ClusterID
Expand Down

0 comments on commit 237dc07

Please sign in to comment.