Skip to content

Commit

Permalink
[RIVER-1738] Update pg store to use advisory locking instead of compa…
Browse files Browse the repository at this point in the history
…ring UUIDs per transaction (#1739)

This PR adds the use of session-wide `pg_advisory_lock`s to ensure
unique read/write access to the database schema for this node while the
node is up and available, and removes the per-transaction check on the
singlenodekey table, improving query performance of the running node at
the cost of a negligible amount of additional startup latency to take the
schema lock.

In order to maintain the session lock we acquire a connection from the
pgx pool, take the lock on that connection, and continuously maintain
the connection by polling it with pings. If a ping fails, the node will
close the connection and attempt to acquire a new connection and
re-establish the lock on the schema. If the node is unable to establish
a healthy connection after about 20s of retries, or the node is unable
to retake the schema lock, the node will exit.

As before, we still detect the entrance of other nodes vying for the
same schema on the network by monitoring updates to the singlenodekey
table. The node updates the singlenodekey table on startup before taking
the schema lock, and new nodes will trigger old nodes to release their
schema lock and exit. When a node detects that a schema lock is
unavailable, it will use the existing StartupDelay config parameter to
give the exiting node some time to clean up after itself.

Tests:
- an existing unit test confirms that the existing node exits whenever a
new node starts, properly responding to the new entry in the
singlenodekey table. The success of the test case implies that the first
node is properly releasing the schema lock.
- manual local testing, using toxiproxy:
- the node is able to recover the schema lock after temporary connection
interruptions
- the node will shut down after maximum retries to re-establish the
connection and retake the schema lock if connectivity is not restored
- the node will shut down if it is not able to retake the schema lock, even
when network connectivity to pg is restored before it exhausts its
reconnection retries.
  • Loading branch information
clemire authored Dec 16, 2024
1 parent 1d12b0b commit 006b8d7
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 118 deletions.
11 changes: 9 additions & 2 deletions core/node/crypto/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,17 @@ func (NoopChainMonitor) OnStopped(OnChainMonitorStoppedCallback) {}
// Run individual tests with -run to find specific leaking tests.
func TestMainForLeaksIgnoreGeth() {
// Geth's simulated backend leaks a lot of goroutines.
// Unfortunately goleak doesn't have optiosn to ignore by module or package,
// Unfortunately goleak doesn't have options to ignore by module or package,
// so some custom error string parsing is required to filter them out.

// pgx also sometimes has a "leaked" goroutine that takes about 500ms to terminate after
// a pool is closed. Here we can configure ignoring this as it is a specific function.
ignorePgxPoolHealthCheck := goleak.IgnoreAnyFunction(
"github.com/jackc/pgx/v5/pgxpool.(*Pool).triggerHealthCheck.func1",
)

now := time.Now()
err := goleak.Find()
err := goleak.Find(ignorePgxPoolHealthCheck)
elapsed := time.Since(now)
if err != nil {
msg := err.Error()
Expand Down
10 changes: 8 additions & 2 deletions core/node/events/miniblock_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ func (p *miniblockProducer) TestMakeMiniblock(

err = SleepWithContext(ctx, 10*time.Millisecond)
if err != nil {
return nil, err
return nil, AsRiverError(err, Err_INTERNAL).
Func("TestMakeMiniblock").
Message("Timed out while waiting for make_miniblock job to be scheduled").
Tag("streamId", streamId)
}
}

Expand All @@ -288,7 +291,10 @@ func (p *miniblockProducer) TestMakeMiniblock(

err = SleepWithContext(ctx, 10*time.Millisecond)
if err != nil {
return nil, err
return nil, AsRiverError(err, Err_INTERNAL).
Func("TestMakeMiniblock").
Message("Timed out while waiting for make_miniblock job to terminate").
Tag("streamId", streamId)
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,8 @@ func (s *streamImpl) addEventLocked(ctx context.Context, event *ParsedEvent) err
// TODO: for some classes of errors, it's not clear if event was added or not
// for those, perhaps entire Stream structure should be scrapped and reloaded
if err != nil {
return err
return AsRiverError(err, Err_DB_OPERATION_FAILURE).
Tag("inMemoryBlocks", len(s.view().blocks))
}

s.setView(newSV)
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (s *streamCacheImpl) tryLoadStreamRecord(
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, AsRiverError(ctx.Err(), Err_INTERNAL).Message("Timeout waiting for cache record to be created")
case <-time.After(delay):
stream, _ := s.cache.Load(streamId)
if stream != nil {
Expand Down
35 changes: 25 additions & 10 deletions core/node/rpc/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/gammazero/workerpool"
"github.com/stretchr/testify/assert"

"github.com/river-build/river/core/contracts/river"
Expand Down Expand Up @@ -44,7 +45,11 @@ func fillUserSettingsStreamWithData(
err,
Err_INTERNAL,
).Message("Failed to add event to stream").
Func("fillUserSettingsStreamWithData")
Func("fillUserSettingsStreamWithData").
Tag("streamId", streamId).
Tag("miniblockNum", i).
Tag("mbEventNum", j).
Tag("numMbs", numMBs)
}
}
prevMB, err = makeMiniblock(ctx, client, streamId, false, prevMB.Num)
Expand All @@ -53,7 +58,10 @@ func fillUserSettingsStreamWithData(
err,
Err_INTERNAL,
).Message("Failed to create miniblock").
Func("fillUserSettingsStreamWithData")
Func("fillUserSettingsStreamWithData").
Tag("streamId", streamId).
Tag("miniblockNum", i).
Tag("numMbs", numMBs)
}
}
return prevMB, nil
Expand All @@ -70,12 +78,10 @@ func createUserSettingsStreamsWithData(
streamIds := make([]StreamId, numStreams)
errChan := make(chan error, numStreams)

var wg sync.WaitGroup
wg.Add(numStreams)
wp := workerpool.New(10)

for i := 0; i < numStreams; i++ {
go func(i int) {
defer wg.Done()
wp.Submit(func() {
wallet, err := crypto.NewWallet(ctx)
if err != nil {
errChan <- err
Expand All @@ -90,20 +96,29 @@ func createUserSettingsStreamsWithData(
&StreamSettings{DisableMiniblockCreation: true},
)
if err != nil {
errChan <- AsRiverError(err, Err_INTERNAL).Message("Failed to create stream").Func("createUserSettingsStreamsWithData")
errChan <- AsRiverError(err, Err_INTERNAL).
Message("Failed to create stream").
Func("createUserSettingsStreamsWithData").
Tag("streamNum", i).
Tag("streamId", streamId)
return
}
streamIds[i] = streamId

_, err = fillUserSettingsStreamWithData(ctx, streamId, wallet, client, numMBs, numEventsPerMB, mbRef)
if err != nil {
errChan <- AsRiverError(err, Err_INTERNAL).Message("Failed to fill stream with data").Func("createUserSettingsStreamsWithData")
errChan <- AsRiverError(err, Err_INTERNAL).
Message("Failed to fill stream with data").
Func("createUserSettingsStreamsWithData").
Tag("streamNum", i).
Tag("streamId", streamId)
return
}
}(i)
})
}

wg.Wait()
wp.StopWait()

if len(errChan) > 0 {
return nil, nil, <-errChan
}
Expand Down
18 changes: 10 additions & 8 deletions core/node/rpc/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/river-build/river/core/node/testutils/testcert"
)

var notificationDeliveryDelay = 30 * time.Second

// TestNotifications is designed in such a way that all tests are run in parallel
// and share the same set of nodes, notification service and client.
func TestNotifications(t *testing.T) {
Expand Down Expand Up @@ -120,7 +122,7 @@ func testGDMAPNNotificationAfterUnsubscribe(
notificationsForEvent := nc.ApnPushNotifications[eventHash]

return cmp.Equal(notificationsForEvent, expectedUsersToReceiveNotification)
}, 10*time.Second, 2500*time.Millisecond, "Didn't receive expected notifications")
}, notificationDeliveryDelay, 2500*time.Millisecond, "Didn't receive expected notifications for stream %s", test.gdmStreamID)

// userA unsubscribes and userB subscribes using the same device.
// for tests the deviceToken is the users wallet address, in this case
Expand Down Expand Up @@ -163,7 +165,7 @@ func testGDMAPNNotificationAfterUnsubscribe(
notificationsForEvent := nc.ApnPushNotifications[eventHash]

return len(notificationsForEvent) != 0
}, 10*time.Second, 2500*time.Millisecond, "Receive unexpected notification")
}, notificationDeliveryDelay, 2500*time.Millisecond, "Receive unexpected notification")
}

func testGDMMessageWithNoMentionsRepliesAndReaction(
Expand Down Expand Up @@ -232,7 +234,7 @@ func testGDMMessageWithNoMentionsRepliesAndReaction(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, 10*time.Second, 2500*time.Millisecond, "Didn't receive expected notifications")
}, notificationDeliveryDelay, 2500*time.Millisecond, "Didn't receive expected notifications for stream %s", test.gdmStreamID)

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -278,7 +280,7 @@ func testGDMReactionMessage(
defer nc.WebPushNotificationsMu.Unlock()

return cmp.Equal(nc.WebPushNotifications[eventHash], expectedUsersToReceiveNotification)
}, 15*time.Second, 100*time.Millisecond, "user A Didn't receive expected notification")
}, notificationDeliveryDelay, 100*time.Millisecond, "user A Didn't receive expected notification for stream %s", test.gdmStreamID)

// ensure that user B and C never get a notification
test.req.Never(func() bool {
Expand Down Expand Up @@ -426,7 +428,7 @@ func testDMMessageWithDefaultUserNotificationsPreferences(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.dmStreamID[:])

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -560,7 +562,7 @@ func testSpaceChannelPlainMessage(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID[:])

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -639,7 +641,7 @@ func testSpaceChannelAtChannelTag(

return cmp.Equal(webNotifications, expectedUsersToReceiveNotification) &&
cmp.Equal(apnNotifications, expectedUsersToReceiveNotification)
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID[:])

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down Expand Up @@ -720,7 +722,7 @@ func testSpaceChannelMentionTag(

return webCount == len(expectedUsersToReceiveNotification) ||
apnCount == len(expectedUsersToReceiveNotification)
}, 20*time.Second, 100*time.Millisecond, "Didn't receive expected notifications")
}, notificationDeliveryDelay, 100*time.Millisecond, "Didn't receive expected notifications for stream %s", test.channelID)

// Wait a bit to ensure that no more notifications come in
test.req.Never(func() bool {
Expand Down
6 changes: 5 additions & 1 deletion core/node/storage/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ func TestMigrateExistingDb(t *testing.T) {

testParams := setupStreamStorageTest(t)
ctx := testParams.ctx
defer testParams.closer()

// Tear down the store and defer remaining cleanup
testParams.pgStreamStore.Close(ctx)
defer testParams.ctxCloser()
defer testParams.schemaDeleter()

pool, err := CreateAndValidatePgxPool(
ctx,
Expand Down
Loading

0 comments on commit 006b8d7

Please sign in to comment.