@@ -2,6 +2,7 @@ package discovery
22
33import (
44 "context"
5+ "iter"
56 "time"
67
78 "github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -30,8 +31,8 @@ type ChannelGraphTimeSeries interface {
3031 // update timestamp between the start time and end time. We'll use this
3132 // to catch up a remote node to the set of channel updates that they
3233 // may have missed out on within the target chain.
33- UpdatesInHorizon (chain chainhash.Hash ,
34- startTime time. Time , endTime time.Time ) ([] lnwire.Message , error )
34+ UpdatesInHorizon (chain chainhash.Hash , startTime time. Time ,
35+ endTime time.Time ) iter. Seq2 [ lnwire.Message , error ]
3536
3637 // FilterKnownChanIDs takes a target chain, and a set of channel ID's,
3738 // and returns a filtered set of chan ID's. This filtered set of chan
@@ -108,140 +109,100 @@ func (c *ChanSeries) HighestChanID(ctx context.Context,
108109//
109110// NOTE: This is part of the ChannelGraphTimeSeries interface.
110111func (c * ChanSeries ) UpdatesInHorizon (chain chainhash.Hash ,
111- startTime time. Time , endTime time.Time ) ([] lnwire.Message , error ) {
112+ startTime , endTime time.Time ) iter. Seq2 [ lnwire.Message , error ] {
112113
113- var updates []lnwire.Message
114-
115- // First, we'll query for all the set of channels that have an update
116- // that falls within the specified horizon.
117- chansInHorizon , err := c .graph .ChanUpdatesInHorizon (
118- startTime , endTime ,
119- )
120- if err != nil {
121- return nil , err
122- }
123-
124- // nodesFromChan records the nodes seen from the channels.
125- nodesFromChan := make (map [[33 ]byte ]struct {}, len (chansInHorizon )* 2 )
126-
127- for _ , channel := range chansInHorizon {
128- // If the channel hasn't been fully advertised yet, or is a
129- // private channel, then we'll skip it as we can't construct a
130- // full authentication proof if one is requested.
131- if channel .Info .AuthProof == nil {
132- continue
133- }
134-
135- chanAnn , edge1 , edge2 , err := netann .CreateChanAnnouncement (
136- channel .Info .AuthProof , channel .Info , channel .Policy1 ,
137- channel .Policy2 ,
114+ return func (yield func (lnwire.Message , error ) bool ) {
115+ // First, we'll query for all the set of channels that have an
116+ // update that falls within the specified horizon.
117+ chansInHorizon := c .graph .ChanUpdatesInHorizon (
118+ startTime , endTime ,
138119 )
139- if err != nil {
140- return nil , err
141- }
142120
143- // Create a slice to hold the `channel_announcement` and
144- // potentially two `channel_update` msgs.
145- //
146- // NOTE: Based on BOLT7, if a channel_announcement has no
147- // corresponding channel_updates, we must not send the
148- // channel_announcement. Thus we use this slice to decide we
149- // want to send this `channel_announcement` or not. By the end
150- // of the operation, if the len of the slice is 1, we will not
151- // send the `channel_announcement`. Otherwise, when sending the
152- // msgs, the `channel_announcement` must be sent prior to any
153- // corresponding `channel_update` or `node_annoucement`, that's
154- // why we create a slice here to maintain the order.
155- chanUpdates := make ([]lnwire.Message , 0 , 3 )
156- chanUpdates = append (chanUpdates , chanAnn )
157-
158- if edge1 != nil {
159- // We don't want to send channel updates that don't
160- // conform to the spec (anymore).
161- err := netann .ValidateChannelUpdateFields (0 , edge1 )
121+ for channel , err := range chansInHorizon {
162122 if err != nil {
163- log .Errorf ("not sending invalid channel " +
164- "update %v: %v" , edge1 , err )
165- } else {
166- chanUpdates = append (chanUpdates , edge1 )
123+ yield (nil , err )
124+ return
167125 }
168- }
169-
170- if edge2 != nil {
171- err := netann .ValidateChannelUpdateFields (0 , edge2 )
172- if err != nil {
173- log .Errorf ("not sending invalid channel " +
174- "update %v: %v" , edge2 , err )
175- } else {
176- chanUpdates = append (chanUpdates , edge2 )
126+ // If the channel hasn't been fully advertised yet, or
127+ // is a private channel, then we'll skip it as we can't
128+ // construct a full authentication proof if one is
129+ // requested.
130+ if channel .Info .AuthProof == nil {
131+ continue
177132 }
178- }
179133
180- // If there's no corresponding `channel_update` to send, skip
181- // sending this `channel_announcement`.
182- if len (chanUpdates ) < 2 {
183- continue
184- }
185-
186- // Append the all the msgs to the slice.
187- updates = append (updates , chanUpdates ... )
188-
189- // Record the nodes seen.
190- nodesFromChan [channel .Info .NodeKey1Bytes ] = struct {}{}
191- nodesFromChan [channel .Info .NodeKey2Bytes ] = struct {}{}
192- }
193-
194- // Next, we'll send out all the node announcements that have an update
195- // within the horizon as well. We send these second to ensure that they
196- // follow any active channels they have.
197- nodeAnnsInHorizon , err := c .graph .NodeUpdatesInHorizon (
198- startTime , endTime ,
199- )
200- if err != nil {
201- return nil , err
202- }
134+ //nolint:ll
135+ chanAnn , edge1 , edge2 , err := netann .CreateChanAnnouncement (
136+ channel .Info .AuthProof , channel .Info ,
137+ channel .Policy1 , channel .Policy2 ,
138+ )
139+ if err != nil {
140+ if ! yield (nil , err ) {
141+ return
142+ }
203143
204- for _ , nodeAnn := range nodeAnnsInHorizon {
205- // If this node has not been seen in the above channels, we can
206- // skip sending its NodeAnnouncement.
207- if _ , seen := nodesFromChan [nodeAnn .PubKeyBytes ]; ! seen {
208- log .Debugf ("Skipping forwarding as node %x not found " +
209- "in channel announcement" , nodeAnn .PubKeyBytes )
210- continue
211- }
144+ continue
145+ }
212146
213- // Ensure we only forward nodes that are publicly advertised to
214- // prevent leaking information about nodes.
215- isNodePublic , err := c .graph .IsPublicNode (nodeAnn .PubKeyBytes )
216- if err != nil {
217- log .Errorf ("Unable to determine if node %x is " +
218- "advertised: %v" , nodeAnn .PubKeyBytes , err )
219- continue
220- }
147+ if ! yield (chanAnn , nil ) {
148+ return
149+ }
221150
222- if ! isNodePublic {
223- log .Tracef ("Skipping forwarding announcement for " +
224- "node %x due to being unadvertised" ,
225- nodeAnn .PubKeyBytes )
226- continue
151+ // We don't want to send channel updates that don't
152+ // conform to the spec (anymore), so check to make sure
153+ // that these channel updates are valid before yielding
154+ // them.
155+ if edge1 != nil {
156+ err := netann .ValidateChannelUpdateFields (
157+ 0 , edge1 ,
158+ )
159+ if err != nil {
160+ log .Errorf ("not sending invalid " +
161+ "channel update %v: %v" ,
162+ edge1 , err )
163+ } else if ! yield (edge1 , nil ) {
164+ return
165+ }
166+ }
167+ if edge2 != nil {
168+ err := netann .ValidateChannelUpdateFields (
169+ 0 , edge2 ,
170+ )
171+ if err != nil {
172+ log .Errorf ("not sending invalid " +
173+ "channel update %v: %v" , edge2 ,
174+ err )
175+ } else if ! yield (edge2 , nil ) {
176+ return
177+ }
178+ }
227179 }
228180
229- nodeUpdate , err := nodeAnn .NodeAnnouncement (true )
230- if err != nil {
231- return nil , err
232- }
181+ // Next, we'll send out all the node announcements that have an
182+ // update within the horizon as well. We send these second to
183+ // ensure that they follow any active channels they have.
184+ nodeAnnsInHorizon := c .graph .NodeUpdatesInHorizon (
185+ startTime , endTime , graphdb .WithIterPublicNodesOnly (),
186+ )
187+ for nodeAnn , err := range nodeAnnsInHorizon {
188+ if err != nil {
189+ yield (nil , err )
190+ return
191+ }
192+ nodeUpdate , err := nodeAnn .NodeAnnouncement (true )
193+ if err != nil {
194+ if ! yield (nil , err ) {
195+ return
196+ }
233197
234- if err := netann .ValidateNodeAnnFields (nodeUpdate ); err != nil {
235- log .Debugf ("Skipping forwarding invalid node " +
236- "announcement %x: %v" , nodeAnn .PubKeyBytes , err )
198+ continue
199+ }
237200
238- continue
201+ if ! yield (nodeUpdate , nil ) {
202+ return
203+ }
239204 }
240-
241- updates = append (updates , nodeUpdate )
242205 }
243-
244- return updates , nil
245206}
246207
247208// FilterKnownChanIDs takes a target chain, and a set of channel ID's, and
0 commit comments