Skip to content

Commit 1d833ec

Browse files
[azservicebus] Adding a timeout so link closures have a maximum of 60 seconds before they are cancelled. (Azure#19886)
Adding a timeout so link closures have a maximum of 60 seconds before they time out. This fixes an issue that came up when I made a fix in the last release to not timeout on close (since that can leave us in an inconsistent state). This surfaced an interesting problem where customers do see Close() hang. So this adds in a timeout instead, which will cause us to close the connection to avoid inconsistency and also to recover overall since it's likely there's a problem that will require a more thorough reset than a simple link detach/reattach would give us. As part of this I also created some more thorough mocks that test more of the connection/open behavior as well as links, ensuring that we do cleanup and end up in the state that we want. It's not 100% complete but it's good enough for this kind of testing and can be expanded.
1 parent 40b4989 commit 1d833ec

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+3739
-728
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
# Release History
22

3-
## 1.1.5 (Unreleased)
3+
## 1.2.0 (Unreleased)
44

55
### Features Added
66

77
### Breaking Changes
88

99
### Bugs Fixed
1010

11+
- Links could hang when closing, preventing recovery from completing and making a link appear stalled. (PR#19886)
12+
1113
### Other Changes
1214

1315
## 1.1.4 (2023-01-10)

sdk/messaging/azservicebus/client.go

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1616
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
1717
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
18+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1819
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
1920
)
2021

@@ -28,15 +29,10 @@ type Client struct {
2829
// PR: https://github.com/Azure/azure-sdk-for-go/pull/16847
2930
linkCounter uint64
3031

31-
linksMu *sync.Mutex
32-
links map[uint64]internal.Closeable
33-
creds clientCreds
34-
namespace interface {
35-
// used internally by `Client`
36-
internal.NamespaceWithNewAMQPLinks
37-
// for child clients
38-
internal.NamespaceForAMQPLinks
39-
}
32+
linksMu *sync.Mutex
33+
links map[uint64]amqpwrap.Closeable
34+
creds clientCreds
35+
namespace internal.NamespaceForAMQPLinks
4036
retryOptions RetryOptions
4137

4238
// acceptNextTimeout controls how long the session accept can take before
@@ -85,7 +81,9 @@ func NewClient(fullyQualifiedNamespace string, credential azcore.TokenCredential
8581
return newClientImpl(clientCreds{
8682
credential: credential,
8783
fullyQualifiedNamespace: fullyQualifiedNamespace,
88-
}, options)
84+
}, clientImplArgs{
85+
ClientOptions: options,
86+
})
8987
}
9088

9189
// NewClientFromConnectionString creates a new Client for a Service Bus namespace using a connection string.
@@ -105,7 +103,9 @@ func NewClientFromConnectionString(connectionString string, options *ClientOptio
105103

106104
return newClientImpl(clientCreds{
107105
connectionString: connectionString,
108-
}, options)
106+
}, clientImplArgs{
107+
ClientOptions: options,
108+
})
109109
}
110110

111111
// Next overloads (ie, credential sticks with the client)
@@ -120,11 +120,16 @@ type clientCreds struct {
120120
credential azcore.TokenCredential
121121
}
122122

123-
func newClientImpl(creds clientCreds, options *ClientOptions) (*Client, error) {
123+
type clientImplArgs struct {
124+
ClientOptions *ClientOptions
125+
NSOptions []internal.NamespaceOption
126+
}
127+
128+
func newClientImpl(creds clientCreds, args clientImplArgs) (*Client, error) {
124129
client := &Client{
125130
linksMu: &sync.Mutex{},
126131
creds: creds,
127-
links: map[uint64]internal.Closeable{},
132+
links: map[uint64]amqpwrap.Closeable{},
128133
}
129134

130135
var err error
@@ -140,24 +145,26 @@ func newClientImpl(creds clientCreds, options *ClientOptions) (*Client, error) {
140145
nsOptions = append(nsOptions, option)
141146
}
142147

143-
if options != nil {
144-
client.retryOptions = options.RetryOptions
148+
if args.ClientOptions != nil {
149+
client.retryOptions = args.ClientOptions.RetryOptions
145150

146-
if options.TLSConfig != nil {
147-
nsOptions = append(nsOptions, internal.NamespaceWithTLSConfig(options.TLSConfig))
151+
if args.ClientOptions.TLSConfig != nil {
152+
nsOptions = append(nsOptions, internal.NamespaceWithTLSConfig(args.ClientOptions.TLSConfig))
148153
}
149154

150-
if options.NewWebSocketConn != nil {
151-
nsOptions = append(nsOptions, internal.NamespaceWithWebSocket(options.NewWebSocketConn))
155+
if args.ClientOptions.NewWebSocketConn != nil {
156+
nsOptions = append(nsOptions, internal.NamespaceWithWebSocket(args.ClientOptions.NewWebSocketConn))
152157
}
153158

154-
if options.ApplicationID != "" {
155-
nsOptions = append(nsOptions, internal.NamespaceWithUserAgent(options.ApplicationID))
159+
if args.ClientOptions.ApplicationID != "" {
160+
nsOptions = append(nsOptions, internal.NamespaceWithUserAgent(args.ClientOptions.ApplicationID))
156161
}
157162

158-
nsOptions = append(nsOptions, internal.NamespaceWithRetryOptions(options.RetryOptions))
163+
nsOptions = append(nsOptions, internal.NamespaceWithRetryOptions(args.ClientOptions.RetryOptions))
159164
}
160165

166+
nsOptions = append(nsOptions, args.NSOptions...)
167+
161168
client.namespace, err = internal.NewNamespace(nsOptions...)
162169
return client, err
163170
}
@@ -303,7 +310,7 @@ func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topi
303310
// Close closes the current connection Service Bus as well as any Senders or Receivers created
304311
// using this client.
305312
func (client *Client) Close(ctx context.Context) error {
306-
var links []internal.Closeable
313+
var links []amqpwrap.Closeable
307314

308315
client.linksMu.Lock()
309316

@@ -319,7 +326,7 @@ func (client *Client) Close(ctx context.Context) error {
319326
}
320327
}
321328

322-
return client.namespace.Close(ctx, true)
329+
return client.namespace.Close(true)
323330
}
324331

325332
func (client *Client) acceptNextSessionForEntity(ctx context.Context, entity entity, options *SessionReceiverOptions) (*SessionReceiver, error) {
@@ -347,7 +354,7 @@ func (client *Client) acceptNextSessionForEntity(ctx context.Context, entity ent
347354
return sessionReceiver, nil
348355
}
349356

350-
func (client *Client) addCloseable(id uint64, closeable internal.Closeable) {
357+
func (client *Client) addCloseable(id uint64, closeable amqpwrap.Closeable) {
351358
client.linksMu.Lock()
352359
client.links[id] = closeable
353360
client.linksMu.Unlock()

sdk/messaging/azservicebus/client_test.go

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -395,52 +395,6 @@ func TestNewClientUnitTests(t *testing.T) {
395395
require.EqualValues(t, ns.FQDN, "mysb.windows.servicebus.net")
396396
})
397397

398-
t.Run("CloseAndLinkTracking", func(t *testing.T) {
399-
setupClient := func() (*Client, *internal.FakeNS) {
400-
client, err := NewClient("fake.something", struct{ azcore.TokenCredential }{}, nil)
401-
require.NoError(t, err)
402-
403-
ns := &internal.FakeNS{
404-
AMQPLinks: &internal.FakeAMQPLinks{
405-
Receiver: &internal.FakeAMQPReceiver{},
406-
},
407-
}
408-
409-
client.namespace = ns
410-
return client, ns
411-
}
412-
413-
client, ns := setupClient()
414-
_, err := client.NewSender("hello", nil)
415-
416-
require.NoError(t, err)
417-
require.EqualValues(t, 1, len(client.links))
418-
require.NotNil(t, client.links[1])
419-
require.NoError(t, client.Close(context.Background()))
420-
require.Empty(t, client.links)
421-
require.True(t, ns.AMQPLinks.ClosedPermanently())
422-
423-
client, ns = setupClient()
424-
_, err = client.NewReceiverForQueue("hello", nil)
425-
426-
require.NoError(t, err)
427-
require.EqualValues(t, 1, len(client.links))
428-
require.NotNil(t, client.links[1])
429-
require.NoError(t, client.Close(context.Background()))
430-
require.Empty(t, client.links)
431-
require.True(t, ns.AMQPLinks.ClosedPermanently())
432-
433-
client, ns = setupClient()
434-
_, err = client.NewReceiverForSubscription("hello", "world", nil)
435-
436-
require.NoError(t, err)
437-
require.EqualValues(t, 1, len(client.links))
438-
require.NotNil(t, client.links[1])
439-
require.NoError(t, client.Close(context.Background()))
440-
require.Empty(t, client.links)
441-
require.EqualValues(t, 1, ns.AMQPLinks.Closed)
442-
})
443-
444398
t.Run("RetryOptionsArePropagated", func(t *testing.T) {
445399
// retry options are passed and copied along several routes, just make sure it's properly propagated.
446400
// NOTE: session receivers are checked in a separate test because they require actual SB access.

sdk/messaging/azservicebus/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ require (
88
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0
99
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0
1010
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2
11-
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
1211
)
1312

1413
require (
@@ -25,7 +24,7 @@ require (
2524
nhooyr.io/websocket v1.8.6
2625
)
2726

28-
require golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
27+
require github.com/golang/mock v1.6.0
2928

3029
require (
3130
code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c // indirect
@@ -40,6 +39,7 @@ require (
4039
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
4140
github.com/pmezard/go-difflib v1.0.0 // indirect
4241
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 // indirect
42+
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
4343
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
4444
golang.org/x/text v0.3.7 // indirect
4545
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect

sdk/messaging/azservicebus/go.sum

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx
3737
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
3838
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
3939
github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU=
40+
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
41+
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
4042
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
4143
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
4244
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
@@ -91,28 +93,44 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
9193
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
9294
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
9395
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
96+
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
9497
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
98+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
9599
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
96100
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 h1:Tgea0cVUD0ivh5ADBX4WwuI12DUd2to3nCYe2eayMIw=
97101
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
102+
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
98103
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
99104
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
105+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
106+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
100107
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA=
101108
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
102109
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
110+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
111+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
103112
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
104113
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
105114
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
106115
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
116+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
107117
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
118+
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
119+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
108120
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
109121
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
122+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
110123
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
111124
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
125+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
112126
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
113127
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
114128
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
115129
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
130+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
131+
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
132+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
133+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
116134
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
117135
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
118136
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

sdk/messaging/azservicebus/internal/amqpInterfaces.go

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)