diff --git a/docs/release-notes/release-notes-0.20.1.md b/docs/release-notes/release-notes-0.20.1.md index 844ae3089ec..f1e7023bf67 100644 --- a/docs/release-notes/release-notes-0.20.1.md +++ b/docs/release-notes/release-notes-0.20.1.md @@ -31,6 +31,11 @@ * Fix a bug where [repeated network addresses](https://github.com/lightningnetwork/lnd/pull/10341) were added to the node announcement and `getinfo` output. + +* [Fix source node race + condition](https://github.com/lightningnetwork/lnd/pull/10371) which could + prevent a node from starting up if two goroutines attempt to update the + node's announcement at the same time. * [Fix a startup issue in LND when encountering a deserialization issue](https://github.com/lightningnetwork/lnd/pull/10383) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index e4a8c7faea9..590c077d608 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -396,6 +396,63 @@ func TestSourceNode(t *testing.T) { compareNodes(t, testNode, sourceNode) } +// TestSetSourceNodeSameTimestamp tests that SetSourceNode accepts updates +// with the same timestamp. This is necessary because multiple code paths +// (setSelfNode, createNewHiddenService, RPC updates) can race during startup, +// reading the same old timestamp and independently incrementing it to the same +// new value. For our own node, we want parameter changes to persist even with +// timestamp collisions (unlike network gossip where same timestamp means same +// content). +func TestSetSourceNodeSameTimestamp(t *testing.T) { + t.Parallel() + ctx := t.Context() + + graph := MakeTestGraph(t) + + // Create and set the initial source node. + testNode := createTestVertex(t) + require.NoError(t, graph.SetSourceNode(ctx, testNode)) + + // Verify the source node was set correctly. + sourceNode, err := graph.SourceNode(ctx) + require.NoError(t, err) + compareNodes(t, testNode, sourceNode) + + // Create a modified version of the node with the same timestamp but + // different parameters (e.g., different alias and color). This + // simulates the race condition where multiple goroutines read the + // same old timestamp, independently increment it, and try to update + // with different changes. + modifiedNode := models.NewV1Node( + testNode.PubKeyBytes, &models.NodeV1Fields{ + // Same timestamp. + LastUpdate: testNode.LastUpdate, + // Different alias. + Alias: "different-alias", + Color: color.RGBA{R: 100, G: 200, B: 50, A: 0}, + Addresses: testNode.Addresses, + Features: testNode.Features.RawFeatureVector, + AuthSigBytes: testNode.AuthSigBytes, + }, + ) + + // Attempt to set the source node with the same timestamp but + // different parameters. This should now succeed for both SQL and KV + // stores. The SQL store uses UpsertSourceNode which removes the + // strict timestamp constraint, allowing last-write-wins semantics. + require.NoError(t, graph.SetSourceNode(ctx, modifiedNode)) + + // Verify that the parameter changes actually persisted. + updatedNode, err := graph.SourceNode(ctx) + require.NoError(t, err) + require.Equal(t, "different-alias", updatedNode.Alias.UnwrapOr("")) + require.Equal( + t, color.RGBA{R: 100, G: 200, B: 50, A: 0}, + updatedNode.Color.UnwrapOr(color.RGBA{}), + ) + require.Equal(t, testNode.LastUpdate, updatedNode.LastUpdate) +} + // TestEdgeInsertionDeletion tests the basic CRUD operations for channel edges. func TestEdgeInsertionDeletion(t *testing.T) { t.Parallel() diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 3578fd945ae..65077072619 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -42,6 +42,7 @@ type SQLQueries interface { Node queries. */ UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error) + UpsertSourceNode(ctx context.Context, arg sqlc.UpsertSourceNodeParams) (int64, error) GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.GraphNode, error) GetNodesByIDs(ctx context.Context, ids []int64) ([]sqlc.GraphNode, error) GetNodeIDByPubKey(ctx context.Context, arg sqlc.GetNodeIDByPubKeyParams) (int64, error) @@ -521,7 +522,14 @@ func (s *SQLStore) SetSourceNode(ctx context.Context, node *models.Node) error { return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { - id, err := upsertNode(ctx, db, node) + // For the source node, we use a less strict upsert that allows + // updates even when the timestamp hasn't changed. This handles + // the race condition where multiple goroutines (e.g., + // setSelfNode, createNewHiddenService, RPC updates) read the + // same old timestamp, independently increment it, and try to + // write concurrently. We want all parameter changes to persist, + // even if timestamps collide. + id, err := upsertSourceNode(ctx, db, node) if err != nil { return fmt.Errorf("unable to upsert source node: %w", err) @@ -3599,46 +3607,138 @@ func getNodeFeatures(ctx context.Context, db SQLQueries, return features, nil } -// upsertNode upserts the node record into the database. If the node already -// exists, then the node's information is updated. If the node doesn't exist, -// then a new node is created. The node's features, addresses and extra TLV -// types are also updated. The node's DB ID is returned. -func upsertNode(ctx context.Context, db SQLQueries, - node *models.Node) (int64, error) { +// upsertNodeAncillaryData updates the node's features, addresses, and extra +// signed fields. This is common logic shared by upsertNode and +// upsertSourceNode. +func upsertNodeAncillaryData(ctx context.Context, db SQLQueries, + nodeID int64, node *models.Node) error { - params := sqlc.UpsertNodeParams{ - Version: int16(lnwire.GossipVersion1), - PubKey: node.PubKeyBytes[:], + // Update the node's features. + err := upsertNodeFeatures(ctx, db, nodeID, node.Features) + if err != nil { + return fmt.Errorf("inserting node features: %w", err) } - if node.HaveAnnouncement() { - switch node.Version { - case lnwire.GossipVersion1: - params.LastUpdate = sqldb.SQLInt64( - node.LastUpdate.Unix(), - ) + // Update the node's addresses. + err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses) + if err != nil { + return fmt.Errorf("inserting node addresses: %w", err) + } - case lnwire.GossipVersion2: + // Convert the flat extra opaque data into a map of TLV types to + // values. + extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData) + if err != nil { + return fmt.Errorf("unable to marshal extra opaque data: %w", + err) + } - default: - return 0, fmt.Errorf("unknown gossip version: %d", - node.Version) - } + // Update the node's extra signed fields. + err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra) + if err != nil { + return fmt.Errorf("inserting node extra TLVs: %w", err) + } + + return nil +} + +// populateNodeParams populates the common node parameters from a models.Node. +// This is a helper for building UpsertNodeParams and UpsertSourceNodeParams. +func populateNodeParams(node *models.Node, + setParams func(lastUpdate sql.NullInt64, alias, + colorStr sql.NullString, signature []byte)) error { + + if !node.HaveAnnouncement() { + return nil + } + + switch node.Version { + case lnwire.GossipVersion1: + lastUpdate := sqldb.SQLInt64(node.LastUpdate.Unix()) + var alias, colorStr sql.NullString node.Color.WhenSome(func(rgba color.RGBA) { - params.Color = sqldb.SQLStrValid(EncodeHexColor(rgba)) + colorStr = sqldb.SQLStrValid(EncodeHexColor(rgba)) }) node.Alias.WhenSome(func(s string) { - params.Alias = sqldb.SQLStrValid(s) + alias = sqldb.SQLStrValid(s) }) - params.Signature = node.AuthSigBytes + setParams(lastUpdate, alias, colorStr, node.AuthSigBytes) + + case lnwire.GossipVersion2: + // No-op for now. + + default: + return fmt.Errorf("unknown gossip version: %d", node.Version) } - nodeID, err := db.UpsertNode(ctx, params) + return nil +} + +// buildNodeUpsertParams builds the parameters for upserting a node using the +// strict UpsertNode query (requires timestamp to be increasing). +func buildNodeUpsertParams(node *models.Node) (sqlc.UpsertNodeParams, error) { + params := sqlc.UpsertNodeParams{ + Version: int16(lnwire.GossipVersion1), + PubKey: node.PubKeyBytes[:], + } + + err := populateNodeParams( + node, func(lastUpdate sql.NullInt64, alias, + colorStr sql.NullString, + signature []byte) { + + params.LastUpdate = lastUpdate + params.Alias = alias + params.Color = colorStr + params.Signature = signature + }) + + return params, err +} + +// buildSourceNodeUpsertParams builds the parameters for upserting the source +// node using the lenient UpsertSourceNode query (allows same timestamp). +func buildSourceNodeUpsertParams(node *models.Node) ( + sqlc.UpsertSourceNodeParams, error) { + + params := sqlc.UpsertSourceNodeParams{ + Version: int16(lnwire.GossipVersion1), + PubKey: node.PubKeyBytes[:], + } + + err := populateNodeParams( + node, func(lastUpdate sql.NullInt64, alias, + colorStr sql.NullString, signature []byte) { + + params.LastUpdate = lastUpdate + params.Alias = alias + params.Color = colorStr + params.Signature = signature + }, + ) + + return params, err +} + +// upsertSourceNode upserts the source node record into the database using a +// less strict upsert that allows updates even when the timestamp hasn't +// changed. This is necessary to handle concurrent updates to our own node +// during startup and runtime. The node's features, addresses and extra TLV +// types are also updated. The node's DB ID is returned. +func upsertSourceNode(ctx context.Context, db SQLQueries, + node *models.Node) (int64, error) { + + params, err := buildSourceNodeUpsertParams(node) if err != nil { - return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes, - err) + return 0, err + } + + nodeID, err := db.UpsertSourceNode(ctx, params) + if err != nil { + return 0, fmt.Errorf("upserting source node(%x): %w", + node.PubKeyBytes, err) } // We can exit here if we don't have the announcement yet. @@ -3646,30 +3746,42 @@ func upsertNode(ctx context.Context, db SQLQueries, return nodeID, nil } - // Update the node's features. - err = upsertNodeFeatures(ctx, db, nodeID, node.Features) + // Update the ancillary node data (features, addresses, extra fields). + err = upsertNodeAncillaryData(ctx, db, nodeID, node) if err != nil { - return 0, fmt.Errorf("inserting node features: %w", err) + return 0, err } - // Update the node's addresses. - err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses) + return nodeID, nil +} + +// upsertNode upserts the node record into the database. If the node already +// exists, then the node's information is updated. If the node doesn't exist, +// then a new node is created. The node's features, addresses and extra TLV +// types are also updated. The node's DB ID is returned. +func upsertNode(ctx context.Context, db SQLQueries, + node *models.Node) (int64, error) { + + params, err := buildNodeUpsertParams(node) if err != nil { - return 0, fmt.Errorf("inserting node addresses: %w", err) + return 0, err } - // Convert the flat extra opaque data into a map of TLV types to - // values. - extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData) + nodeID, err := db.UpsertNode(ctx, params) if err != nil { - return 0, fmt.Errorf("unable to marshal extra opaque data: %w", + return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes, err) } - // Update the node's extra signed fields. - err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra) + // We can exit here if we don't have the announcement yet. + if !node.HaveAnnouncement() { + return nodeID, nil + } + + // Update the ancillary node data (features, addresses, extra fields). + err = upsertNodeAncillaryData(ctx, db, nodeID, node) if err != nil { - return 0, fmt.Errorf("inserting node extra TLVs: %w", err) + return 0, err } return nodeID, nil diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 9c270273758..8ed9333d72a 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -3735,6 +3735,51 @@ func (q *Queries) UpsertPruneLogEntry(ctx context.Context, arg UpsertPruneLogEnt return err } +const upsertSourceNode = `-- name: UpsertSourceNode :one +INSERT INTO graph_nodes ( + version, pub_key, alias, last_update, color, signature +) VALUES ( + $1, $2, $3, $4, $5, $6 +) +ON CONFLICT (pub_key, version) + -- Update the following fields if a conflict occurs on pub_key + -- and version. + DO UPDATE SET + alias = EXCLUDED.alias, + last_update = EXCLUDED.last_update, + color = EXCLUDED.color, + signature = EXCLUDED.signature +WHERE graph_nodes.last_update IS NULL + OR EXCLUDED.last_update >= graph_nodes.last_update +RETURNING id +` + +type UpsertSourceNodeParams struct { + Version int16 + PubKey []byte + Alias sql.NullString + LastUpdate sql.NullInt64 + Color sql.NullString + Signature []byte +} + +// We use a separate upsert for our own node since we want to be less strict +// about the last_update field. For our own node, we always want to +// update the record even if the last_update is the same as what we have. +func (q *Queries) UpsertSourceNode(ctx context.Context, arg UpsertSourceNodeParams) (int64, error) { + row := q.db.QueryRowContext(ctx, upsertSourceNode, + arg.Version, + arg.PubKey, + arg.Alias, + arg.LastUpdate, + arg.Color, + arg.Signature, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + const upsertZombieChannel = `-- name: UpsertZombieChannel :exec /* ───────────────────────────────────────────── graph_zombie_channels table queries diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 0087559be8f..7b7b0649596 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -147,6 +147,10 @@ type Querier interface { UpsertNodeAddress(ctx context.Context, arg UpsertNodeAddressParams) error UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTypeParams) error UpsertPruneLogEntry(ctx context.Context, arg UpsertPruneLogEntryParams) error + // We use a separate upsert for our own node since we want to be less strict + // about the last_update field. For our own node, we always want to + // update the record even if the last_update is the same as what we have. + UpsertSourceNode(ctx context.Context, arg UpsertSourceNodeParams) (int64, error) UpsertZombieChannel(ctx context.Context, arg UpsertZombieChannelParams) error } diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 19087fc1bdd..b9bee182232 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -21,6 +21,27 @@ WHERE graph_nodes.last_update IS NULL OR EXCLUDED.last_update > graph_nodes.last_update RETURNING id; +-- We use a separate upsert for our own node since we want to be less strict +-- about the last_update field. For our own node, we always want to +-- update the record even if the last_update is the same as what we have. +-- name: UpsertSourceNode :one +INSERT INTO graph_nodes ( + version, pub_key, alias, last_update, color, signature +) VALUES ( + $1, $2, $3, $4, $5, $6 +) +ON CONFLICT (pub_key, version) + -- Update the following fields if a conflict occurs on pub_key + -- and version. + DO UPDATE SET + alias = EXCLUDED.alias, + last_update = EXCLUDED.last_update, + color = EXCLUDED.color, + signature = EXCLUDED.signature +WHERE graph_nodes.last_update IS NULL + OR EXCLUDED.last_update >= graph_nodes.last_update +RETURNING id; + -- name: GetNodesByIDs :many SELECT * FROM graph_nodes