Skip to content

Commit 0bbbf18

Browse files
authored
Handle sigterm (#150)
### TL;DR Added graceful shutdown support to all orchestrator components using context cancellation. ### What changed? - Modified all orchestrator components (ChainTracker, Committer, FailureRecoverer, Poller, ReorgHandler) to accept a context parameter - Added signal handling for SIGTERM and SIGINT in the main orchestrator - Implemented proper cleanup with `defer ticker.Stop()` in all components - Added new shutdown method to the orchestrator - Added test coverage for graceful shutdown scenarios ### How to test? 1. Run the application 2. Send a SIGTERM signal (`kill -TERM <pid>`) 3. Verify that all components log their shutdown messages 4. Confirm that the application exits cleanly without hanging 5. Run the test suite to verify the new shutdown behavior tests pass ### Why make this change? This change ensures proper resource cleanup and graceful shutdown when the application needs to stop, preventing resource leaks and allowing for clean container orchestration. It's particularly important for cloud deployments where proper shutdown handling is crucial for container lifecycle management.
2 parents 30be13b + a4e99ae commit 0bbbf18

11 files changed

+235
-81
lines changed

cmd/api.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package cmd
22

33
import (
4+
"context"
45
"net/http"
6+
"os/signal"
7+
"syscall"
8+
"time"
59

610
"github.com/gin-gonic/gin"
711
"github.com/rs/zerolog/log"
@@ -40,6 +44,9 @@ var (
4044
func RunApi(cmd *cobra.Command, args []string) {
4145
docs.SwaggerInfo.Host = config.Cfg.API.Host
4246

47+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
48+
defer stop()
49+
4350
r := gin.New()
4451
r.Use(gin.Logger())
4552
r.Use(gin.Recovery())
@@ -83,5 +90,33 @@ func RunApi(cmd *cobra.Command, args []string) {
8390
c.String(http.StatusOK, "ok")
8491
})
8592

86-
r.Run(":3000")
93+
srv := &http.Server{
94+
Addr: ":3000",
95+
Handler: r,
96+
}
97+
98+
// Initializing the server in a goroutine so that
99+
// it won't block the graceful shutdown handling below
100+
go func() {
101+
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
102+
log.Fatal().Err(err).Msg("listen: %s\n")
103+
}
104+
}()
105+
106+
// Listen for the interrupt signal.
107+
<-ctx.Done()
108+
109+
// Restore default behavior on the interrupt signal and notify user of shutdown.
110+
stop()
111+
log.Info().Msg("shutting down API gracefully")
112+
113+
// The context is used to inform the server it has 5 seconds to finish
114+
// the request it is currently handling
115+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
116+
defer cancel()
117+
if err := srv.Shutdown(ctx); err != nil {
118+
log.Fatal().Err(err).Msg("API server forced to shutdown")
119+
}
120+
121+
log.Info().Msg("API server exiting")
87122
}

go.mod

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ require (
2727
github.com/beorn7/perks v1.0.1 // indirect
2828
github.com/bits-and-blooms/bitset v1.10.0 // indirect
2929
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
30-
github.com/bytedance/sonic v1.12.3 // indirect
31-
github.com/bytedance/sonic/loader v0.2.0 // indirect
30+
github.com/bytedance/sonic v1.12.6 // indirect
31+
github.com/bytedance/sonic/loader v0.2.1 // indirect
3232
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3333
github.com/cloudwego/base64x v0.1.4 // indirect
3434
github.com/cloudwego/iasm v0.2.0 // indirect
@@ -42,7 +42,7 @@ require (
4242
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
4343
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
4444
github.com/fsnotify/fsnotify v1.7.0 // indirect
45-
github.com/gabriel-vasile/mimetype v1.4.5 // indirect
45+
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
4646
github.com/gin-contrib/sse v0.1.0 // indirect
4747
github.com/go-faster/city v1.0.1 // indirect
4848
github.com/go-faster/errors v0.7.1 // indirect
@@ -53,8 +53,8 @@ require (
5353
github.com/go-openapi/swag v0.23.0 // indirect
5454
github.com/go-playground/locales v0.14.1 // indirect
5555
github.com/go-playground/universal-translator v0.18.1 // indirect
56-
github.com/go-playground/validator/v10 v10.22.1 // indirect
57-
github.com/goccy/go-json v0.10.3 // indirect
56+
github.com/go-playground/validator/v10 v10.23.0 // indirect
57+
github.com/goccy/go-json v0.10.4 // indirect
5858
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
5959
github.com/google/uuid v1.6.0 // indirect
6060
github.com/gorilla/websocket v1.4.2 // indirect
@@ -64,7 +64,7 @@ require (
6464
github.com/josharian/intern v1.0.0 // indirect
6565
github.com/json-iterator/go v1.1.12 // indirect
6666
github.com/klauspost/compress v1.17.11 // indirect
67-
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
67+
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
6868
github.com/leodido/go-urn v1.4.0 // indirect
6969
github.com/magiconair/properties v1.8.7 // indirect
7070
github.com/mailru/easyjson v0.7.7 // indirect
@@ -104,15 +104,15 @@ require (
104104
go.opentelemetry.io/otel v1.26.0 // indirect
105105
go.opentelemetry.io/otel/trace v1.26.0 // indirect
106106
go.uber.org/multierr v1.11.0 // indirect
107-
golang.org/x/arch v0.10.0 // indirect
107+
golang.org/x/arch v0.12.0 // indirect
108108
golang.org/x/crypto v0.32.0 // indirect
109109
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
110110
golang.org/x/net v0.34.0 // indirect
111111
golang.org/x/sync v0.10.0 // indirect
112112
golang.org/x/sys v0.29.0 // indirect
113113
golang.org/x/text v0.21.0 // indirect
114114
golang.org/x/tools v0.25.0 // indirect
115-
google.golang.org/protobuf v1.34.2 // indirect
115+
google.golang.org/protobuf v1.36.1 // indirect
116116
gopkg.in/ini.v1 v1.67.0 // indirect
117117
gopkg.in/yaml.v3 v3.0.1 // indirect
118118
rsc.io/tmplfunc v0.0.3 // indirect

go.sum

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurT
2020
github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
2121
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
2222
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
23-
github.com/bytedance/sonic v1.12.3 h1:W2MGa7RCU1QTeYRTPE3+88mVC0yXmsRQRChiyVocVjU=
24-
github.com/bytedance/sonic v1.12.3/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk=
23+
github.com/bytedance/sonic v1.12.6 h1:/isNmCUF2x3Sh8RAp/4mh4ZGkcFAX/hLrzrK3AvpRzk=
24+
github.com/bytedance/sonic v1.12.6/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk=
2525
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
26-
github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM=
27-
github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
26+
github.com/bytedance/sonic/loader v0.2.1 h1:1GgorWTqf12TA8mma4DDSbaQigE2wOgQo7iCjjJv3+E=
27+
github.com/bytedance/sonic/loader v0.2.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
2828
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
2929
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3030
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
@@ -77,8 +77,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
7777
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
7878
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
7979
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
80-
github.com/gabriel-vasile/mimetype v1.4.5 h1:J7wGKdGu33ocBOhGy0z653k/lFKLFDPJMG8Gql0kxn4=
81-
github.com/gabriel-vasile/mimetype v1.4.5/go.mod h1:ibHel+/kbxn9x2407k1izTA1S81ku1z/DlgOW2QE0M4=
80+
github.com/gabriel-vasile/mimetype v1.4.7 h1:SKFKl7kD0RiPdbht0s7hFtjl489WcQ1VyPW8ZzUMYCA=
81+
github.com/gabriel-vasile/mimetype v1.4.7/go.mod h1:GDlAgAyIRT27BhFl53XNAFtfjzOkLaF35JdEG0P7LtU=
8282
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI=
8383
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww=
8484
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
@@ -110,12 +110,12 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
110110
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
111111
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
112112
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
113-
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
114-
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
113+
github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o=
114+
github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
115115
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
116116
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
117-
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
118-
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
117+
github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM=
118+
github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
119119
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
120120
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
121121
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
@@ -169,8 +169,8 @@ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e
169169
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
170170
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
171171
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
172-
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
173-
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
172+
github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY=
173+
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
174174
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
175175
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
176176
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -333,8 +333,8 @@ go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2L
333333
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
334334
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
335335
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
336-
golang.org/x/arch v0.10.0 h1:S3huipmSclq3PJMNe76NGwkBR504WFkQ5dhzWzP8ZW8=
337-
golang.org/x/arch v0.10.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
336+
golang.org/x/arch v0.12.0 h1:UsYJhbzPYGsT0HbEdmYcqtCv8UNGvnaL561NnIUvaKg=
337+
golang.org/x/arch v0.12.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
338338
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
339339
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
340340
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -409,8 +409,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
409409
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
410410
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
411411
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
412-
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
413-
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
412+
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
413+
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
414414
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
415415
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
416416
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

internal/orchestrator/chain_tracker.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package orchestrator
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/rs/zerolog/log"
@@ -22,13 +23,18 @@ func NewChainTracker(rpc rpc.IRPCClient) *ChainTracker {
2223
}
2324
}
2425

25-
func (ct *ChainTracker) Start() {
26+
func (ct *ChainTracker) Start(ctx context.Context) {
2627
interval := time.Duration(ct.triggerIntervalMs) * time.Millisecond
2728
ticker := time.NewTicker(interval)
29+
defer ticker.Stop()
2830

2931
log.Debug().Msgf("Chain tracker running")
30-
go func() {
31-
for range ticker.C {
32+
for {
33+
select {
34+
case <-ctx.Done():
35+
log.Info().Msg("Chain tracker shutting down")
36+
return
37+
case <-ticker.C:
3238
latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
3339
if err != nil {
3440
log.Error().Err(err).Msg("Error getting latest block number")
@@ -37,8 +43,5 @@ func (ct *ChainTracker) Start() {
3743
latestBlockNumberFloat, _ := latestBlockNumber.Float64()
3844
metrics.ChainHead.Set(latestBlockNumberFloat)
3945
}
40-
}()
41-
42-
// Keep the program running (otherwise it will exit)
43-
select {}
46+
}
4447
}

internal/orchestrator/committer.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package orchestrator
22

33
import (
4+
"context"
45
"fmt"
56
"math/big"
67
"sort"
@@ -44,13 +45,18 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
4445
}
4546
}
4647

47-
func (c *Committer) Start() {
48+
func (c *Committer) Start(ctx context.Context) {
4849
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
4950
ticker := time.NewTicker(interval)
51+
defer ticker.Stop()
5052

5153
log.Debug().Msgf("Committer running")
52-
go func() {
53-
for range ticker.C {
54+
for {
55+
select {
56+
case <-ctx.Done():
57+
log.Info().Msg("Committer shutting down")
58+
return
59+
case <-ticker.C:
5460
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
5561
if err != nil {
5662
log.Error().Err(err).Msg("Error getting block data to commit")
@@ -64,10 +70,7 @@ func (c *Committer) Start() {
6470
log.Error().Err(err).Msg("Error committing blocks")
6571
}
6672
}
67-
}()
68-
69-
// Keep the program running (otherwise it will exit)
70-
select {}
73+
}
7174
}
7275

7376
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {

internal/orchestrator/committer_test.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package orchestrator
22

33
import (
4+
"context"
45
"math/big"
56
"testing"
67
"time"
@@ -207,12 +208,61 @@ func TestStartCommitter(t *testing.T) {
207208
mockStagingStorage.On("DeleteStagingData", &blockData).Return(nil)
208209

209210
// Start the committer in a goroutine
210-
go committer.Start()
211+
go committer.Start(context.Background())
211212

212213
// Wait for a short time to allow the committer to run
213214
time.Sleep(200 * time.Millisecond)
214215
}
215216

217+
func TestCommitterRespectsSIGTERM(t *testing.T) {
218+
mockRPC := mocks.NewMockIRPCClient(t)
219+
mockMainStorage := mocks.NewMockIMainStorage(t)
220+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
221+
mockStorage := storage.IStorage{
222+
MainStorage: mockMainStorage,
223+
StagingStorage: mockStagingStorage,
224+
}
225+
226+
committer := NewCommitter(mockRPC, mockStorage)
227+
committer.triggerIntervalMs = 100 // Short interval for testing
228+
229+
chainID := big.NewInt(1)
230+
mockRPC.EXPECT().GetChainID().Return(chainID)
231+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
232+
233+
blockData := []common.BlockData{
234+
{Block: common.Block{Number: big.NewInt(101)}},
235+
{Block: common.Block{Number: big.NewInt(102)}},
236+
}
237+
mockStagingStorage.On("GetStagingData", mock.Anything).Return(&blockData, nil)
238+
mockMainStorage.On("InsertBlockData", &blockData).Return(nil)
239+
mockStagingStorage.On("DeleteStagingData", &blockData).Return(nil)
240+
241+
// Create a context that we can cancel
242+
ctx, cancel := context.WithCancel(context.Background())
243+
244+
// Start the committer in a goroutine
245+
done := make(chan struct{})
246+
go func() {
247+
committer.Start(ctx)
248+
close(done)
249+
}()
250+
251+
// Wait a bit to ensure the committer is running
252+
time.Sleep(200 * time.Millisecond)
253+
254+
// Cancel the context (simulating SIGTERM)
255+
cancel()
256+
257+
// Wait for the committer to stop with a timeout
258+
select {
259+
case <-done:
260+
// Success - committer stopped
261+
case <-time.After(2 * time.Second):
262+
t.Fatal("Committer did not stop within timeout period after receiving cancel signal")
263+
}
264+
}
265+
216266
func TestHandleMissingStagingData(t *testing.T) {
217267
defer func() { config.Cfg = config.Config{} }()
218268
config.Cfg.Committer.BlocksPerCommit = 5

internal/orchestrator/failure_recoverer.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package orchestrator
22

33
import (
4+
"context"
45
"fmt"
56
"math/big"
67
"time"
@@ -41,13 +42,19 @@ func NewFailureRecoverer(rpc rpc.IRPCClient, storage storage.IStorage) *FailureR
4142
}
4243
}
4344

44-
func (fr *FailureRecoverer) Start() {
45+
func (fr *FailureRecoverer) Start(ctx context.Context) {
4546
interval := time.Duration(fr.triggerIntervalMs) * time.Millisecond
4647
ticker := time.NewTicker(interval)
48+
defer ticker.Stop()
4749

4850
log.Debug().Msgf("Failure Recovery running")
49-
go func() {
50-
for range ticker.C {
51+
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
log.Info().Msg("Failure recoverer shutting down")
56+
return
57+
case <-ticker.C:
5158
blockFailures, err := fr.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{
5259
ChainId: fr.rpc.GetChainID(),
5360
Limit: fr.failuresPerPoll,
@@ -75,10 +82,7 @@ func (fr *FailureRecoverer) Start() {
7582
metrics.FailureRecovererLastTriggeredBlock.Set(float64(blockFailures[len(blockFailures)-1].BlockNumber.Int64()))
7683
metrics.FirstBlocknumberInFailureRecovererBatch.Set(float64(blockFailures[0].BlockNumber.Int64()))
7784
}
78-
}()
79-
80-
// Keep the program running (otherwise it will exit)
81-
select {}
85+
}
8286
}
8387

8488
func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []rpc.GetFullBlockResult) {

0 commit comments

Comments
 (0)