Skip to content

Commit 83c726b

Browse files
committed
graph/db: fix SetSourceNode race with lenient upsert
This commit fixes a race condition where multiple goroutines call SetSourceNode concurrently during startup, causing sql.ErrNoRows errors. The race occurs when multiple code paths (setSelfNode, createNewHiddenService, RPC updates) read the same old timestamp, independently increment it to the same new value (T+1), and race to write. The fix uses the new UpsertSourceNode SQL query (without strict timestamp constraint) instead of UpsertNode. This allows last-write-wins semantics for our own node, ensuring all parameter changes persist even when timestamps collide. Refactored sql_store.go for reusability: - upsertNodeAncillaryData: common logic for features/addresses/extras - populateNodeParams: common parameter building with callback pattern - buildNodeUpsertParams: builds params for strict UpsertNode - buildSourceNodeUpsertParams: builds params for lenient UpsertSourceNode - upsertSourceNode: new function using lenient query Updated TestSetSourceNodeSameTimestamp to verify that concurrent updates with the same timestamp now succeed and parameter changes persist. Fixes the itest error: "unable to upsert source node: upserting node(...): sql: no rows in result set"
1 parent 578abf8 commit 83c726b

File tree

2 files changed

+176
-64
lines changed

2 files changed

+176
-64
lines changed

graph/db/graph_test.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"context"
66
"crypto/sha256"
7-
"database/sql"
87
"encoding/hex"
98
"errors"
109
"fmt"
@@ -397,19 +396,19 @@ func TestSourceNode(t *testing.T) {
397396
compareNodes(t, testNode, sourceNode)
398397
}
399398

400-
// TestSetSourceNodeSameTimestamp demonstrates that SetSourceNode can return an
401-
// error when called with the same last update timestamp. Calling SetSourceNode
402-
// with the same timestamp should be allowed (unlike AddNode), as it is
403-
// possible that our own node announcement may change quickly. This will be
404-
// fixed in an upcoming commit.
399+
// TestSetSourceNodeSameTimestamp tests that SetSourceNode accepts updates
400+
// with the same timestamp. This is necessary because multiple code paths
401+
// (setSelfNode, createNewHiddenService, RPC updates) can race during startup,
402+
// reading the same old timestamp and independently incrementing it to the same
403+
// new value. For our own node, we want parameter changes to persist even with
404+
// timestamp collisions (unlike network gossip where same timestamp means same
405+
// content).
405406
func TestSetSourceNodeSameTimestamp(t *testing.T) {
406407
t.Parallel()
407408
ctx := t.Context()
408409

409410
graph := MakeTestGraph(t)
410411

411-
_, isSQLStore := graph.V1Store.(*SQLStore)
412-
413412
// Create and set the initial source node.
414413
testNode := createTestVertex(t)
415414
require.NoError(t, graph.SetSourceNode(ctx, testNode))
@@ -421,8 +420,9 @@ func TestSetSourceNodeSameTimestamp(t *testing.T) {
421420

422421
// Create a modified version of the node with the same timestamp but
423422
// different parameters (e.g., different alias and color). This
424-
// could well be the case for our own node announcement (unlike other
425-
// announcements where same timestamp means same parameters).
423+
// simulates the race condition where multiple goroutines read the
424+
// same old timestamp, independently increment it, and try to update
425+
// with different changes.
426426
modifiedNode := models.NewV1Node(
427427
testNode.PubKeyBytes, &models.NodeV1Fields{
428428
// Same timestamp.
@@ -437,20 +437,20 @@ func TestSetSourceNodeSameTimestamp(t *testing.T) {
437437
)
438438

439439
// Attempt to set the source node with the same timestamp but
440-
// different parameters.
441-
err = graph.SetSourceNode(ctx, modifiedNode)
442-
443-
// The SQL store will return sql.ErrNoRows because the UPDATE clause
444-
// in the upsert query requires the new timestamp to be strictly
445-
// greater than the existing one. When this condition is not met, no
446-
// rows are updated and the SQL query returns ErrNoRows. The bbolt KV
447-
// store, on the other hand, silently ignores stale updates and returns
448-
// no error.
449-
if isSQLStore {
450-
require.ErrorIs(t, err, sql.ErrNoRows)
451-
} else {
452-
require.NoError(t, err)
453-
}
440+
// different parameters. This should now succeed for both SQL and KV
441+
// stores. The SQL store uses UpsertSourceNode which removes the
442+
// strict timestamp constraint, allowing last-write-wins semantics.
443+
require.NoError(t, graph.SetSourceNode(ctx, modifiedNode))
444+
445+
// Verify that the parameter changes actually persisted.
446+
updatedNode, err := graph.SourceNode(ctx)
447+
require.NoError(t, err)
448+
require.Equal(t, "different-alias", updatedNode.Alias.UnwrapOr(""))
449+
require.Equal(
450+
t, color.RGBA{R: 100, G: 200, B: 50, A: 0},
451+
updatedNode.Color.UnwrapOr(color.RGBA{}),
452+
)
453+
require.Equal(t, testNode.LastUpdate, updatedNode.LastUpdate)
454454
}
455455

456456
// TestEdgeInsertionDeletion tests the basic CRUD operations for channel edges.

graph/db/sql_store.go

Lines changed: 152 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type SQLQueries interface {
4242
Node queries.
4343
*/
4444
UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error)
45+
UpsertSourceNode(ctx context.Context, arg sqlc.UpsertSourceNodeParams) (int64, error)
4546
GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.GraphNode, error)
4647
GetNodesByIDs(ctx context.Context, ids []int64) ([]sqlc.GraphNode, error)
4748
GetNodeIDByPubKey(ctx context.Context, arg sqlc.GetNodeIDByPubKeyParams) (int64, error)
@@ -521,7 +522,14 @@ func (s *SQLStore) SetSourceNode(ctx context.Context,
521522
node *models.Node) error {
522523

523524
return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
524-
id, err := upsertNode(ctx, db, node)
525+
// For the source node, we use a less strict upsert that allows
526+
// updates even when the timestamp hasn't changed. This handles
527+
// the race condition where multiple goroutines (e.g.,
528+
// setSelfNode, createNewHiddenService, RPC updates) read the
529+
// same old timestamp, independently increment it, and try to
530+
// write concurrently. We want all parameter changes to persist,
531+
// even if timestamps collide.
532+
id, err := upsertSourceNode(ctx, db, node)
525533
if err != nil {
526534
return fmt.Errorf("unable to upsert source node: %w",
527535
err)
@@ -3599,77 +3607,181 @@ func getNodeFeatures(ctx context.Context, db SQLQueries,
35993607
return features, nil
36003608
}
36013609

3602-
// upsertNode upserts the node record into the database. If the node already
3603-
// exists, then the node's information is updated. If the node doesn't exist,
3604-
// then a new node is created. The node's features, addresses and extra TLV
3605-
// types are also updated. The node's DB ID is returned.
3606-
func upsertNode(ctx context.Context, db SQLQueries,
3607-
node *models.Node) (int64, error) {
3610+
// upsertNodeAncillaryData updates the node's features, addresses, and extra
3611+
// signed fields. This is common logic shared by upsertNode and
3612+
// upsertSourceNode.
3613+
func upsertNodeAncillaryData(ctx context.Context, db SQLQueries,
3614+
nodeID int64, node *models.Node) error {
36083615

3609-
params := sqlc.UpsertNodeParams{
3610-
Version: int16(lnwire.GossipVersion1),
3611-
PubKey: node.PubKeyBytes[:],
3616+
// Update the node's features.
3617+
err := upsertNodeFeatures(ctx, db, nodeID, node.Features)
3618+
if err != nil {
3619+
return fmt.Errorf("inserting node features: %w", err)
36123620
}
36133621

3614-
if node.HaveAnnouncement() {
3615-
switch node.Version {
3616-
case lnwire.GossipVersion1:
3617-
params.LastUpdate = sqldb.SQLInt64(
3618-
node.LastUpdate.Unix(),
3619-
)
3622+
// Update the node's addresses.
3623+
err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
3624+
if err != nil {
3625+
return fmt.Errorf("inserting node addresses: %w", err)
3626+
}
36203627

3621-
case lnwire.GossipVersion2:
3628+
// Convert the flat extra opaque data into a map of TLV types to
3629+
// values.
3630+
extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
3631+
if err != nil {
3632+
return fmt.Errorf("unable to marshal extra opaque data: %w",
3633+
err)
3634+
}
36223635

3623-
default:
3624-
return 0, fmt.Errorf("unknown gossip version: %d",
3625-
node.Version)
3626-
}
3636+
// Update the node's extra signed fields.
3637+
err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
3638+
if err != nil {
3639+
return fmt.Errorf("inserting node extra TLVs: %w", err)
3640+
}
3641+
3642+
return nil
3643+
}
3644+
3645+
// populateNodeParams populates the common node parameters from a models.Node.
3646+
// This is a helper for building UpsertNodeParams and UpsertSourceNodeParams.
3647+
func populateNodeParams(node *models.Node,
3648+
setParams func(lastUpdate sql.NullInt64, alias,
3649+
colorStr sql.NullString, signature []byte)) error {
3650+
3651+
if !node.HaveAnnouncement() {
3652+
return nil
3653+
}
3654+
3655+
switch node.Version {
3656+
case lnwire.GossipVersion1:
3657+
lastUpdate := sqldb.SQLInt64(node.LastUpdate.Unix())
3658+
var alias, colorStr sql.NullString
36273659

36283660
node.Color.WhenSome(func(rgba color.RGBA) {
3629-
params.Color = sqldb.SQLStrValid(EncodeHexColor(rgba))
3661+
colorStr = sqldb.SQLStrValid(EncodeHexColor(rgba))
36303662
})
36313663
node.Alias.WhenSome(func(s string) {
3632-
params.Alias = sqldb.SQLStrValid(s)
3664+
alias = sqldb.SQLStrValid(s)
36333665
})
36343666

3635-
params.Signature = node.AuthSigBytes
3667+
setParams(lastUpdate, alias, colorStr, node.AuthSigBytes)
3668+
3669+
case lnwire.GossipVersion2:
3670+
// No-op for now.
3671+
3672+
default:
3673+
return fmt.Errorf("unknown gossip version: %d", node.Version)
36363674
}
36373675

3638-
nodeID, err := db.UpsertNode(ctx, params)
3676+
return nil
3677+
}
3678+
3679+
// buildNodeUpsertParams builds the parameters for upserting a node using the
3680+
// strict UpsertNode query (requires timestamp to be increasing).
3681+
func buildNodeUpsertParams(node *models.Node) (sqlc.UpsertNodeParams, error) {
3682+
params := sqlc.UpsertNodeParams{
3683+
Version: int16(lnwire.GossipVersion1),
3684+
PubKey: node.PubKeyBytes[:],
3685+
}
3686+
3687+
err := populateNodeParams(
3688+
node, func(lastUpdate sql.NullInt64, alias,
3689+
colorStr sql.NullString,
3690+
signature []byte) {
3691+
3692+
params.LastUpdate = lastUpdate
3693+
params.Alias = alias
3694+
params.Color = colorStr
3695+
params.Signature = signature
3696+
})
3697+
3698+
return params, err
3699+
}
3700+
3701+
// buildSourceNodeUpsertParams builds the parameters for upserting the source
3702+
// node using the lenient UpsertSourceNode query (allows same timestamp).
3703+
func buildSourceNodeUpsertParams(node *models.Node) (
3704+
sqlc.UpsertSourceNodeParams, error) {
3705+
3706+
params := sqlc.UpsertSourceNodeParams{
3707+
Version: int16(lnwire.GossipVersion1),
3708+
PubKey: node.PubKeyBytes[:],
3709+
}
3710+
3711+
err := populateNodeParams(
3712+
node, func(lastUpdate sql.NullInt64, alias,
3713+
colorStr sql.NullString, signature []byte) {
3714+
3715+
params.LastUpdate = lastUpdate
3716+
params.Alias = alias
3717+
params.Color = colorStr
3718+
params.Signature = signature
3719+
},
3720+
)
3721+
3722+
return params, err
3723+
}
3724+
3725+
// upsertSourceNode upserts the source node record into the database using a
3726+
// less strict upsert that allows updates even when the timestamp hasn't
3727+
// changed. This is necessary to handle concurrent updates to our own node
3728+
// during startup and runtime. The node's features, addresses and extra TLV
3729+
// types are also updated. The node's DB ID is returned.
3730+
func upsertSourceNode(ctx context.Context, db SQLQueries,
3731+
node *models.Node) (int64, error) {
3732+
3733+
params, err := buildSourceNodeUpsertParams(node)
36393734
if err != nil {
3640-
return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
3641-
err)
3735+
return 0, err
3736+
}
3737+
3738+
nodeID, err := db.UpsertSourceNode(ctx, params)
3739+
if err != nil {
3740+
return 0, fmt.Errorf("upserting source node(%x): %w",
3741+
node.PubKeyBytes, err)
36423742
}
36433743

36443744
// We can exit here if we don't have the announcement yet.
36453745
if !node.HaveAnnouncement() {
36463746
return nodeID, nil
36473747
}
36483748

3649-
// Update the node's features.
3650-
err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
3749+
// Update the ancillary node data (features, addresses, extra fields).
3750+
err = upsertNodeAncillaryData(ctx, db, nodeID, node)
36513751
if err != nil {
3652-
return 0, fmt.Errorf("inserting node features: %w", err)
3752+
return 0, err
36533753
}
36543754

3655-
// Update the node's addresses.
3656-
err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
3755+
return nodeID, nil
3756+
}
3757+
3758+
// upsertNode upserts the node record into the database. If the node already
3759+
// exists, then the node's information is updated. If the node doesn't exist,
3760+
// then a new node is created. The node's features, addresses and extra TLV
3761+
// types are also updated. The node's DB ID is returned.
3762+
func upsertNode(ctx context.Context, db SQLQueries,
3763+
node *models.Node) (int64, error) {
3764+
3765+
params, err := buildNodeUpsertParams(node)
36573766
if err != nil {
3658-
return 0, fmt.Errorf("inserting node addresses: %w", err)
3767+
return 0, err
36593768
}
36603769

3661-
// Convert the flat extra opaque data into a map of TLV types to
3662-
// values.
3663-
extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
3770+
nodeID, err := db.UpsertNode(ctx, params)
36643771
if err != nil {
3665-
return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
3772+
return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
36663773
err)
36673774
}
36683775

3669-
// Update the node's extra signed fields.
3670-
err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
3776+
// We can exit here if we don't have the announcement yet.
3777+
if !node.HaveAnnouncement() {
3778+
return nodeID, nil
3779+
}
3780+
3781+
// Update the ancillary node data (features, addresses, extra fields).
3782+
err = upsertNodeAncillaryData(ctx, db, nodeID, node)
36713783
if err != nil {
3672-
return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
3784+
return 0, err
36733785
}
36743786

36753787
return nodeID, nil

0 commit comments

Comments
 (0)