Skip to content

Commit 0777ecd

Browse files
committed
Fix race in service IP allocation repair loop
1 parent 2b5a10f commit 0777ecd

File tree

5 files changed

+164
-25
lines changed

5 files changed

+164
-25
lines changed

hack/update-openapi-spec.sh

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ kube::log::status "Starting kube-apiserver"
6565
--cert-dir="${TMP_DIR}/certs" \
6666
--runtime-config="api/all=true" \
6767
--token-auth-file=$TMP_DIR/tokenauth.csv \
68+
--logtostderr \
69+
--v=2 \
6870
--service-cluster-ip-range="10.0.0.0/24" >/tmp/openapi-api-server.log 2>&1 &
6971
APISERVER_PID=$!
7072

pkg/registry/core/service/ipallocator/allocator.go

+23
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,19 @@ func NewCIDRRange(cidr *net.IPNet) *Range {
9090
})
9191
}
9292

93+
// NewFromSnapshot allocates a Range and initializes it from a snapshot.
94+
func NewFromSnapshot(snap *api.RangeAllocation) (*Range, error) {
95+
_, ipnet, err := net.ParseCIDR(snap.Range)
96+
if err != nil {
97+
return nil, err
98+
}
99+
r := NewCIDRRange(ipnet)
100+
if err := r.Restore(ipnet, snap.Data); err != nil {
101+
return nil, err
102+
}
103+
return r, nil
104+
}
105+
93106
func maximum(a, b int) int {
94107
if a > b {
95108
return a
@@ -102,6 +115,16 @@ func (r *Range) Free() int {
102115
return r.alloc.Free()
103116
}
104117

118+
// Used returns the count of IP addresses used in the range.
119+
func (r *Range) Used() int {
120+
return r.max - r.alloc.Free()
121+
}
122+
123+
// CIDR returns the CIDR covered by the range.
124+
func (r *Range) CIDR() net.IPNet {
125+
return *r.net
126+
}
127+
105128
// Allocate attempts to reserve the provided IP. ErrNotInRange or
106129
// ErrAllocated will be returned if the IP is not valid for this range
107130
// or has already been reserved. ErrFull will be returned if there

pkg/registry/core/service/ipallocator/allocator_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ func TestAllocate(t *testing.T) {
3434
if f := r.Free(); f != 254 {
3535
t.Errorf("unexpected free %d", f)
3636
}
37+
if f := r.Used(); f != 0 {
38+
t.Errorf("unexpected used %d", f)
39+
}
3740
found := sets.NewString()
3841
count := 0
3942
for r.Free() > 0 {
@@ -61,6 +64,9 @@ func TestAllocate(t *testing.T) {
6164
if f := r.Free(); f != 1 {
6265
t.Errorf("unexpected free %d", f)
6366
}
67+
if f := r.Used(); f != 253 {
68+
t.Errorf("unexpected free %d", f)
69+
}
6470
ip, err := r.AllocateNext()
6571
if err != nil {
6672
t.Fatal(err)
@@ -87,12 +93,18 @@ func TestAllocate(t *testing.T) {
8793
if f := r.Free(); f != 1 {
8894
t.Errorf("unexpected free %d", f)
8995
}
96+
if f := r.Used(); f != 253 {
97+
t.Errorf("unexpected free %d", f)
98+
}
9099
if err := r.Allocate(released); err != nil {
91100
t.Fatal(err)
92101
}
93102
if f := r.Free(); f != 0 {
94103
t.Errorf("unexpected free %d", f)
95104
}
105+
if f := r.Used(); f != 254 {
106+
t.Errorf("unexpected free %d", f)
107+
}
96108
}
97109

98110
func TestAllocateTiny(t *testing.T) {
@@ -256,3 +268,42 @@ func TestSnapshot(t *testing.T) {
256268
t.Errorf("counts do not match: %d", other.Free())
257269
}
258270
}
271+
272+
func TestNewFromSnapshot(t *testing.T) {
273+
_, cidr, err := net.ParseCIDR("192.168.0.0/24")
274+
if err != nil {
275+
t.Fatal(err)
276+
}
277+
r := NewCIDRRange(cidr)
278+
allocated := []net.IP{}
279+
for i := 0; i < 128; i++ {
280+
ip, err := r.AllocateNext()
281+
if err != nil {
282+
t.Fatal(err)
283+
}
284+
allocated = append(allocated, ip)
285+
}
286+
287+
snapshot := api.RangeAllocation{}
288+
if err = r.Snapshot(&snapshot); err != nil {
289+
t.Fatal(err)
290+
}
291+
292+
r, err = NewFromSnapshot(&snapshot)
293+
if err != nil {
294+
t.Fatal(err)
295+
}
296+
297+
if x := r.Free(); x != 126 {
298+
t.Fatalf("expected 126 free IPs, got %d", x)
299+
}
300+
if x := r.Used(); x != 128 {
301+
t.Fatalf("expected 128 used IPs, got %d", x)
302+
}
303+
304+
for _, ip := range allocated {
305+
if !r.Has(ip) {
306+
t.Fatalf("expected IP to be allocated, but it was not")
307+
}
308+
}
309+
}

pkg/registry/core/service/ipallocator/controller/repair.go

+62-14
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,13 @@ type Repair struct {
5151
serviceClient coreclient.ServicesGetter
5252
network *net.IPNet
5353
alloc rangeallocation.RangeRegistry
54+
leaks map[string]int // counter per leaked IP
5455
}
5556

57+
// numRepairsBeforeLeakCleanup is how many times we need to detect a leak before we clean up. This is to
58+
// avoid races between allocating an IP and using it.
59+
const numRepairsBeforeLeakCleanup = 3
60+
5661
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
5762
// and generates informational warnings for a cluster that is not in sync.
5863
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
@@ -61,6 +66,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter,
6166
serviceClient: serviceClient,
6267
network: network,
6368
alloc: alloc,
69+
leaks: map[string]int{},
6470
}
6571
}
6672

@@ -89,18 +95,27 @@ func (c *Repair) runOnce() error {
8995

9096
// If etcd server is not running we should wait for some time and fail only then. This is particularly
9197
// important when we start apiserver and etcd at the same time.
92-
var latest *api.RangeAllocation
93-
var err error
94-
err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
95-
latest, err = c.alloc.Get()
98+
var snapshot *api.RangeAllocation
99+
err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
100+
var err error
101+
snapshot, err = c.alloc.Get()
96102
return err == nil, err
97103
})
98104
if err != nil {
99105
return fmt.Errorf("unable to refresh the service IP block: %v", err)
100106
}
107+
// If not yet initialized.
108+
if snapshot.Range == "" {
109+
snapshot.Range = c.network.String()
110+
}
111+
// Create an allocator because it is easy to use.
112+
stored, err := ipallocator.NewFromSnapshot(snapshot)
113+
if err != nil {
114+
return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
115+
}
101116

102117
// We explicitly send no resource version, since the resource version
103-
// of 'latest' is from a different collection, it's not comparable to
118+
// of 'snapshot' is from a different collection, it's not comparable to
104119
// the service collection. The caching layer keeps per-collection RVs,
105120
// and this is proper, since in theory the collections could be hosted
106121
// in separate etcd (or even non-etcd) instances.
@@ -109,40 +124,73 @@ func (c *Repair) runOnce() error {
109124
return fmt.Errorf("unable to refresh the service IP block: %v", err)
110125
}
111126

112-
r := ipallocator.NewCIDRRange(c.network)
127+
rebuilt := ipallocator.NewCIDRRange(c.network)
128+
// Check every Service's ClusterIP, and rebuild the state as we think it should be.
113129
for _, svc := range list.Items {
114130
if !api.IsServiceIPSet(&svc) {
131+
// didn't need a cluster IP
115132
continue
116133
}
117134
ip := net.ParseIP(svc.Spec.ClusterIP)
118135
if ip == nil {
119-
// cluster IP is broken, reallocate
136+
// cluster IP is corrupt
120137
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
121138
continue
122139
}
123-
switch err := r.Allocate(ip); err {
140+
// mark it as in-use
141+
switch err := rebuilt.Allocate(ip); err {
124142
case nil:
143+
if stored.Has(ip) {
144+
// remove it from the old set, so we can find leaks
145+
stored.Release(ip)
146+
} else {
147+
// cluster IP doesn't seem to be allocated
148+
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
149+
}
150+
delete(c.leaks, ip.String()) // it is used, so it can't be leaked
125151
case ipallocator.ErrAllocated:
126152
// TODO: send event
127-
// cluster IP is broken, reallocate
153+
// cluster IP is duplicate
128154
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
129155
case ipallocator.ErrNotInRange:
130156
// TODO: send event
131-
// cluster IP is broken, reallocate
157+
// cluster IP is out of range
132158
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
133159
case ipallocator.ErrFull:
134160
// TODO: send event
135-
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r)
161+
// somehow we are out of IPs
162+
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt)
136163
default:
137164
return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
138165
}
139166
}
140167

141-
if err := r.Snapshot(latest); err != nil {
168+
// Check for IPs that are left in the old set. They appear to have been leaked.
169+
stored.ForEach(func(ip net.IP) {
170+
count, found := c.leaks[ip.String()]
171+
switch {
172+
case !found:
173+
// flag it to be cleaned up after any races (hopefully) are gone
174+
runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later clean up", ip))
175+
count = numRepairsBeforeLeakCleanup - 1
176+
fallthrough
177+
case count > 0:
178+
// pretend it is still in use until count expires
179+
c.leaks[ip.String()] = count - 1
180+
if err := rebuilt.Allocate(ip); err != nil {
181+
runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
182+
}
183+
default:
184+
// do not add it to the rebuilt set, which means it will be available for reuse
185+
runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
186+
}
187+
})
188+
189+
// Blast the rebuilt state into storage.
190+
if err := rebuilt.Snapshot(snapshot); err != nil {
142191
return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
143192
}
144-
145-
if err := c.alloc.CreateOrUpdate(latest); err != nil {
193+
if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
146194
if errors.IsConflict(err) {
147195
return err
148196
}

pkg/registry/core/service/ipallocator/controller/repair_test.go

+26-11
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
5050

5151
func TestRepair(t *testing.T) {
5252
fakeClient := fake.NewSimpleClientset()
53-
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
5453
ipregistry := &mockRangeRegistry{
55-
item: &api.RangeAllocation{},
54+
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
5655
}
56+
_, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
5757
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
5858

5959
if err := r.RunOnce(); err != nil {
@@ -64,7 +64,7 @@ func TestRepair(t *testing.T) {
6464
}
6565

6666
ipregistry = &mockRangeRegistry{
67-
item: &api.RangeAllocation{},
67+
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
6868
updateErr: fmt.Errorf("test error"),
6969
}
7070
r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
@@ -73,7 +73,7 @@ func TestRepair(t *testing.T) {
7373
}
7474
}
7575

76-
func TestRepairEmpty(t *testing.T) {
76+
func TestRepairLeak(t *testing.T) {
7777
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
7878
previous := ipallocator.NewCIDRRange(cidr)
7979
previous.Allocate(net.ParseIP("192.168.1.10"))
@@ -94,16 +94,31 @@ func TestRepairEmpty(t *testing.T) {
9494
Data: dst.Data,
9595
},
9696
}
97+
9798
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
99+
// Run through the "leak detection holdoff" loops.
100+
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
101+
if err := r.RunOnce(); err != nil {
102+
t.Fatal(err)
103+
}
104+
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
105+
if err != nil {
106+
t.Fatal(err)
107+
}
108+
if !after.Has(net.ParseIP("192.168.1.10")) {
109+
t.Errorf("expected ipallocator to still have leaked IP")
110+
}
111+
}
112+
// Run one more time to actually remove the leak.
98113
if err := r.RunOnce(); err != nil {
99114
t.Fatal(err)
100115
}
101-
after := ipallocator.NewCIDRRange(cidr)
102-
if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
116+
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
117+
if err != nil {
103118
t.Fatal(err)
104119
}
105120
if after.Has(net.ParseIP("192.168.1.10")) {
106-
t.Errorf("unexpected ipallocator state: %#v", after)
121+
t.Errorf("expected ipallocator to not have leaked IP")
107122
}
108123
}
109124

@@ -157,14 +172,14 @@ func TestRepairWithExisting(t *testing.T) {
157172
if err := r.RunOnce(); err != nil {
158173
t.Fatal(err)
159174
}
160-
after := ipallocator.NewCIDRRange(cidr)
161-
if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
175+
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
176+
if err != nil {
162177
t.Fatal(err)
163178
}
164179
if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) {
165180
t.Errorf("unexpected ipallocator state: %#v", after)
166181
}
167-
if after.Free() != 252 {
168-
t.Errorf("unexpected ipallocator state: %#v", after)
182+
if free := after.Free(); free != 252 {
183+
t.Errorf("unexpected ipallocator state: %d free", free)
169184
}
170185
}

0 commit comments

Comments
 (0)