Skip to content

Commit 771b390

Browse files
committed
review comments
1 parent c459776 commit 771b390

File tree

5 files changed

+90
-73
lines changed

5 files changed

+90
-73
lines changed

config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
540540
}
541541
lifecycle.Append(fx.Hook{
542542
OnStart: func(context.Context) error {
543-
o.Start()
543+
o.Start(s)
544544
return nil
545545
},
546546
OnStop: func(context.Context) error {

p2p/host/basic/addrs_manager.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -161,33 +161,27 @@ func (a *addrsManager) updateAddrsSync() {
161161
}
162162
}
163163

164-
func closeIfError(err error, closer io.Closer, name string) error {
165-
if err != nil {
166-
err1 := closer.Close()
167-
if err1 != nil {
168-
err1 = fmt.Errorf("error closing %s: %w", name, err1)
169-
}
170-
return errors.Join(err, err1)
171-
}
172-
return nil
173-
}
174-
175164
func (a *addrsManager) startBackgroundWorker() (retErr error) {
176-
autoRelayAddrsSub, err := a.bus.Subscribe(new(event.EvtAutoRelayAddrsUpdated), eventbus.Name("addrs-manager"))
165+
autoRelayAddrsSub, err := a.bus.Subscribe(new(event.EvtAutoRelayAddrsUpdated), eventbus.Name("addrs-manager autorelay sub"))
177166
if err != nil {
178167
return fmt.Errorf("error subscribing to auto relay addrs: %s", err)
179168
}
180-
defer func() { retErr = closeIfError(retErr, autoRelayAddrsSub, "autorelay subscription") }()
181-
182-
autonatReachabilitySub, err := a.bus.Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("addrs-manager"))
169+
mc := multiCloser{autoRelayAddrsSub}
170+
autonatReachabilitySub, err := a.bus.Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("addrs-manager autonatv1 sub"))
183171
if err != nil {
184-
return fmt.Errorf("error subscribing to autonat reachability: %s", err)
172+
return errors.Join(
173+
fmt.Errorf("error subscribing to autonat reachability: %s", err),
174+
mc.Close(),
175+
)
185176
}
186-
defer func() { retErr = closeIfError(retErr, autonatReachabilitySub, "autonatReachability subscription") }()
177+
mc = append(mc, autonatReachabilitySub)
187178

188179
emitter, err := a.bus.Emitter(new(event.EvtHostReachableAddrsChanged), eventbus.Stateful)
189180
if err != nil {
190-
return fmt.Errorf("error creating reachability subscriber: %s", err)
181+
return errors.Join(
182+
fmt.Errorf("error creating reachability subscriber: %s", err),
183+
mc.Close(),
184+
)
191185
}
192186

193187
var relayAddrs []ma.Multiaddr
@@ -660,3 +654,21 @@ func removeNotInSource(addrs, source []ma.Multiaddr) []ma.Multiaddr {
660654
}
661655
return addrs[:i]
662656
}
657+
658+
type multiCloser []io.Closer
659+
660+
func (mc *multiCloser) Close() error {
661+
var errs []error
662+
for _, closer := range *mc {
663+
if err := closer.Close(); err != nil {
664+
var closerName string
665+
if named, ok := closer.(interface{ Name() string }); ok {
666+
closerName = named.Name()
667+
} else {
668+
closerName = fmt.Sprintf("%T", closer)
669+
}
670+
errs = append(errs, fmt.Errorf("error closing %s: %w", closerName, err))
671+
}
672+
}
673+
return errors.Join(errs...)
674+
}

p2p/host/observedaddrs/manager.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -167,45 +167,48 @@ func NewManager(eventbus event.Bus, net network.Network) (*Manager, error) {
167167
return nil, err
168168
}
169169

170-
// Maybe this should be done in start, but that makes testing without a swarm difficult.
171-
nb := &network.NotifyBundle{
172-
DisconnectedF: func(_ network.Network, c network.Conn) {
173-
o.RemoveConn(c)
174-
},
175-
}
176-
o.stopNotify = func() {
177-
net.StopNotify(nb)
178-
}
179170
return o, nil
180171
}
181172

182173
// newManagerWithListenAddrs uses the listenAddrs directly to simplify creation in tests.
183-
func newManagerWithListenAddrs(eventbus event.Bus, listenAddrs func() []ma.Multiaddr) (*Manager, error) {
174+
func newManagerWithListenAddrs(bus event.Bus, listenAddrs func() []ma.Multiaddr) (*Manager, error) {
184175
o := &Manager{
185176
externalAddrs: make(map[string]map[string]*observerSet),
186177
connObservedTWAddrs: make(map[connMultiaddrs]ma.Multiaddr),
187178
wch: make(chan observation, observedAddrManagerWorkerChannelSize),
188179
listenAddrs: listenAddrs,
189-
eventbus: eventbus,
180+
eventbus: bus,
190181
stopNotify: func() {},
191182
}
192183
o.ctx, o.ctxCancel = context.WithCancel(context.Background())
193184
return o, nil
194185
}
195186

196187
// Start tracking addrs
197-
func (o *Manager) Start() {
188+
func (o *Manager) Start(n network.Network) {
189+
nb := &network.NotifyBundle{
190+
DisconnectedF: func(_ network.Network, c network.Conn) {
191+
o.removeConn(c)
192+
},
193+
}
194+
198195
sub, err := o.eventbus.Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.Name("observed-addrs-manager"))
199196
if err != nil {
200-
log.Errorf("failed to start observed addrs manager")
197+
log.Errorf("failed to start observed addrs manager: identify subscription failed: %s", err)
201198
return
202199
}
203200
emitter, err := o.eventbus.Emitter(new(event.EvtNATDeviceTypeChanged), eventbus.Stateful)
204201
if err != nil {
205-
log.Errorf("failed to start nat device type changed emitter: %s", err)
202+
log.Errorf("failed to start observed addrs manager: nat device type changed emitter error: %s", err)
206203
sub.Close()
207204
return
208205
}
206+
207+
n.Notify(nb)
208+
o.stopNotify = func() {
209+
n.StopNotify(nb)
210+
}
211+
209212
o.wg.Add(2)
210213
go o.eventHandler(sub, emitter)
211214
go o.worker()
@@ -484,7 +487,7 @@ func (o *Manager) addExternalAddrsUnlocked(observedTWAddr ma.Multiaddr, observer
484487
s.ObservedBy[observer]++
485488
}
486489

487-
func (o *Manager) RemoveConn(conn connMultiaddrs) {
490+
func (o *Manager) removeConn(conn connMultiaddrs) {
488491
if conn == nil {
489492
return
490493
}

p2p/host/observedaddrs/manager_test.go

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/libp2p/go-libp2p/core/network"
1212
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
1313

14+
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
1415
ma "github.com/multiformats/go-multiaddr"
1516
matest "github.com/multiformats/go-multiaddr/matest"
1617
manet "github.com/multiformats/go-multiaddr/net"
@@ -59,7 +60,8 @@ func TestObservedAddrsManager(t *testing.T) {
5960
if err != nil {
6061
t.Fatal(err)
6162
}
62-
o.Start()
63+
s := swarmt.GenSwarm(t)
64+
o.Start(s)
6365
t.Cleanup(func() { o.Close() })
6466
return o
6567
}
@@ -113,10 +115,10 @@ func TestObservedAddrsManager(t *testing.T) {
113115
require.Eventually(t, func() bool {
114116
return matest.AssertEqualMultiaddrs(t, o.Addrs(0), []ma.Multiaddr{observed})
115117
}, 1*time.Second, 100*time.Millisecond)
116-
o.RemoveConn(c1)
117-
o.RemoveConn(c2)
118-
o.RemoveConn(c3)
119-
o.RemoveConn(c4)
118+
o.removeConn(c1)
119+
o.removeConn(c2)
120+
o.removeConn(c3)
121+
o.removeConn(c4)
120122
require.Eventually(t, func() bool {
121123
return checkAllEntriesRemoved(o)
122124
}, 1*time.Second, 100*time.Millisecond)
@@ -139,7 +141,7 @@ func TestObservedAddrsManager(t *testing.T) {
139141
len(o.AddrsFor(tcp4ListenAddr)) == maxExternalThinWaistAddrsPerLocalAddr
140142
}, 1*time.Second, 100*time.Millisecond)
141143
for _, c := range conns {
142-
o.RemoveConn(c)
144+
o.removeConn(c)
143145
}
144146
require.Eventually(t, func() bool {
145147
return checkAllEntriesRemoved(o)
@@ -162,10 +164,10 @@ func TestObservedAddrsManager(t *testing.T) {
162164
require.EventuallyWithT(t, func(t *assert.CollectT) {
163165
matest.AssertEqualMultiaddrs(t, o.Addrs(0), []ma.Multiaddr{observedQuic, observedWebTransport})
164166
}, 1*time.Second, 100*time.Millisecond)
165-
o.RemoveConn(c1)
166-
o.RemoveConn(c2)
167-
o.RemoveConn(c3)
168-
o.RemoveConn(c4)
167+
o.removeConn(c1)
168+
o.removeConn(c2)
169+
o.removeConn(c3)
170+
o.removeConn(c4)
169171
require.Eventually(t, func() bool {
170172
return checkAllEntriesRemoved(o)
171173
}, 1*time.Second, 100*time.Millisecond)
@@ -187,10 +189,10 @@ func TestObservedAddrsManager(t *testing.T) {
187189
require.EventuallyWithT(t, func(t *assert.CollectT) {
188190
matest.AssertEqualMultiaddrs(t, o.Addrs(0), []ma.Multiaddr{observedQuic, inferredWebTransport})
189191
}, 1*time.Second, 100*time.Millisecond)
190-
o.RemoveConn(c1)
191-
o.RemoveConn(c2)
192-
o.RemoveConn(c3)
193-
o.RemoveConn(c4)
192+
o.removeConn(c1)
193+
o.removeConn(c2)
194+
o.removeConn(c3)
195+
o.removeConn(c4)
194196
require.Eventually(t, func() bool {
195197
return checkAllEntriesRemoved(o)
196198
}, 1*time.Second, 100*time.Millisecond)
@@ -225,7 +227,7 @@ func TestObservedAddrsManager(t *testing.T) {
225227

226228
// Now disconnect first observer group
227229
for i := 0; i < N; i++ {
228-
o.RemoveConn(ob1[i])
230+
o.removeConn(ob1[i])
229231
}
230232
time.Sleep(100 * time.Millisecond)
231233
if !matest.AssertEqualMultiaddrs(t, o.Addrs(0), []ma.Multiaddr{observedQuic, inferredWebTransport}) {
@@ -234,7 +236,7 @@ func TestObservedAddrsManager(t *testing.T) {
234236

235237
// Now disconnect the second group to check cleanup
236238
for i := 0; i < N; i++ {
237-
o.RemoveConn(ob2[i])
239+
o.removeConn(ob2[i])
238240
}
239241
require.Eventually(t, func() bool {
240242
return checkAllEntriesRemoved(o)
@@ -272,7 +274,7 @@ func TestObservedAddrsManager(t *testing.T) {
272274

273275
// Now disconnect first observer group
274276
for i := 0; i < N; i++ {
275-
o.RemoveConn(ob1[i])
277+
o.removeConn(ob1[i])
276278
}
277279
time.Sleep(100 * time.Millisecond)
278280
if !matest.AssertEqualMultiaddrs(t, o.Addrs(0), []ma.Multiaddr{observedQuic2, inferredWebTransport2}) {
@@ -281,7 +283,7 @@ func TestObservedAddrsManager(t *testing.T) {
281283

282284
// Now disconnect the second group to check cleanup
283285
for i := 0; i < N; i++ {
284-
o.RemoveConn(ob2[i])
286+
o.removeConn(ob2[i])
285287
}
286288
require.Eventually(t, func() bool {
287289
return checkAllEntriesRemoved(o)
@@ -325,17 +327,17 @@ func TestObservedAddrsManager(t *testing.T) {
325327

326328
for i := 0; i < 3; i++ {
327329
// remove non-recorded connection
328-
o.RemoveConn(c6)
330+
o.removeConn(c6)
329331
}
330332
requireEqualAddrs(t, []ma.Multiaddr{observedWebTransportWithCertHash}, o.AddrsFor(webTransport4ListenAddr))
331333
requireEqualAddrs(t, []ma.Multiaddr{observedQuic}, o.AddrsFor(quic4ListenAddr))
332334
requireAddrsMatch(t, []ma.Multiaddr{observedQuic, observedWebTransportWithCertHash}, o.Addrs(0))
333335

334-
o.RemoveConn(c1)
335-
o.RemoveConn(c2)
336-
o.RemoveConn(c3)
337-
o.RemoveConn(c4)
338-
o.RemoveConn(c5)
336+
o.removeConn(c1)
337+
o.removeConn(c2)
338+
o.removeConn(c3)
339+
o.removeConn(c4)
340+
o.removeConn(c5)
339341
require.Eventually(t, func() bool {
340342
return checkAllEntriesRemoved(o)
341343
}, 1*time.Second, 100*time.Millisecond)
@@ -357,10 +359,10 @@ func TestObservedAddrsManager(t *testing.T) {
357359
require.EventuallyWithT(t, func(t *assert.CollectT) {
358360
matest.AssertMultiaddrsMatch(t, o.Addrs(0), []ma.Multiaddr{observedWebTransportWithCerthash, inferredQUIC})
359361
}, 1*time.Second, 100*time.Millisecond)
360-
o.RemoveConn(c1)
361-
o.RemoveConn(c2)
362-
o.RemoveConn(c3)
363-
o.RemoveConn(c4)
362+
o.removeConn(c1)
363+
o.removeConn(c2)
364+
o.removeConn(c3)
365+
o.removeConn(c4)
364366
require.Eventually(t, func() bool {
365367
return checkAllEntriesRemoved(o)
366368
}, 1*time.Second, 100*time.Millisecond)
@@ -418,8 +420,8 @@ func TestObservedAddrsManager(t *testing.T) {
418420
require.Equal(t, udpNAT, network.NATDeviceTypeEndpointDependent)
419421

420422
for i := 0; i < N; i++ {
421-
o.RemoveConn(tcpConns[i])
422-
o.RemoveConn(quicConns[i])
423+
o.removeConn(tcpConns[i])
424+
o.removeConn(quicConns[i])
423425
}
424426
require.Eventually(t, func() bool {
425427
return checkAllEntriesRemoved(o)
@@ -433,7 +435,7 @@ func TestObservedAddrsManager(t *testing.T) {
433435
o.maybeRecordObservation(newConn(tcp4ListenAddr, remoteAddr), nil)
434436
o.maybeRecordObservation(nil, remoteAddr)
435437
o.AddrsFor(nil)
436-
o.RemoveConn(nil)
438+
o.removeConn(nil)
437439
})
438440

439441
t.Run("Many connection many observations IP4 And IP6", func(t *testing.T) {
@@ -518,12 +520,12 @@ func TestObservedAddrsManager(t *testing.T) {
518520
}, 1*time.Second, 100*time.Millisecond)
519521

520522
for i := 0; i < N; i++ {
521-
o.RemoveConn(tcp4Conns[i])
522-
o.RemoveConn(quic4Conns[i])
523-
o.RemoveConn(webTransport4Conns[i])
524-
o.RemoveConn(tcp6Conns[i])
525-
o.RemoveConn(quic6Conns[i])
526-
o.RemoveConn(webTransport6Conns[i])
523+
o.removeConn(tcp4Conns[i])
524+
o.removeConn(quic4Conns[i])
525+
o.removeConn(webTransport4Conns[i])
526+
o.removeConn(tcp6Conns[i])
527+
o.removeConn(quic6Conns[i])
528+
o.removeConn(webTransport6Conns[i])
527529
}
528530
require.Eventually(t, func() bool {
529531
return checkAllEntriesRemoved(o)
@@ -596,7 +598,7 @@ func FuzzObservedAddrsManager(f *testing.F) {
596598
o.maybeRecordObservation(c, addrs[i])
597599
o.maybeRecordObservation(c, nil)
598600
o.maybeRecordObservation(nil, addrs[i])
599-
o.RemoveConn(c)
601+
o.removeConn(c)
600602
}
601603
}
602604
})

x/simlibp2p/libp2p.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func newBlankHost(opts BlankHostOpts) (*wrappedHost, error) {
170170

171171
host := blankhost.NewBlankHost(swarm, blankhost.WithEventBus(eb), blankhost.WithConnectionManager(cm))
172172

173-
idService, err := identify.NewIDService(host, identify.DisableObservedAddrManager())
173+
idService, err := identify.NewIDService(host)
174174
if err != nil {
175175
return nil, err
176176
}

0 commit comments

Comments
 (0)