Skip to content

Commit 528bd94

Browse files
Merge pull request #678 from onflow/janez/close-clients
Close the AN clients
2 parents c98427b + 5b77d7b commit 528bd94

File tree

10 files changed

+134
-161
lines changed

10 files changed

+134
-161
lines changed

bootstrap/bootstrap.go

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,24 @@ import (
77
"math"
88
"time"
99

10-
"github.com/onflow/flow-go/module/component"
11-
1210
pebbleDB "github.com/cockroachdb/pebble"
13-
11+
"github.com/onflow/flow-evm-gateway/metrics"
1412
"github.com/onflow/flow-go-sdk/access"
1513
"github.com/onflow/flow-go-sdk/access/grpc"
1614
"github.com/onflow/flow-go-sdk/crypto"
1715
"github.com/onflow/flow-go/fvm/environment"
1816
"github.com/onflow/flow-go/fvm/evm"
1917
flowGo "github.com/onflow/flow-go/model/flow"
18+
"github.com/onflow/flow-go/module/component"
19+
flowMetrics "github.com/onflow/flow-go/module/metrics"
20+
"github.com/onflow/flow-go/module/util"
2021
gethTypes "github.com/onflow/go-ethereum/core/types"
2122
"github.com/rs/zerolog"
2223
"github.com/sethvargo/go-limiter/memorystore"
2324
grpcOpts "google.golang.org/grpc"
2425

2526
"github.com/onflow/flow-evm-gateway/api"
2627
"github.com/onflow/flow-evm-gateway/config"
27-
"github.com/onflow/flow-evm-gateway/metrics"
2828
"github.com/onflow/flow-evm-gateway/models"
2929
errs "github.com/onflow/flow-evm-gateway/models/errors"
3030
"github.com/onflow/flow-evm-gateway/services/ingestion"
@@ -57,9 +57,10 @@ type Bootstrap struct {
5757
publishers *Publishers
5858
collector metrics.Collector
5959
server *api.Server
60-
metrics *metrics.Server
60+
metrics *flowMetrics.Server
6161
events *ingestion.Engine
6262
profiler *api.ProfileServer
63+
db *pebbleDB.DB
6364
}
6465

6566
func New(config *config.Config) (*Bootstrap, error) {
@@ -72,7 +73,7 @@ func New(config *config.Config) (*Bootstrap, error) {
7273
return nil, err
7374
}
7475

75-
storages, err := setupStorage(config, client, logger)
76+
db, storages, err := setupStorage(config, client, logger)
7677
if err != nil {
7778
return nil, err
7879
}
@@ -83,6 +84,7 @@ func New(config *config.Config) (*Bootstrap, error) {
8384
Transaction: models.NewPublisher[*gethTypes.Transaction](),
8485
Logs: models.NewPublisher[[]*gethTypes.Log](),
8586
},
87+
db: db,
8688
storages: storages,
8789
logger: logger,
8890
config: config,
@@ -334,15 +336,14 @@ func (b *Bootstrap) StopAPIServer() {
334336
b.server.Stop()
335337
}
336338

337-
func (b *Bootstrap) StartMetricsServer(_ context.Context) error {
339+
func (b *Bootstrap) StartMetricsServer(ctx context.Context) error {
338340
b.logger.Info().Msg("bootstrap starting metrics server")
339341

340-
b.metrics = metrics.NewServer(b.logger, b.config.MetricsPort)
341-
started, err := b.metrics.Start()
342+
b.metrics = flowMetrics.NewServer(b.logger, uint(b.config.MetricsPort))
343+
err := util.WaitClosed(ctx, b.metrics.Ready())
342344
if err != nil {
343345
return fmt.Errorf("failed to start metrics server: %w", err)
344346
}
345-
<-started
346347

347348
return nil
348349
}
@@ -352,7 +353,7 @@ func (b *Bootstrap) StopMetricsServer() {
352353
return
353354
}
354355
b.logger.Warn().Msg("shutting down metrics server")
355-
b.metrics.Stop()
356+
<-b.metrics.Done()
356357
}
357358

358359
func (b *Bootstrap) StartProfilerServer(_ context.Context) error {
@@ -388,15 +389,25 @@ func (b *Bootstrap) StopProfilerServer() {
388389
}
389390

390391
func (b *Bootstrap) StopDB() {
391-
if b.storages == nil || b.storages.Storage == nil {
392+
if b.db == nil {
392393
return
393394
}
394-
err := b.storages.Storage.Close()
395+
err := b.db.Close()
395396
if err != nil {
396397
b.logger.Err(err).Msg("PebbleDB graceful shutdown failed")
397398
}
398399
}
399400

401+
func (b *Bootstrap) StopClient() {
402+
if b.client == nil {
403+
return
404+
}
405+
err := b.client.Close()
406+
if err != nil {
407+
b.logger.Err(err).Msg("CrossSporkClient graceful shutdown failed")
408+
}
409+
}
410+
400411
// StartEngine starts provided engine and panics if there are startup errors.
401412
func StartEngine(
402413
ctx context.Context,
@@ -466,12 +477,13 @@ func setupStorage(
466477
config *config.Config,
467478
client *requester.CrossSporkClient,
468479
logger zerolog.Logger,
469-
) (*Storages, error) {
480+
) (*pebbleDB.DB, *Storages, error) {
470481
// create pebble storage from the provided database root directory
471-
store, err := pebble.New(config.DatabaseDir, logger)
482+
db, err := pebble.OpenDB(config.DatabaseDir)
472483
if err != nil {
473-
return nil, err
484+
return nil, nil, err
474485
}
486+
store := pebble.New(db, logger)
475487

476488
blocks := pebble.NewBlocks(store, config.FlowNetworkID)
477489
storageAddress := evm.StorageAccountAddress(config.FlowNetworkID)
@@ -481,7 +493,7 @@ func setupStorage(
481493
if config.ForceStartCadenceHeight != 0 {
482494
logger.Warn().Uint64("height", config.ForceStartCadenceHeight).Msg("force setting starting Cadence height!!!")
483495
if err := blocks.SetLatestCadenceHeight(config.ForceStartCadenceHeight, nil); err != nil {
484-
return nil, err
496+
return nil, nil, err
485497
}
486498
}
487499

@@ -500,12 +512,12 @@ func setupStorage(
500512
evmBlokcHeight := uint64(0)
501513
cadenceBlock, err := client.GetBlockHeaderByHeight(context.Background(), cadenceHeight)
502514
if err != nil {
503-
return nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
515+
return nil, nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
504516
}
505517

506518
snapshot, err := registerStore.GetSnapshotAt(evmBlokcHeight)
507519
if err != nil {
508-
return nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
520+
return nil, nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
509521
}
510522

511523
delta := storage.NewRegisterDelta(snapshot)
@@ -516,16 +528,16 @@ func setupStorage(
516528
accountStatus.ToBytes(),
517529
)
518530
if err != nil {
519-
return nil, fmt.Errorf("could not set account status: %w", err)
531+
return nil, nil, fmt.Errorf("could not set account status: %w", err)
520532
}
521533

522534
err = registerStore.Store(delta.GetUpdates(), evmBlokcHeight, batch)
523535
if err != nil {
524-
return nil, fmt.Errorf("could not store register updates: %w", err)
536+
return nil, nil, fmt.Errorf("could not store register updates: %w", err)
525537
}
526538

527539
if err := blocks.InitHeights(cadenceHeight, cadenceBlock.ID, batch); err != nil {
528-
return nil, fmt.Errorf(
540+
return nil, nil, fmt.Errorf(
529541
"failed to init the database for block height: %d and ID: %s, with : %w",
530542
cadenceHeight,
531543
cadenceBlock.ID,
@@ -535,7 +547,7 @@ func setupStorage(
535547

536548
err = batch.Commit(pebbleDB.Sync)
537549
if err != nil {
538-
return nil, fmt.Errorf("could not commit register updates: %w", err)
550+
return nil, nil, fmt.Errorf("could not commit register updates: %w", err)
539551
}
540552

541553
logger.Info().
@@ -546,7 +558,7 @@ func setupStorage(
546558
// // TODO(JanezP): verify storage account owner is correct
547559
//}
548560

549-
return &Storages{
561+
return db, &Storages{
550562
Storage: store,
551563
Blocks: blocks,
552564
Registers: registerStore,
@@ -591,6 +603,7 @@ func Run(ctx context.Context, cfg *config.Config, ready component.ReadyFunc) err
591603
boot.StopEventIngestion()
592604
boot.StopMetricsServer()
593605
boot.StopAPIServer()
606+
boot.StopClient()
594607
boot.StopDB()
595608

596609
return nil

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.22
55
require (
66
github.com/cockroachdb/pebble v1.1.1
77
github.com/goccy/go-json v0.10.2
8+
github.com/hashicorp/go-multierror v1.1.1
89
github.com/onflow/atree v0.8.0
910
github.com/onflow/cadence v1.2.2
1011
github.com/onflow/flow-go v0.38.0-preview.0.0.20241125190444-25a8af57bea1
@@ -91,7 +92,6 @@ require (
9192
github.com/gorilla/websocket v1.5.0 // indirect
9293
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
9394
github.com/hashicorp/errwrap v1.1.0 // indirect
94-
github.com/hashicorp/go-multierror v1.1.1 // indirect
9595
github.com/hashicorp/golang-lru v1.0.2 // indirect
9696
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
9797
github.com/hashicorp/hcl v1.0.0 // indirect

metrics/server.go

Lines changed: 0 additions & 68 deletions
This file was deleted.

services/ingestion/engine_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,8 +636,9 @@ func defaultReplayerConfig() replayer.Config {
636636
}
637637

638638
func setupStore(t *testing.T) (*pebble.Storage, *pebble.RegisterStorage) {
639-
store, err := pebble.New(t.TempDir(), zerolog.Nop())
639+
db, err := pebble.OpenDB(t.TempDir())
640640
require.NoError(t, err)
641+
store := pebble.New(db, zerolog.Nop())
641642

642643
storageAddress := evm.StorageAccountAddress(flowGo.Emulator)
643644
registerStore := pebble.NewRegisterStorage(store, storageAddress)

services/replayer/blocks_provider_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,9 @@ func TestGetSnapshotAt(t *testing.T) {
279279

280280
func setupBlocksDB(t *testing.T) (*pebble.Storage, storage.BlockIndexer) {
281281
dir := t.TempDir()
282-
db, err := pebble.New(dir, zerolog.Nop())
282+
pebbleDB, err := pebble.OpenDB(dir)
283283
require.NoError(t, err)
284+
db := pebble.New(pebbleDB, zerolog.Nop())
284285
batch := db.NewBatch()
285286

286287
chainID := flowGo.Emulator

services/requester/cross-spork_client.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/hashicorp/go-multierror"
78
"github.com/onflow/cadence"
89
errs "github.com/onflow/flow-evm-gateway/models/errors"
910
"github.com/onflow/flow-go-sdk"
@@ -34,6 +35,10 @@ func (s *sporkClient) GetEventsForHeightRange(
3435
return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
3536
}
3637

38+
func (s *sporkClient) Close() error {
39+
return s.client.Close()
40+
}
41+
3742
type sporkClients []*sporkClient
3843

3944
// addSpork will add a new spork host defined by the first and last height boundary in that spork.
@@ -102,7 +107,7 @@ func (s *sporkClients) continuous() bool {
102107
// that shadows the original access Client function.
103108
type CrossSporkClient struct {
104109
logger zerolog.Logger
105-
sporkClients *sporkClients
110+
sporkClients sporkClients
106111
currentSporkFirstHeight uint64
107112
access.Client
108113
}
@@ -127,7 +132,7 @@ func NewCrossSporkClient(
127132
nodeRootBlockHeight = info.NodeRootBlockHeight
128133
}
129134

130-
clients := &sporkClients{}
135+
clients := sporkClients{}
131136
for _, c := range pastSporks {
132137
if err := clients.add(logger, c); err != nil {
133138
return nil, err
@@ -243,3 +248,19 @@ func (c *CrossSporkClient) GetEventsForHeightRange(
243248
}
244249
return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
245250
}
251+
252+
func (c *CrossSporkClient) Close() error {
253+
var merr *multierror.Error
254+
255+
for _, client := range c.sporkClients {
256+
if err := client.Close(); err != nil {
257+
merr = multierror.Append(merr, err)
258+
}
259+
}
260+
err := c.Client.Close()
261+
if err != nil {
262+
merr = multierror.Append(merr, err)
263+
}
264+
265+
return merr.ErrorOrNil()
266+
}

0 commit comments

Comments
 (0)