diff --git a/itest/assets_test.go b/itest/assets_test.go index cbe6b0731..49ac0ca71 100644 --- a/itest/assets_test.go +++ b/itest/assets_test.go @@ -1033,6 +1033,7 @@ func sendAssetKeySendPayment(t *testing.T, src, dst *HarnessNode, amt uint64, DestCustomRecords: customRecords, PaymentHash: hash[:], TimeoutSeconds: int32(PaymentTimeout.Seconds()), + MaxParts: cfg.maxShards, } request := &tchrpc.SendPaymentRequest{ @@ -1090,7 +1091,7 @@ func sendKeySendPayment(t *testing.T, src, dst *HarnessNode, stream, err := src.RouterClient.SendPaymentV2(ctxt, req) require.NoError(t, err) - result, err := getPaymentResult(stream) + result, err := getFinalPaymentResult(stream) require.NoError(t, err) require.Equal(t, lnrpc.Payment_SUCCEEDED, result.Status) } @@ -1135,6 +1136,12 @@ func createAndPayNormalInvoice(t *testing.T, src, rfqPeer, dst *HarnessNode, func payInvoiceWithSatoshi(t *testing.T, payer *HarnessNode, invoice *lnrpc.AddInvoiceResponse, opts ...payOpt) { + payPayReqWithSatoshi(t, payer, invoice.PaymentRequest, opts...) +} + +func payPayReqWithSatoshi(t *testing.T, payer *HarnessNode, payReq string, + opts ...payOpt) { + cfg := defaultPayConfig() for _, opt := range opts { opt(cfg) @@ -1145,15 +1152,30 @@ func payInvoiceWithSatoshi(t *testing.T, payer *HarnessNode, defer cancel() sendReq := &routerrpc.SendPaymentRequest{ - PaymentRequest: invoice.PaymentRequest, - TimeoutSeconds: int32(PaymentTimeout.Seconds()), - MaxShardSizeMsat: 80_000_000, - FeeLimitMsat: 1_000_000, + PaymentRequest: payReq, + TimeoutSeconds: int32(PaymentTimeout.Seconds()), + FeeLimitMsat: 1_000_000, + MaxParts: cfg.maxShards, + } + + if cfg.smallShards { + sendReq.MaxShardSizeMsat = 80_000_000 } + stream, err := payer.RouterClient.SendPaymentV2(ctxt, sendReq) require.NoError(t, err) - result, err := getPaymentResult(stream) + if cfg.payStatus == lnrpc.Payment_IN_FLIGHT { + t.Logf("Waiting for initial stream response...") + result, err := stream.Recv() + require.NoError(t, err) + + require.Equal(t, cfg.payStatus, result.Status) + + return + } + + result, err := getFinalPaymentResult(stream) if cfg.errSubStr != "" { require.ErrorContains(t, err, cfg.errSubStr) } else { @@ -1212,6 +1234,7 @@ func payInvoiceWithSatoshiLastHop(t *testing.T, payer *HarnessNode, type payConfig struct { smallShards bool + maxShards uint32 errSubStr string allowOverpay bool feeLimit lnwire.MilliSatoshi @@ -1240,6 +1263,12 @@ func withSmallShards() payOpt { } } +func withMaxShards(maxShards uint32) payOpt { + return func(c *payConfig) { + c.maxShards = maxShards + } +} + func withPayErrSubStr(errSubStr string) payOpt { return func(c *payConfig) { c.errSubStr = errSubStr @@ -1310,6 +1339,7 @@ func payInvoiceWithAssets(t *testing.T, payer, rfqPeer *HarnessNode, TimeoutSeconds: int32(PaymentTimeout.Seconds()), FeeLimitMsat: int64(cfg.feeLimit), DestCustomRecords: cfg.destCustomRecords, + MaxParts: cfg.maxShards, } if cfg.smallShards { @@ -1394,8 +1424,9 @@ func payInvoiceWithAssets(t *testing.T, payer, rfqPeer *HarnessNode, } type invoiceConfig struct { - errSubStr string - groupKey []byte + errSubStr string + groupKey []byte + routeHints []*lnrpc.RouteHint } func defaultInvoiceConfig() *invoiceConfig { @@ -2292,6 +2323,158 @@ func macFromBytes(macBytes []byte) (grpc.DialOption, error) { return grpc.WithPerRPCCredentials(cred), nil } +func assertMinNumHtlcs(t *testing.T, node *HarnessNode, expected int) { + t.Helper() + + ctxb := context.Background() + + err := wait.NoError(func() error { + listChansRequest := &lnrpc.ListChannelsRequest{} + listChansResp, err := node.ListChannels(ctxb, listChansRequest) + if err != nil { + return err + } + + var numHtlcs int + for _, channel := range listChansResp.Channels { + numHtlcs += len(channel.PendingHtlcs) + } + + if numHtlcs < expected { + return fmt.Errorf("expected %v HTLCs, got %v, %v", + expected, numHtlcs, + toProtoJSON(t, listChansResp)) + } + + return nil + }, defaultTimeout) + require.NoError(t, err) +} + +type subscribeEventsClient = routerrpc.Router_SubscribeHtlcEventsClient + +type htlcEventConfig struct { + timeout time.Duration + numEvents int + withLinkFailure bool + withForwardFailure bool + withFailureDetail routerrpc.FailureDetail +} + +func defaultHtlcEventConfig() *htlcEventConfig { + return &htlcEventConfig{ + timeout: defaultTimeout, + } +} + +type htlcEventOpt func(*htlcEventConfig) + +func withTimeout(timeout time.Duration) htlcEventOpt { + return func(config *htlcEventConfig) { + config.timeout = timeout + } +} + +func withNumEvents(numEvents int) htlcEventOpt { + return func(config *htlcEventConfig) { + config.numEvents = numEvents + } +} + +func withLinkFailure(detail routerrpc.FailureDetail) htlcEventOpt { + return func(config *htlcEventConfig) { + config.withLinkFailure = true + config.withFailureDetail = detail + } +} + +func withForwardFailure() htlcEventOpt { + return func(config *htlcEventConfig) { + config.withForwardFailure = true + } +} + +func assertHtlcEvents(t *testing.T, c subscribeEventsClient, + opts ...htlcEventOpt) { + + t.Helper() + + cfg := defaultHtlcEventConfig() + for _, opt := range opts { + opt(cfg) + } + + timeout := time.After(cfg.timeout) + events := make(chan *routerrpc.HtlcEvent) + + go func() { + defer close(events) + + for { + evt, err := c.Recv() + if err != nil { + t.Logf("Received HTLC event error: %v", err) + return + } + + select { + case events <- evt: + case <-timeout: + t.Logf("Htlc event receive timeout") + return + } + } + }() + + var numEvents int + for { + type ( + linkFailEvent = *routerrpc.HtlcEvent_LinkFailEvent + forwardFailEvent = *routerrpc.HtlcEvent_ForwardFailEvent + ) + + select { + case evt, ok := <-events: + if !ok { + t.Fatalf("Htlc event stream closed") + return + } + + if cfg.withLinkFailure { + linkEvent, ok := evt.Event.(linkFailEvent) + if !ok { + // We only count link failure events. + continue + } + + if linkEvent.LinkFailEvent.FailureDetail != + cfg.withFailureDetail { + + continue + } + } + + if cfg.withForwardFailure { + _, ok := evt.Event.(forwardFailEvent) + if !ok { + // We only count link failure events. + continue + } + } + + numEvents++ + + if numEvents == cfg.numEvents { + return + } + + case <-timeout: + t.Fatalf("Htlc event receive timeout") + return + } + } +} + func assertNumHtlcs(t *testing.T, node *HarnessNode, expected int) { t.Helper() diff --git a/itest/litd_accounts_test.go b/itest/litd_accounts_test.go index 891f4fb3a..f8729d977 100644 --- a/itest/litd_accounts_test.go +++ b/itest/litd_accounts_test.go @@ -416,12 +416,12 @@ func payNode(invoiceCtx, paymentCtx context.Context, t *harnessTest, stream, err := from.SendPaymentV2(paymentCtx, sendReq) require.NoError(t.t, err) - result, err := getPaymentResult(stream) + result, err := getFinalPaymentResult(stream) require.NoError(t.t, err) require.Equal(t.t, result.Status, lnrpc.Payment_SUCCEEDED) } -func getPaymentResult(stream routerrpc.Router_SendPaymentV2Client) ( +func getFinalPaymentResult(stream routerrpc.Router_SendPaymentV2Client) ( *lnrpc.Payment, error) { for { diff --git a/itest/litd_custom_channels_test.go b/itest/litd_custom_channels_test.go index 10449e518..c51470136 100644 --- a/itest/litd_custom_channels_test.go +++ b/itest/litd_custom_channels_test.go @@ -1,8 +1,8 @@ package itest import ( - "bytes" "context" + "crypto/rand" "fmt" "math" "math/big" @@ -28,9 +28,11 @@ import ( "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" + "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/node" "github.com/lightningnetwork/lnd/lntest/port" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/require" @@ -632,8 +634,8 @@ func testCustomChannels(ctx context.Context, net *NetworkHarness, // to rounding errors that happened when sending multiple shards with // MPP, we need to do some slight adjustments. charlieAssetBalance += 1 - erinAssetBalance += 4 - fabiaAssetBalance -= 4 + erinAssetBalance += 3 + fabiaAssetBalance -= 3 yaraAssetBalance -= 1 itest.AssertBalances( t.t, charlieTap, charlieAssetBalance, @@ -1074,8 +1076,8 @@ func testCustomChannelsGroupedAsset(ctx context.Context, net *NetworkHarness, // MPP, we need to do some slight adjustments. charlieAssetBalance += 2 daveAssetBalance -= 1 - erinAssetBalance += 4 - fabiaAssetBalance -= 4 + erinAssetBalance += 3 + fabiaAssetBalance -= 3 yaraAssetBalance -= 1 itest.AssertBalances( t.t, charlieTap, charlieAssetBalance, @@ -2239,12 +2241,12 @@ func testCustomChannelsBreach(ctx context.Context, net *NetworkHarness, t.Logf("Charlie balance after breach: %d", charlieBalance) } -// testCustomChannelsLiquidtyEdgeCasesCore is the core logic of the liquidity +// testCustomChannelsLiquidityEdgeCasesCore is the core logic of the liquidity // edge cases. This test goes through certain scenarios that expose edge cases // and behaviors that proved to be buggy in the past and have been directly // addressed. It accepts an extra parameter which dictates whether it should use // group keys or asset IDs. -func testCustomChannelsLiquidtyEdgeCasesCore(ctx context.Context, +func testCustomChannelsLiquidityEdgeCasesCore(ctx context.Context, net *NetworkHarness, t *harnessTest, groupMode bool) { lndArgs := slices.Clone(lndArgsTemplate) @@ -2719,29 +2721,128 @@ func testCustomChannelsLiquidtyEdgeCasesCore(ctx context.Context, // We now manually add the invoice in order to inject the above, // manually generated, quote. - iResp, err := charlie.AddInvoice(ctx, &lnrpc.Invoice{ - Memo: "", - Value: 200_000, - RPreimage: bytes.Repeat([]byte{11}, 32), - CltvExpiry: 60, - RouteHints: []*lnrpc.RouteHint{{ - HopHints: []*lnrpc.HopHint{{ - NodeId: dave.PubKeyStr, - ChanId: quote.AcceptedQuote.Scid, + hint := &lnrpc.HopHint{ + NodeId: dave.PubKeyStr, + ChanId: quote.AcceptedQuote.Scid, + CltvExpiryDelta: 80, + FeeBaseMsat: 1000, + FeeProportionalMillionths: 1, + } + var preimage lntypes.Preimage + _, _ = rand.Read(preimage[:]) + payHash = preimage.Hash() + iResp, err := charlie.AddHoldInvoice( + ctx, &invoicesrpc.AddHoldInvoiceRequest{ + Memo: "", + Value: 200_000, + Hash: payHash[:], + RouteHints: []*lnrpc.RouteHint{{ + HopHints: []*lnrpc.HopHint{hint}, }}, - }}, - }) + }, + ) + require.NoError(t.t, err) + + htlcStream, err := dave.RouterClient.SubscribeHtlcEvents( + ctx, &routerrpc.SubscribeHtlcEventsRequest{}, + ) require.NoError(t.t, err) // Now Erin tries to pay the invoice. Since rfq quote cannot satisfy the // total amount of the invoice this payment will fail. - payInvoiceWithSatoshi( - t.t, erin, iResp, withPayErrSubStr("context deadline exceeded"), - withFailure(lnrpc.Payment_FAILED, failureNone), - withGroupKey(groupID), + payPayReqWithSatoshi( + t.t, erin, iResp.PaymentRequest, + withFailure(lnrpc.Payment_IN_FLIGHT, failureNone), + withGroupKey(groupID), withMaxShards(4), + ) + + t.Logf("Asserting number of HTLCs on each node...") + assertMinNumHtlcs(t.t, dave, 2) + + t.Logf("Asserting HTLC events on Dave...") + assertHtlcEvents( + t.t, htlcStream, withNumEvents(1), withForwardFailure(), ) + _, err = charlie.InvoicesClient.CancelInvoice( + ctx, &invoicesrpc.CancelInvoiceMsg{ + PaymentHash: payHash[:], + }, + ) + require.NoError(t.t, err) + + assertNumHtlcs(t.t, dave, 0) + logBalance(t.t, nodes, assetID, "after small manual rfq") + + _ = htlcStream.CloseSend() + _, _ = erin.RouterClient.ResetMissionControl( + context.Background(), &routerrpc.ResetMissionControlRequest{}, + ) + + // Edge case: Fabia creates an invoice which Erin cannot satisfy with + // his side of asset liquidity. This tests that Erin will not try to + // add an HTLC with more asset units than what his local balance is. To + // validate that the channel is still healthy, we follow up with a + // smaller invoice payment which is meant to succeed. + + // We now create a hodl invoice on Fabia, for 125k assets. + hodlInv = createAssetHodlInvoice(t.t, erin, fabia, 125_000, assetID) + + htlcStream, err = erin.RouterClient.SubscribeHtlcEvents( + ctx, &routerrpc.SubscribeHtlcEventsRequest{}, + ) + require.NoError(t.t, err) + + // Charlie tries to pay, this is not meant to succeed, as Erin does not + // have enough assets to forward to Fabia. + payInvoiceWithAssets( + t.t, charlie, dave, hodlInv.payReq, assetID, + withFailure(lnrpc.Payment_IN_FLIGHT, failureNone), + ) + + // Let's check that at least 2 HTLCs were added on the Erin->Fabia link, + // which means that Erin would have an extra incoming HTLC for each + // outgoing one. So we expect a minimum of 4 HTLCs present on Erin. + assertMinNumHtlcs(t.t, erin, 4) + + // We also want to make sure that at least one failure occurred that + // hinted at the problem (not enough assets to forward). + assertHtlcEvents( + t.t, htlcStream, withNumEvents(1), + withLinkFailure(routerrpc.FailureDetail_INSUFFICIENT_BALANCE), + ) + + logBalance(t.t, nodes, assetID, "with min 4 present HTLCs") + + // Now Fabia cancels the invoice, this is meant to cancel back any + // locked in HTLCs and reset Erin's local balance back to its original + // value. + payHash = hodlInv.preimage.Hash() + _, err = fabia.InvoicesClient.CancelInvoice( + ctx, &invoicesrpc.CancelInvoiceMsg{ + PaymentHash: payHash[:], + }, + ) + require.NoError(t.t, err) + + // Let's assert that Erin cancelled all his HTLCs. + assertNumHtlcs(t.t, erin, 0) + + logBalance(t.t, nodes, assetID, "after hodl cancel & 0 present HTLCs") + + // Now let's create a smaller invoice and pay it, to validate that the + // channel is still healthy. + invoiceResp = createAssetInvoice(t.t, erin, fabia, 50_000, assetID) + + _, _ = charlie.RouterClient.ResetMissionControl( + context.Background(), &routerrpc.ResetMissionControlRequest{}, + ) + payInvoiceWithAssets( + t.t, charlie, dave, invoiceResp.PaymentRequest, assetID, + ) + + logBalance(t.t, nodes, assetID, "after safe asset htlc failure") } // testCustomChannelsLiquidityEdgeCases is a test that runs through some @@ -2751,7 +2852,7 @@ func testCustomChannelsLiquidityEdgeCases(ctx context.Context, // Run liquidity edge cases and only use single asset IDs for invoices // and payments. - testCustomChannelsLiquidtyEdgeCasesCore(ctx, net, t, false) + testCustomChannelsLiquidityEdgeCasesCore(ctx, net, t, false) } // testCustomChannelsLiquidityEdgeCasesGroup is a test that runs through some @@ -2761,7 +2862,7 @@ func testCustomChannelsLiquidityEdgeCasesGroup(ctx context.Context, // Run liquidity edge cases and only use group keys for invoices and // payments. - testCustomChannelsLiquidtyEdgeCasesCore(ctx, net, t, true) + testCustomChannelsLiquidityEdgeCasesCore(ctx, net, t, true) } // testCustomChannelsStrictForwarding is a test that tests the strict forwarding @@ -4090,14 +4191,18 @@ func testCustomChannelsForwardBandwidth(ctx context.Context, // We now manually add the invoice in order to inject the above, // manually generated, quote. + hopHint := &lnrpc.HopHint{ + NodeId: erin.PubKeyStr, + ChanId: quote.AcceptedQuote.Scid, + CltvExpiryDelta: 80, + FeeBaseMsat: 1000, + FeeProportionalMillionths: 1, + } invoiceResp2, err := fabia.AddInvoice(ctx, &lnrpc.Invoice{ Memo: "too small invoice", ValueMsat: int64(oneUnitMilliSat - 1), RouteHints: []*lnrpc.RouteHint{{ - HopHints: []*lnrpc.HopHint{{ - NodeId: erin.PubKeyStr, - ChanId: quote.AcceptedQuote.Scid, - }}, + HopHints: []*lnrpc.HopHint{hopHint}, }}, }) require.NoError(t.t, err)