Skip to content

Commit d50a707

Browse files
authored
fix: add cleanup in multiplexer to stop started apps (#15)
1 parent 426aefe commit d50a707

File tree

2 files changed

+83
-18
lines changed

2 files changed

+83
-18
lines changed

abci/multiplexer.go

+46-18
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
const flagGRPCAddress = "grpc.address"
2626

27-
type multiplexer struct {
27+
type Multiplexer struct {
2828
mu sync.Mutex
2929

3030
currentHeight, lastHeight int64
@@ -39,7 +39,7 @@ type multiplexer struct {
3939

4040
// NewMultiplexer creates a new ABCI wrapper for multiplexing
4141
func NewMultiplexer(latestApp servertypes.ABCI, versions Versions, v *viper.Viper, home string) (abci.Application, error) {
42-
wrapper := &multiplexer{
42+
wrapper := &Multiplexer{
4343
latestApp: latestApp,
4444
versions: versions,
4545
}
@@ -101,7 +101,7 @@ func NewMultiplexer(latestApp servertypes.ABCI, versions Versions, v *viper.Vipe
101101
return wrapper, nil
102102
}
103103

104-
func (m *multiplexer) getLatestHeight(rootDir string, v *viper.Viper) (int64, error) {
104+
func (m *Multiplexer) getLatestHeight(rootDir string, v *viper.Viper) (int64, error) {
105105
dataDir := filepath.Join(rootDir, "data")
106106
db, err := dbm.NewDB("application", server.GetAppDBBackend(v), dataDir)
107107
if err != nil {
@@ -114,7 +114,7 @@ func (m *multiplexer) getLatestHeight(rootDir string, v *viper.Viper) (int64, er
114114
}
115115

116116
// getAppForHeight gets the appropriate app based on height
117-
func (m *multiplexer) getAppForHeight(height int64) (servertypes.ABCI, error) {
117+
func (m *Multiplexer) getAppForHeight(height int64) (servertypes.ABCI, error) {
118118
m.mu.Lock()
119119
defer m.mu.Unlock()
120120

@@ -179,31 +179,59 @@ func (m *multiplexer) getAppForHeight(height int64) (servertypes.ABCI, error) {
179179
return NewRemoteABCIClient(m.conn), nil
180180
}
181181

182-
func (m *multiplexer) ApplySnapshotChunk(_ context.Context, req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) {
182+
// Cleanup allows proper multiplexer termination.
183+
func (m *Multiplexer) Cleanup() error {
184+
m.mu.Lock()
185+
defer m.mu.Unlock()
186+
187+
var errs error
188+
189+
// stop any running app
190+
if m.activeVersion.Appd != nil && m.activeVersion.Appd.Pid() != appd.AppdStopped {
191+
log.Printf("Stopping app for height %d", m.activeVersion.UntilHeight)
192+
if err := m.activeVersion.Appd.Stop(); err != nil {
193+
errs = errors.Join(errs, fmt.Errorf("failed to stop app for height %d: %w", m.activeVersion.UntilHeight, err))
194+
}
195+
m.started = false
196+
m.activeVersion = Version{}
197+
}
198+
199+
// close gRPC connection
200+
if m.conn != nil {
201+
if err := m.conn.Close(); err != nil {
202+
errs = errors.Join(errs, fmt.Errorf("failed to close gRPC connection: %w", err))
203+
}
204+
m.conn = nil
205+
}
206+
207+
return errs
208+
}
209+
210+
func (m *Multiplexer) ApplySnapshotChunk(_ context.Context, req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) {
183211
app, err := m.getAppForHeight(m.lastHeight)
184212
if err != nil {
185213
return nil, fmt.Errorf("failed to get app for height %d: %w", m.lastHeight, err)
186214
}
187215
return app.ApplySnapshotChunk(req)
188216
}
189217

190-
func (m *multiplexer) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
218+
func (m *Multiplexer) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
191219
app, err := m.getAppForHeight(m.lastHeight)
192220
if err != nil {
193221
return nil, fmt.Errorf("failed to get app for height %d: %w", m.lastHeight, err)
194222
}
195223
return app.CheckTx(req)
196224
}
197225

198-
func (m *multiplexer) Commit(context.Context, *abci.RequestCommit) (*abci.ResponseCommit, error) {
226+
func (m *Multiplexer) Commit(context.Context, *abci.RequestCommit) (*abci.ResponseCommit, error) {
199227
app, err := m.getAppForHeight(m.lastHeight)
200228
if err != nil {
201229
return nil, fmt.Errorf("failed to get app for height %d: %w", m.lastHeight, err)
202230
}
203231
return app.Commit()
204232
}
205233

206-
func (m *multiplexer) ExtendVote(ctx context.Context, req *abci.RequestExtendVote) (*abci.ResponseExtendVote, error) {
234+
func (m *Multiplexer) ExtendVote(ctx context.Context, req *abci.RequestExtendVote) (*abci.ResponseExtendVote, error) {
207235
m.lastHeight = req.Height
208236
app, err := m.getAppForHeight(req.Height)
209237
if err != nil {
@@ -212,7 +240,7 @@ func (m *multiplexer) ExtendVote(ctx context.Context, req *abci.RequestExtendVot
212240
return app.ExtendVote(ctx, req)
213241
}
214242

215-
func (m *multiplexer) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
243+
func (m *Multiplexer) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
216244
m.lastHeight = req.Height
217245
app, err := m.getAppForHeight(req.Height)
218246
if err != nil {
@@ -221,43 +249,43 @@ func (m *multiplexer) FinalizeBlock(_ context.Context, req *abci.RequestFinalize
221249
return app.FinalizeBlock(req)
222250
}
223251

224-
func (m *multiplexer) Info(_ context.Context, req *abci.RequestInfo) (*abci.ResponseInfo, error) {
252+
func (m *Multiplexer) Info(_ context.Context, req *abci.RequestInfo) (*abci.ResponseInfo, error) {
225253
return m.latestApp.Info(req) // Always use latest app for Info
226254
}
227255

228-
func (m *multiplexer) InitChain(_ context.Context, req *abci.RequestInitChain) (*abci.ResponseInitChain, error) {
256+
func (m *Multiplexer) InitChain(_ context.Context, req *abci.RequestInitChain) (*abci.ResponseInitChain, error) {
229257
app, err := m.getAppForHeight(0)
230258
if err != nil {
231259
return nil, fmt.Errorf("failed to get app for genesis: %w", err)
232260
}
233261
return app.InitChain(req)
234262
}
235263

236-
func (m *multiplexer) ListSnapshots(_ context.Context, req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) {
264+
func (m *Multiplexer) ListSnapshots(_ context.Context, req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) {
237265
app, err := m.getAppForHeight(m.lastHeight)
238266
if err != nil {
239267
return nil, fmt.Errorf("failed to get app for height %d: %w", m.lastHeight, err)
240268
}
241269
return app.ListSnapshots(req)
242270
}
243271

244-
func (m *multiplexer) LoadSnapshotChunk(_ context.Context, req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) {
272+
func (m *Multiplexer) LoadSnapshotChunk(_ context.Context, req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) {
245273
app, err := m.getAppForHeight(int64(req.Height))
246274
if err != nil {
247275
return nil, fmt.Errorf("failed to get app for height %d: %w", req.Height, err)
248276
}
249277
return app.LoadSnapshotChunk(req)
250278
}
251279

252-
func (m *multiplexer) OfferSnapshot(_ context.Context, req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) {
280+
func (m *Multiplexer) OfferSnapshot(_ context.Context, req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) {
253281
app, err := m.getAppForHeight(m.lastHeight)
254282
if err != nil {
255283
return nil, fmt.Errorf("failed to get app for height %d: %w", m.lastHeight, err)
256284
}
257285
return app.OfferSnapshot(req)
258286
}
259287

260-
func (m *multiplexer) PrepareProposal(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
288+
func (m *Multiplexer) PrepareProposal(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
261289
m.lastHeight = req.Height
262290
app, err := m.getAppForHeight(req.Height)
263291
if err != nil {
@@ -266,7 +294,7 @@ func (m *multiplexer) PrepareProposal(_ context.Context, req *abci.RequestPrepar
266294
return app.PrepareProposal(req)
267295
}
268296

269-
func (m *multiplexer) ProcessProposal(_ context.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
297+
func (m *Multiplexer) ProcessProposal(_ context.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
270298
m.lastHeight = req.Height
271299
app, err := m.getAppForHeight(req.Height)
272300
if err != nil {
@@ -275,15 +303,15 @@ func (m *multiplexer) ProcessProposal(_ context.Context, req *abci.RequestProces
275303
return app.ProcessProposal(req)
276304
}
277305

278-
func (m *multiplexer) Query(ctx context.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
306+
func (m *Multiplexer) Query(ctx context.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
279307
app, err := m.getAppForHeight(req.Height)
280308
if err != nil {
281309
return nil, fmt.Errorf("failed to get app for height %d: %w", req.Height, err)
282310
}
283311
return app.Query(ctx, req)
284312
}
285313

286-
func (m *multiplexer) VerifyVoteExtension(_ context.Context, req *abci.RequestVerifyVoteExtension) (*abci.ResponseVerifyVoteExtension, error) {
314+
func (m *Multiplexer) VerifyVoteExtension(_ context.Context, req *abci.RequestVerifyVoteExtension) (*abci.ResponseVerifyVoteExtension, error) {
287315
m.lastHeight = req.Height
288316
app, err := m.getAppForHeight(req.Height)
289317
if err != nil {

start.go

+37
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ import (
66
"io"
77
"net"
88
"os"
9+
"os/signal"
910
"path/filepath"
11+
"syscall"
1012

1113
"github.com/01builders/nova/abci"
1214
cometserver "github.com/cometbft/cometbft/abci/server"
15+
cmtabci "github.com/cometbft/cometbft/abci/types"
1316
cmtcfg "github.com/cometbft/cometbft/config"
1417
"github.com/cometbft/cometbft/node"
1518
"github.com/cometbft/cometbft/p2p"
@@ -95,6 +98,9 @@ func startStandAlone(versions abci.Versions, svrCtx *server.Context, svrCfg serv
9598
return err
9699
}
97100

101+
// register cleanup handler for remote apps
102+
setupRemoteAppCleanup(cmtApp, svrCtx)
103+
98104
svr, err := cometserver.NewServer(addr, transport, cmtApp)
99105
if err != nil {
100106
return fmt.Errorf("error creating listener: %v", err)
@@ -216,6 +222,9 @@ func startCmtNode(
216222
return nil, cleanupFn, err
217223
}
218224

225+
// Register cleanup handler for remote apps
226+
setupRemoteAppCleanup(cmtApp, svrCtx)
227+
219228
tmNode, err = node.NewNodeWithContext(
220229
ctx,
221230
cfg,
@@ -240,11 +249,39 @@ func startCmtNode(
240249
if tmNode != nil && tmNode.IsRunning() {
241250
_ = tmNode.Stop()
242251
}
252+
253+
// also ensure we stop any remote apps
254+
if multiplexer, ok := cmtApp.(*abci.Multiplexer); ok {
255+
_ = multiplexer.Cleanup()
256+
}
243257
}
244258

245259
return tmNode, cleanupFn, nil
246260
}
247261

262+
// setupRemoteAppCleanup ensures that remote app processes are terminated when the main process receives termination signals
263+
func setupRemoteAppCleanup(cmtApp cmtabci.Application, svrCtx *server.Context) {
264+
if multiplexer, ok := cmtApp.(*abci.Multiplexer); ok {
265+
sigCh := make(chan os.Signal, 1)
266+
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
267+
268+
go func() {
269+
sig := <-sigCh
270+
svrCtx.Logger.Info("Received signal, stopping remote apps...", "signal", sig)
271+
272+
if err := multiplexer.Cleanup(); err != nil {
273+
svrCtx.Logger.Error("Error stopping remote apps", "err", err)
274+
} else {
275+
svrCtx.Logger.Info("Successfully stopped remote apps")
276+
}
277+
278+
// Re-send the signal to allow the normal process termination
279+
signal.Reset(os.Interrupt, syscall.SIGTERM)
280+
syscall.Kill(os.Getpid(), sig.(syscall.Signal))
281+
}()
282+
}
283+
}
284+
248285
func getAndValidateConfig(svrCtx *server.Context) (serverconfig.Config, error) {
249286
config, err := serverconfig.GetConfig(svrCtx.Viper)
250287
if err != nil {

0 commit comments

Comments
 (0)