Skip to content

Commit 503dd29

Browse files
committed
sweeper: implement TriggerSweep
1 parent 574c093 commit 503dd29

File tree

3 files changed

+262
-2
lines changed

3 files changed

+262
-2
lines changed

sweep/mock_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func (m *MockBumper) Broadcast(req *BumpRequest) <-chan *BumpResult {
291291
return nil
292292
}
293293

294-
return args.Get(0).(chan *BumpResult)
294+
return args.Get(0).(<-chan *BumpResult)
295295
}
296296

297297
// MockFeeFunction is a mock implementation of the FeeFunction interface.

sweep/sweeper.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,12 @@ type updateResp struct {
326326
err error
327327
}
328328

329+
// triggerSweepReq is an internal message we'll use to represent an external
330+
// caller's intent to trigger an immediate sweep of all pending inputs.
331+
type triggerSweepReq struct {
332+
respChan chan int
333+
}
334+
329335
// UtxoSweeper is responsible for sweeping outputs back into the wallet
330336
type UtxoSweeper struct {
331337
started uint32 // To be used atomically.
@@ -349,6 +355,10 @@ type UtxoSweeper struct {
349355
// callers who wish to bump the fee rate of a given input.
350356
updateReqs chan *updateReq
351357

358+
// triggerSweepReqs is a channel that will be sent requests by external
359+
// callers who wish to trigger an immediate sweep of all pending inputs.
360+
triggerSweepReqs chan *triggerSweepReq
361+
352362
// inputs is the total set of inputs the UtxoSweeper has been requested
353363
// to sweep.
354364
inputs InputsMap
@@ -451,6 +461,7 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
451461
spendChan: make(chan *chainntnfs.SpendDetail),
452462
updateReqs: make(chan *updateReq),
453463
pendingSweepsReqs: make(chan *pendingSweepsReq),
464+
triggerSweepReqs: make(chan *triggerSweepReq),
454465
quit: make(chan struct{}),
455466
inputs: make(InputsMap),
456467
bumpRespChan: make(chan *bumpResp, 100),
@@ -711,6 +722,25 @@ func (s *UtxoSweeper) collector() {
711722
err)
712723
}
713724

725+
// A new external request has been received to trigger an
726+
// immediate sweep of all pending inputs.
727+
case req := <-s.triggerSweepReqs:
728+
// Update the inputs with the latest height.
729+
inputs := s.updateSweeperInputs()
730+
731+
// Mark all inputs as immediate so they are broadcast
732+
// right away. This is necessary for testing where we
733+
// want to deterministically trigger sweeps.
734+
for _, inp := range inputs {
735+
inp.params.Immediate = true
736+
}
737+
738+
// Attempt to sweep any pending inputs.
739+
s.sweepPendingInputs(inputs)
740+
741+
// Send back the number of inputs we attempted to sweep.
742+
req.respChan <- len(inputs)
743+
714744
// A new block comes in, update the bestHeight, perform a check
715745
// over all pending inputs and publish sweeping txns if needed.
716746
case beat := <-s.BlockbeatChan:
@@ -1203,6 +1233,23 @@ func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
12031233
return s.cfg.Store.ListSweeps()
12041234
}
12051235

1236+
// TriggerSweep triggers an immediate attempt to create and broadcast sweep
1237+
// transactions for all pending inputs. This is useful for testing to
1238+
// deterministically control when sweeps are broadcast. This method is
1239+
// thread-safe as it sends a message to the collector goroutine's event loop.
1240+
func (s *UtxoSweeper) TriggerSweep() int {
1241+
req := &triggerSweepReq{
1242+
respChan: make(chan int, 1),
1243+
}
1244+
1245+
select {
1246+
case s.triggerSweepReqs <- req:
1247+
return <-req.respChan
1248+
case <-s.quit:
1249+
return 0
1250+
}
1251+
}
1252+
12061253
// mempoolLookup takes an input's outpoint and queries the mempool to see
12071254
// whether it's already been spent in a transaction found in the mempool.
12081255
// Returns the transaction if found.

sweep/sweeper_test.go

Lines changed: 214 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,76 @@ func createMockInput(t *testing.T, s *UtxoSweeper,
7070
return inp
7171
}
7272

73+
// setupMatureMockInput configures a mock input to be mature and ready for
74+
// sweeping. This centralizes the common pattern of setting up RequiredLockTime,
75+
// BlocksToMaturity, HeightHint, and OutPoint mocks.
76+
func setupMatureMockInput(inp *input.MockInput, currentHeight int32,
77+
index uint32) {
78+
79+
inp.On("RequiredLockTime").Return(
80+
uint32(currentHeight), false).Maybe()
81+
inp.On("BlocksToMaturity").Return(uint32(0)).Maybe()
82+
inp.On("HeightHint").Return(uint32(currentHeight)).Maybe()
83+
inp.On("OutPoint").Return(wire.OutPoint{Index: index}).Maybe()
84+
}
85+
86+
// newTestSweeperConfig returns a UtxoSweeperConfig with common test settings.
87+
// This avoids repeating the GenSweepScript setup across multiple tests.
88+
func newTestSweeperConfig(wallet Wallet, aggregator UtxoAggregator,
89+
publisher Bumper) *UtxoSweeperConfig {
90+
91+
return &UtxoSweeperConfig{
92+
Wallet: wallet,
93+
Aggregator: aggregator,
94+
Publisher: publisher,
95+
GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] {
96+
return fn.Ok(lnwallet.AddrWithKey{
97+
DeliveryAddress: testPubKey.SerializeCompressed(),
98+
})
99+
},
100+
NoDeadlineConfTarget: uint32(DefaultDeadlineDelta),
101+
}
102+
}
103+
104+
// mockInputSet sets up a standard input set mock with common expectations.
105+
// This centralizes the repetitive pattern of mocking input set methods.
106+
func mockInputSet(set *MockInputSet, inputs []input.Input) {
107+
set.On("Inputs").Return(inputs).Maybe()
108+
set.On("NeedWalletInput").Return(false).Once()
109+
set.On("DeadlineHeight").Return(int32(testHeight)).Once()
110+
set.On("Budget").Return(btcutil.Amount(1)).Once()
111+
set.On("StartingFeeRate").Return(
112+
fn.None[chainfee.SatPerKWeight]()).Once()
113+
set.On("Immediate").Return(true).Once()
114+
}
115+
116+
// mockAggregatorCluster sets up a standard ClusterInputs mock expectation.
117+
func mockAggregatorCluster(aggregator *mockUtxoAggregator,
118+
sets []InputSet) {
119+
120+
aggregator.On("ClusterInputs", mock.Anything).Return(sets).Once()
121+
}
122+
123+
// mockPublisherBroadcast sets up a standard Broadcast mock expectation.
124+
func mockPublisherBroadcast(publisher *MockBumper) {
125+
publisher.On("Broadcast", mock.Anything).Return(
126+
make(<-chan *BumpResult)).Once()
127+
}
128+
129+
// setupMockInputSetForSweep configures a MockInputSet with the standard mocks
130+
// needed for sweep operations, allowing customization of the immediate flag.
131+
func setupMockInputSetForSweep(set *MockInputSet, inputs []input.Input,
132+
immediate bool) {
133+
134+
set.On("Inputs").Return(inputs).Maybe()
135+
set.On("NeedWalletInput").Return(false).Once()
136+
set.On("DeadlineHeight").Return(int32(testHeight)).Once()
137+
set.On("Budget").Return(btcutil.Amount(1)).Once()
138+
set.On("StartingFeeRate").Return(
139+
fn.None[chainfee.SatPerKWeight]()).Once()
140+
set.On("Immediate").Return(immediate).Once()
141+
}
142+
73143
// TestMarkInputsPendingPublish checks that given a list of inputs with
74144
// different states, only the non-terminal state will be marked as `Published`.
75145
func TestMarkInputsPendingPublish(t *testing.T) {
@@ -79,7 +149,6 @@ func TestMarkInputsPendingPublish(t *testing.T) {
79149

80150
// Create a test sweeper.
81151
s := New(&UtxoSweeperConfig{})
82-
83152
// Create a mock input set.
84153
set := &MockInputSet{}
85154
defer set.AssertExpectations(t)
@@ -1446,3 +1515,147 @@ func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) {
14461515
// Assert the state of the input is updated.
14471516
require.Equal(t, PublishFailed, s.inputs[op2].state)
14481517
}
1518+
1519+
// TestTriggerSweep checks that the `TriggerSweep` method correctly triggers an
1520+
// immediate sweep of all pending inputs and returns the count of inputs
1521+
// attempted.
1522+
func TestTriggerSweep(t *testing.T) {
1523+
t.Parallel()
1524+
1525+
require := require.New(t)
1526+
1527+
wallet := &MockWallet{}
1528+
defer wallet.AssertExpectations(t)
1529+
aggregator := &mockUtxoAggregator{}
1530+
defer aggregator.AssertExpectations(t)
1531+
publisher := &MockBumper{}
1532+
defer publisher.AssertExpectations(t)
1533+
1534+
s := New(newTestSweeperConfig(wallet, aggregator, publisher))
1535+
1536+
inp1 := &input.MockInput{}
1537+
defer inp1.AssertExpectations(t)
1538+
inp2 := &input.MockInput{}
1539+
defer inp2.AssertExpectations(t)
1540+
inp3 := &input.MockInput{}
1541+
defer inp3.AssertExpectations(t)
1542+
1543+
setupMatureMockInput(inp1, s.currentHeight, 1)
1544+
setupMatureMockInput(inp2, s.currentHeight, 2)
1545+
setupMatureMockInput(inp3, s.currentHeight, 3)
1546+
1547+
input1 := &SweeperInput{state: Init, Input: inp1}
1548+
input2 := &SweeperInput{state: PublishFailed, Input: inp2}
1549+
input3 := &SweeperInput{state: PendingPublish, Input: inp3}
1550+
1551+
s.inputs = map[wire.OutPoint]*SweeperInput{
1552+
{Index: 1}: input1,
1553+
{Index: 2}: input2,
1554+
{Index: 3}: input3,
1555+
}
1556+
1557+
s.wg.Add(1)
1558+
go s.collector()
1559+
1560+
set := &MockInputSet{}
1561+
defer set.AssertExpectations(t)
1562+
setupMockInputSetForSweep(set, []input.Input{inp1, inp2}, true)
1563+
mockAggregatorCluster(aggregator, []InputSet{set})
1564+
mockPublisherBroadcast(publisher)
1565+
1566+
numInputs := s.TriggerSweep()
1567+
1568+
// Only Init and PublishFailed states are sweepable.
1569+
require.Equal(2, numInputs)
1570+
1571+
close(s.quit)
1572+
s.wg.Wait()
1573+
}
1574+
1575+
// TestTriggerSweepNoInputs checks that `TriggerSweep` returns 0 when there are
1576+
// no pending inputs to sweep.
1577+
func TestTriggerSweepNoInputs(t *testing.T) {
1578+
t.Parallel()
1579+
1580+
require := require.New(t)
1581+
1582+
aggregator := &mockUtxoAggregator{}
1583+
defer aggregator.AssertExpectations(t)
1584+
1585+
s := New(&UtxoSweeperConfig{Aggregator: aggregator})
1586+
mockAggregatorCluster(aggregator, []InputSet{})
1587+
1588+
s.wg.Add(1)
1589+
go s.collector()
1590+
1591+
numInputs := s.TriggerSweep()
1592+
1593+
require.Equal(0, numInputs)
1594+
1595+
close(s.quit)
1596+
s.wg.Wait()
1597+
}
1598+
1599+
// TestTriggerSweepSweeperShutdown checks that `TriggerSweep` returns 0 when
1600+
// the sweeper is shutting down.
1601+
func TestTriggerSweepSweeperShutdown(t *testing.T) {
1602+
t.Parallel()
1603+
1604+
require := require.New(t)
1605+
1606+
s := New(&UtxoSweeperConfig{})
1607+
close(s.quit)
1608+
1609+
numInputs := s.TriggerSweep()
1610+
1611+
require.Equal(0, numInputs)
1612+
}
1613+
1614+
// TestTriggerSweepMarksImmediate checks that `TriggerSweep` marks all inputs
1615+
// as immediate before sweeping.
1616+
func TestTriggerSweepMarksImmediate(t *testing.T) {
1617+
t.Parallel()
1618+
1619+
require := require.New(t)
1620+
1621+
aggregator := &mockUtxoAggregator{}
1622+
defer aggregator.AssertExpectations(t)
1623+
publisher := &MockBumper{}
1624+
defer publisher.AssertExpectations(t)
1625+
1626+
s := New(newTestSweeperConfig(nil, aggregator, publisher))
1627+
1628+
inp := &input.MockInput{}
1629+
defer inp.AssertExpectations(t)
1630+
setupMatureMockInput(inp, s.currentHeight, 1)
1631+
1632+
input1 := &SweeperInput{
1633+
state: Init,
1634+
Input: inp,
1635+
params: Params{
1636+
Immediate: false,
1637+
},
1638+
}
1639+
1640+
s.inputs = map[wire.OutPoint]*SweeperInput{
1641+
{Index: 1}: input1,
1642+
}
1643+
1644+
s.wg.Add(1)
1645+
go s.collector()
1646+
1647+
set := &MockInputSet{}
1648+
defer set.AssertExpectations(t)
1649+
setupMockInputSetForSweep(set, []input.Input{inp}, true)
1650+
mockAggregatorCluster(aggregator, []InputSet{set})
1651+
mockPublisherBroadcast(publisher)
1652+
1653+
numInputs := s.TriggerSweep()
1654+
1655+
// Verify TriggerSweep marked the input as immediate.
1656+
require.True(input1.params.Immediate)
1657+
require.Equal(1, numInputs)
1658+
1659+
close(s.quit)
1660+
s.wg.Wait()
1661+
}

0 commit comments

Comments
 (0)