-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
Copy pathmodification_interceptor.go
199 lines (159 loc) · 5.94 KB
/
modification_interceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package invoices
import (
"errors"
"fmt"
"sync/atomic"
"github.com/lightningnetwork/lnd/fn/v2"
)
var (
// ErrInterceptorClientAlreadyConnected is an error that is returned
// when a client tries to connect to the interceptor service while
// another client is already connected.
ErrInterceptorClientAlreadyConnected = errors.New(
"interceptor client already connected",
)
// ErrInterceptorClientDisconnected is an error that is returned when
// the client disconnects during an interceptor session.
ErrInterceptorClientDisconnected = errors.New(
"interceptor client disconnected",
)
)
// safeCallback is a wrapper around a callback function that is safe for
// concurrent access.
type safeCallback struct {
// callback is the actual callback function that is called when an
// invoice is intercepted. This might be nil if no client is currently
// connected.
callback atomic.Pointer[HtlcModifyCallback]
}
// Set atomically sets the callback function. If a callback is already set, an
// error is returned. The returned function can be used to reset the callback to
// nil once the client is done.
func (s *safeCallback) Set(callback HtlcModifyCallback) (func(), error) {
if !s.callback.CompareAndSwap(nil, &callback) {
return nil, ErrInterceptorClientAlreadyConnected
}
return func() {
s.callback.Store(nil)
}, nil
}
// IsConnected returns true if a client is currently connected.
func (s *safeCallback) IsConnected() bool {
return s.callback.Load() != nil
}
// Exec executes the callback function if it is set. If the callback is not set,
// an error is returned.
func (s *safeCallback) Exec(req HtlcModifyRequest) (*HtlcModifyResponse,
error) {
callback := s.callback.Load()
if callback == nil {
return nil, ErrInterceptorClientDisconnected
}
return (*callback)(req)
}
// HtlcModificationInterceptor is a service that intercepts HTLCs that aim to
// settle an invoice, enabling a subscribed client to modify certain aspects of
// those HTLCs.
type HtlcModificationInterceptor struct {
started atomic.Bool
stopped atomic.Bool
// callback is the wrapped client callback function that is called when
// an invoice is intercepted. This function gives the client the ability
// to determine how the invoice should be settled.
callback *safeCallback
// quit is a channel that is closed when the interceptor is stopped.
quit chan struct{}
}
// NewHtlcModificationInterceptor creates a new HtlcModificationInterceptor.
func NewHtlcModificationInterceptor() *HtlcModificationInterceptor {
return &HtlcModificationInterceptor{
callback: &safeCallback{},
quit: make(chan struct{}),
}
}
// Intercept generates a new intercept session for the given invoice. The call
// blocks until the client has responded to the request or an error occurs. The
// response callback is only called if a session was created in the first place,
// which is only the case if a client is registered.
func (s *HtlcModificationInterceptor) Intercept(clientRequest HtlcModifyRequest,
responseCallback func(HtlcModifyResponse)) error {
// If there is no client callback set we will not handle the invoice
// further.
if !s.callback.IsConnected() {
log.Debugf("Not intercepting invoice with circuit key %v, no "+
"intercept client connected",
clientRequest.ExitHtlcCircuitKey)
return nil
}
// We'll block until the client has responded to the request or an error
// occurs.
var (
responseChan = make(chan *HtlcModifyResponse, 1)
errChan = make(chan error, 1)
)
// The callback function will block at the client's discretion. We will
// therefore execute it in a separate goroutine. We don't need a wait
// group because we wait for the response directly below. The caller
// needs to make sure they don't block indefinitely, by selecting on the
// quit channel they receive when registering the callback.
go func() {
log.Debugf("Waiting for client response from invoice HTLC "+
"interceptor session with circuit key %v",
clientRequest.ExitHtlcCircuitKey)
// By this point, we've already checked that the client callback
// is set. However, if the client disconnected since that check
// then Exec will return an error.
result, err := s.callback.Exec(clientRequest)
if err != nil {
_ = fn.SendOrQuit(errChan, err, s.quit)
return
}
_ = fn.SendOrQuit(responseChan, result, s.quit)
}()
// Wait for the client to respond or an error to occur.
select {
case response := <-responseChan:
responseCallback(*response)
return nil
case err := <-errChan:
log.Errorf("Error from invoice HTLC interceptor session: %v",
err)
return err
case <-s.quit:
return ErrInterceptorClientDisconnected
}
}
// RegisterInterceptor sets the client callback function that will be called
// when an invoice is intercepted. If a callback is already set, an error is
// returned. The returned function must be used to reset the callback to nil
// once the client is done or disconnects.
func (s *HtlcModificationInterceptor) RegisterInterceptor(
callback HtlcModifyCallback) (func(), <-chan struct{}, error) {
done, err := s.callback.Set(callback)
return done, s.quit, err
}
// Start starts the service.
func (s *HtlcModificationInterceptor) Start() error {
log.Info("HtlcModificationInterceptor starting...")
if !s.started.CompareAndSwap(false, true) {
return fmt.Errorf("HtlcModificationInterceptor started more" +
"than once")
}
log.Debugf("HtlcModificationInterceptor started")
return nil
}
// Stop stops the service.
func (s *HtlcModificationInterceptor) Stop() error {
log.Info("HtlcModificationInterceptor stopping...")
if !s.stopped.CompareAndSwap(false, true) {
return fmt.Errorf("HtlcModificationInterceptor stopped more" +
"than once")
}
close(s.quit)
log.Debug("HtlcModificationInterceptor stopped")
return nil
}
// Ensure that HtlcModificationInterceptor implements the HtlcInterceptor and
// HtlcModifier interfaces.
var _ HtlcInterceptor = (*HtlcModificationInterceptor)(nil)
var _ HtlcModifier = (*HtlcModificationInterceptor)(nil)