diff --git a/graph/builder.go b/graph/builder.go index 59e9b19d5d..aa92173d07 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -499,6 +499,26 @@ func (b *Builder) IsZombieChannel(updateTime1, return e1Zombie && e2Zombie } +// IsZombieByAge checks if a channel is a zombie by its age. It uses the +// timestamp of the block of the transaction that opened the channel. We use +// this only for channels that have no edge policies, as we can't use the last +// update timestamp to determine if the channel is a zombie. +func (b *Builder) IsZombieByAge(scid uint64) (bool, error) { + blockHeight := lnwire.NewShortChanIDFromInt(scid).BlockHeight + + blockhash, err := b.cfg.Chain.GetBlockHash(int64(blockHeight)) + if err != nil { + return false, err + } + + header, err := b.cfg.Chain.GetBlockHeader(blockhash) + if err != nil { + return false, err + } + + return time.Since(header.Timestamp) >= b.cfg.ChannelPruneExpiry, nil +} + // pruneZombieChans is a method that will be called periodically to prune out // any "zombie" channels. We consider channels zombies if *both* edges haven't // been updated since our zombie horizon. If AssumeChannelValid is present, @@ -536,6 +556,28 @@ func (b *Builder) pruneZombieChans() error { return nil } + // If both edges are nil, then we'll check if the channel is a + // zombie that has been opened for long and never received a + // policy update. + if e1 == nil && e2 == nil { + isZombie, err := b.IsZombieByAge(info.ChannelID) + if err != nil { + return fmt.Errorf("unable to check if "+ + "channel is a zombie: %w", err) + } + + if isZombie { + log.Trace("Channel with chan_id=%v is zombie", + info.ChannelID) + + chansToPrune[info.ChannelID] = struct{}{} + } + + // We've handled channels with no policies, so we can + // exit early to process the next channel. + return nil + } + e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2) if e1Zombie { diff --git a/graph/builder_test.go b/graph/builder_test.go index 0461c4fdc6..063e57552e 100644 --- a/graph/builder_test.go +++ b/graph/builder_test.go @@ -884,6 +884,82 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) { } } +// TestIsZombieByAge tests that we can properly determine if a channel with no +// edge policies is a zombie or not using the block timestamp that the +// transaction that opened the channel was included in. +// +//nolint:ll +func TestIsZombieByAge(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + blockTimestamp time.Time + channelPruneExpiry time.Duration + expectedPrune bool + }{ + { + name: "old chan", + blockTimestamp: time.Now().Add(-30 * 24 * time.Hour), + channelPruneExpiry: 14 * 24 * time.Hour, + expectedPrune: true, + }, + { + name: "recent channel", + blockTimestamp: time.Now().Add(-7 * 24 * time.Hour), + channelPruneExpiry: 14 * 24 * time.Hour, + expectedPrune: false, + }, + { + name: "chan at threshold", + blockTimestamp: time.Now().Add(-14 * 24 * time.Hour), + channelPruneExpiry: 14 * 24 * time.Hour, + expectedPrune: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create mock chain with a starting height. + const startingHeight = 100 + mockChain := newMockChain(startingHeight) + + // Create a block with the desired timestamp. + block := &wire.MsgBlock{ + Header: wire.BlockHeader{ + Timestamp: tc.blockTimestamp, + }, + } + + // Add the block at a specific height. + const channelBlockHeight = 101 + mockChain.addBlock(block, channelBlockHeight, 0) + + scid := lnwire.ShortChannelID{ + BlockHeight: channelBlockHeight, + TxIndex: 0, + TxPosition: 0, + } + + // Create a minimal builder config that consist the mock + // chain and the channel prune expiry we set. + cfg := &Config{ + Chain: mockChain, + ChannelPruneExpiry: tc.channelPruneExpiry, + } + builder := &Builder{ + cfg: cfg, + } + + // Test the method to see we are able to determine if + // the channel is a zombie or not. + isZombie, err := builder.IsZombieByAge(scid.ToUint64()) + require.NoError(t, err) + require.Equal(t, tc.expectedPrune, isZombie) + }) + } +} + // TestPruneChannelGraphDoubleDisabled test that we can properly prune channels // with both edges disabled from our channel graph. func TestPruneChannelGraphDoubleDisabled(t *testing.T) { diff --git a/graph/db/graph_sql_test.go b/graph/db/graph_sql_test.go new file mode 100644 index 0000000000..4b83d7d4ed --- /dev/null +++ b/graph/db/graph_sql_test.go @@ -0,0 +1,138 @@ +//go:build test_db_postgres || test_db_sqlite + +package graphdb + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/stretchr/testify/require" +) + +// TestChanUpdatesInHorizonWithNoPolicies tests that we're able to properly +// retrieve channels with no policies within the time range. +func TestChanUpdatesInHorizonWithNoPolicies(t *testing.T) { + t.Parallel() + ctx := t.Context() + + graph := MakeTestGraph(t) + + // We'll start by creating two nodes which will seed our test graph. + node1 := createTestVertex(t) + require.NoError(t, graph.AddNode(ctx, node1)) + + node2 := createTestVertex(t) + require.NoError(t, graph.AddNode(ctx, node2)) + + // Note: startTime and endTime only works if the channels have + // policies. If not, the channels are included irrespective of the + // time range. + startTime := time.Unix(1234, 0) + endTime := startTime + edges := make([]ChannelEdge, 0, 10) + + // We'll now create 10 channels between the two nodes, with no policies. + const numChans = 10 + for i := range numChans { + channel, chanID := createEdge( + uint32(i*10), 0, 0, 0, node1, node2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, &channel)) + + // The first 5 channels will have no policies. + if i < numChans/2 { + edges = append(edges, ChannelEdge{ + Info: &channel, + }) + + continue + } + + edge1UpdateTime := endTime + edge2UpdateTime := edge1UpdateTime.Add(time.Second) + endTime = endTime.Add(time.Second * 10) + + edge1 := newEdgePolicy( + chanID.ToUint64(), edge1UpdateTime.Unix(), + ) + edge1.ChannelFlags = 0 + edge1.ToNode = node2.PubKeyBytes + edge1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge1)) + + edge2 := newEdgePolicy( + chanID.ToUint64(), edge2UpdateTime.Unix(), + ) + edge2.ChannelFlags = 1 + edge2.ToNode = node1.PubKeyBytes + edge2.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge2)) + + edges = append(edges, ChannelEdge{ + Info: &channel, + Policy1: edge1, + Policy2: edge2, + }) + } + + // With our channels loaded, we'll now start our series of queries. + queryCases := []struct { + start time.Time + end time.Time + resp []ChannelEdge + }{ + // If we query for a time range that's strictly below our set + // of updates, then we'll get only the 5 channels with no + // policies. + { + start: time.Unix(100, 0), + end: time.Unix(200, 0), + resp: edges[:5], + }, + + // If we query for a time range that's well beyond our set of + // updates, we should get only the 5 channels with no + // policies. + { + start: time.Unix(99999, 0), + end: time.Unix(999999, 0), + resp: edges[:5], + }, + + // If we query for the start time, and 10 seconds directly + // after it, we should only get the 5 channels with no + // policies and one channel with a policy. + { + start: time.Unix(1234, 0), + end: startTime.Add(time.Second * 10), + resp: edges[:6], + }, + + // If we use the start and end time as is, we should get the + // entire range. + { + start: startTime, + end: endTime, + + resp: edges[:10], + }, + } + + for _, queryCase := range queryCases { + respIter := graph.ChanUpdatesInHorizon( + queryCase.start, queryCase.end, + ) + + resp, err := fn.CollectErr(respIter) + require.NoError(t, err) + require.Equal(t, len(resp), len(queryCase.resp)) + + for i := range len(resp) { + chanExp := queryCase.resp[i] + chanRet := resp[i] + + require.Equal(t, chanExp.Info, chanRet.Info) + } + } +} diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index f67894e4ce..8bec6594ff 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -1098,6 +1098,11 @@ func (s *SQLStore) updateChanCacheBatch(edgesToCache map[uint64]ChannelEdge) { // 5. Update cache after successful batch // 6. Repeat with updated pagination cursor until no more results // +// Note: Ideally each channel should have at least one policy. However, if a +// channel is created and never updated, it will not have any policies. +// In this case, we'll return the channel with no policies at all regardless of +// the time range. This helps us prune zombie channels with no policies. +// // NOTE: This is part of the V1Store interface. func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time, opts ...IteratorOption) iter.Seq2[ChannelEdge, error] { diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 9c27027375..c62bacd5de 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -1142,6 +1142,10 @@ WHERE c.version = $1 (cp1.last_update >= $2 AND cp1.last_update < $3) OR (cp2.last_update >= $2 AND cp2.last_update < $3) + -- TODO(abdulkbk): see the potential of adding a created_at to channel + -- table to extend this query to include channels created in a time range. + OR + (cp1.last_update IS NULL AND cp2.last_update IS NULL) ) -- Pagination using compound cursor (max_update_time, id). -- We use COALESCE with -1 as sentinel since timestamps are always positive. diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 19087fc1bd..4a42cbbeac 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -471,6 +471,10 @@ WHERE c.version = @version (cp1.last_update >= @start_time AND cp1.last_update < @end_time) OR (cp2.last_update >= @start_time AND cp2.last_update < @end_time) + -- TODO(abdulkbk): see the potential of adding a created_at to channel + -- table to extend this query to include channels created in a time range. + OR + (cp1.last_update IS NULL AND cp2.last_update IS NULL) ) -- Pagination using compound cursor (max_update_time, id). -- We use COALESCE with -1 as sentinel since timestamps are always positive.