-
Notifications
You must be signed in to change notification settings - Fork 805
/
Copy pathlistpartitionreassignments.go
135 lines (114 loc) · 3.73 KB
/
listpartitionreassignments.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package kafka
import (
"context"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
)
// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
type ListPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Topics we want reassignments for, mapped by their name, or nil to list everything.
Topics map[string]ListPartitionReassignmentsRequestTopic
// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}
// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single
// topic.
type ListPartitionReassignmentsRequestTopic struct {
// The partitions to list partition reassignments for.
PartitionIndexes []int
}
// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
type ListPartitionReassignmentsResponse struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered.
Error error
// Topics contains results for each topic, mapped by their name.
Topics map[string]ListPartitionReassignmentsResponseTopic
}
// ListPartitionReassignmentsResponseTopic contains the detailed result of
// ongoing reassignments for a topic.
type ListPartitionReassignmentsResponseTopic struct {
// Partitions contains result for topic partitions.
Partitions []ListPartitionReassignmentsResponsePartition
}
// ListPartitionReassignmentsResponsePartition contains the detailed result of
// ongoing reassignments for a single partition.
type ListPartitionReassignmentsResponsePartition struct {
// PartitionIndex contains index of the partition.
PartitionIndex int
// Replicas contains the current replica set.
Replicas []int
// AddingReplicas contains the set of replicas we are currently adding.
AddingReplicas []int
// RemovingReplicas contains the set of replicas we are currently removing.
RemovingReplicas []int
}
func (c *Client) ListPartitionReassignments(
ctx context.Context,
req *ListPartitionReassignmentsRequest,
) (*ListPartitionReassignmentsResponse, error) {
apiReq := &listpartitionreassignments.Request{
TimeoutMs: int32(req.Timeout.Milliseconds()),
}
for topicName, topicReq := range req.Topics {
apiReq.Topics = append(
apiReq.Topics,
listpartitionreassignments.RequestTopic{
Name: topicName,
PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes),
},
)
}
protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protoResp.(*listpartitionreassignments.Response)
resp := &ListPartitionReassignmentsResponse{
Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
Topics: make(map[string]ListPartitionReassignmentsResponseTopic),
}
for _, topicResult := range apiResp.Topics {
respTopic := ListPartitionReassignmentsResponseTopic{}
for _, partitionResult := range topicResult.Partitions {
respTopic.Partitions = append(
respTopic.Partitions,
ListPartitionReassignmentsResponsePartition{
PartitionIndex: int(partitionResult.PartitionIndex),
Replicas: int32ToIntArray(partitionResult.Replicas),
AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas),
RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas),
},
)
}
resp.Topics[topicResult.Name] = respTopic
}
return resp, nil
}
func intToInt32Array(arr []int) []int32 {
if arr == nil {
return nil
}
res := make([]int32, len(arr))
for i := range arr {
res[i] = int32(arr[i])
}
return res
}
func int32ToIntArray(arr []int32) []int {
if arr == nil {
return nil
}
res := make([]int, len(arr))
for i := range arr {
res[i] = int(arr[i])
}
return res
}