From 6b6799d4186850e0410a8eb91f6368c8423544f3 Mon Sep 17 00:00:00 2001 From: Julien Harbulot Date: Wed, 1 Apr 2026 21:54:40 +1200 Subject: [PATCH 1/5] Prevent HTTP handler goroutines from blocking forever --- pkgs/operator/instance.go | 13 ++++++++- pkgs/operator/instances_manager.go | 14 ++++++++-- pkgs/operator/state.go | 10 ++++++- pkgs/operator/state_timeout_test.go | 42 +++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 pkgs/operator/state_timeout_test.go diff --git a/pkgs/operator/instance.go b/pkgs/operator/instance.go index c1a88fd3..a5ff1167 100644 --- a/pkgs/operator/instance.go +++ b/pkgs/operator/instance.go @@ -1,6 +1,7 @@ package operator import ( + "context" "crypto/rsa" "fmt" @@ -72,5 +73,15 @@ func (iw *instWrapper) ProcessMessages(msg *wire.MultipleSignedTransports) ([]by return nil, fmt.Errorf("process message: failed to process dkg message: %w", err) } } - return <-iw.respChan, nil + ctx, cancel := context.WithTimeout(context.Background(), MaxInstanceTime) + defer cancel() + select { + case resp, ok := <-iw.respChan: + if !ok { + return nil, fmt.Errorf("process message: response channel closed") + } + return resp, nil + case <-ctx.Done(): + return nil, fmt.Errorf("process message: timed out waiting for response: %w", ctx.Err()) + } } diff --git a/pkgs/operator/instances_manager.go b/pkgs/operator/instances_manager.go index b9acb274..bd37b2e8 100644 --- a/pkgs/operator/instances_manager.go +++ b/pkgs/operator/instances_manager.go @@ -2,6 +2,7 @@ package operator import ( "bytes" + "context" "crypto/rsa" "encoding/hex" "fmt" @@ -76,8 +77,17 @@ func (s *Switch) CreateInstance(reqID [24]byte, operators []*spec.Operator, mess if err := owner.Broadcast(resp); err != nil { return nil, nil, err } - res := <-bchan - return &instWrapper{owner, initiatorPublicKey, bchan}, res, nil + ctx, cancel := context.WithTimeout(context.Background(), MaxInstanceTime) + defer cancel() + select { + case res, ok := <-bchan: + if !ok { + return nil, nil, fmt.Errorf("create instance: response channel closed") + } + return &instWrapper{owner, initiatorPublicKey, bchan}, res, nil + case <-ctx.Done(): + return nil, nil, fmt.Errorf("create instance: timed out waiting for initial response: %w", ctx.Err()) + } } // InitInstance creates a LocalOwner instance and DKG public key message (Exchange) diff --git a/pkgs/operator/state.go b/pkgs/operator/state.go index 7f40474e..7013b066 100644 --- a/pkgs/operator/state.go +++ b/pkgs/operator/state.go @@ -6,6 +6,7 @@ import ( "crypto/rsa" "encoding/hex" "encoding/json" + "errors" "fmt" "sync" "time" @@ -131,7 +132,14 @@ func (s *Switch) ProcessMessage(dkgMsg []byte) ([]byte, error) { if !ok { return nil, utils.ErrMissingInstance } - return inst.ProcessMessages(st) + resp, err := inst.ProcessMessages(st) + if err != nil && errors.Is(err, context.DeadlineExceeded) { + s.Mtx.Lock() + delete(s.Instances, id) + delete(s.InstanceInitTime, id) + s.Mtx.Unlock() + } + return resp, err } func (s *Switch) MarshallAndSign(msg wire.SSZMarshaller, msgType wire.TransportType, operatorID uint64, id [24]byte) ([]byte, error) { diff --git a/pkgs/operator/state_timeout_test.go b/pkgs/operator/state_timeout_test.go new file mode 100644 index 00000000..ca1728bb --- /dev/null +++ b/pkgs/operator/state_timeout_test.go @@ -0,0 +1,42 @@ +package operator + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ssvlabs/ssv-dkg/pkgs/wire" +) + +type timeoutInstance struct{} + +func (timeoutInstance) ProcessMessages(*wire.MultipleSignedTransports) ([]byte, error) { + return nil, fmt.Errorf("timed out: %w", context.DeadlineExceeded) +} + +func (timeoutInstance) VerifyInitiatorMessage([]byte, []byte) error { return nil } + +func TestProcessMessage_DeletesInstanceOnTimeout(t *testing.T) { + var instanceID InstanceID + copy(instanceID[:], "test-instance-id-0123456") + + s := &Switch{ + Instances: map[InstanceID]Instance{instanceID: timeoutInstance{}}, + InstanceInitTime: map[InstanceID]time.Time{instanceID: time.Now()}, + } + + st := &wire.MultipleSignedTransports{Identifier: [24]byte(instanceID)} + raw, err := st.MarshalSSZ() + require.NoError(t, err) + + _, err = s.ProcessMessage(raw) + require.Error(t, err) + + s.Mtx.RLock() + _, ok := s.Instances[instanceID] + s.Mtx.RUnlock() + require.False(t, ok, "expected instance to be removed on timeout") +} From 467866174ce51f0fde1db264aa5cd1c7740c7131 Mon Sep 17 00:00:00 2001 From: Julien Harbulot Date: Sun, 5 Apr 2026 16:46:55 +1200 Subject: [PATCH 2/5] Timeout board channel writes and guard instance cleanup --- pkgs/dkg/board_channel_timeout_test.go | 27 +++++++++++ pkgs/dkg/drand.go | 65 ++++++++++++++++++++++++-- pkgs/operator/instance.go | 2 +- pkgs/operator/instances_manager.go | 3 +- pkgs/operator/state.go | 10 +++- 5 files changed, 98 insertions(+), 9 deletions(-) create mode 100644 pkgs/dkg/board_channel_timeout_test.go diff --git a/pkgs/dkg/board_channel_timeout_test.go b/pkgs/dkg/board_channel_timeout_test.go new file mode 100644 index 00000000..1cb9e3f9 --- /dev/null +++ b/pkgs/dkg/board_channel_timeout_test.go @@ -0,0 +1,27 @@ +package dkg + +import ( + "context" + "testing" + "time" + + kyber_dkg "github.com/drand/kyber/share/dkg" + "github.com/stretchr/testify/require" + + "github.com/ssvlabs/ssv-dkg/pkgs/board" +) + +func TestLocalOwner_SendDealBundle_TimesOut(t *testing.T) { + o := &LocalOwner{ + board: &board.Board{ + DealC: make(chan kyber_dkg.DealBundle), + ResponseC: make(chan kyber_dkg.ResponseBundle), + JustificationC: make(chan kyber_dkg.JustificationBundle), + }, + boardChanTimeout: 10 * time.Millisecond, + } + + err := o.sendDealBundle(kyber_dkg.DealBundle{}) + require.Error(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) +} diff --git a/pkgs/dkg/drand.go b/pkgs/dkg/drand.go index 0149ff54..579beed6 100644 --- a/pkgs/dkg/drand.go +++ b/pkgs/dkg/drand.go @@ -2,9 +2,11 @@ package dkg import ( "bytes" + "context" "crypto/rsa" "encoding/json" "fmt" + "time" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/drand/kyber" @@ -47,6 +49,7 @@ type OwnerOpts struct { Signer crypto.Signer EncryptFunc func([]byte) ([]byte, error) DecryptFunc func([]byte) ([]byte, error) + BoardChanTimeout time.Duration InitiatorPublicKey *rsa.PublicKey OperatorSecretKey *rsa.PrivateKey Owner [20]byte @@ -63,6 +66,7 @@ type LocalOwner struct { ID uint64 data *DKGdata board *board.Board + boardChanTimeout time.Duration Suite pairing.Suite broadcastF func([]byte) error exchanges map[uint64]*wire.Exchange @@ -84,6 +88,7 @@ func New(opts *OwnerOpts) *LocalOwner { startedDKG: make(chan struct{}, 1), ID: opts.ID, broadcastF: opts.BroadcastF, + boardChanTimeout: opts.BoardChanTimeout, exchanges: make(map[uint64]*wire.Exchange), deals: make(map[uint64]*kyber_dkg.DealBundle), signer: opts.Signer, @@ -98,6 +103,46 @@ func New(opts *OwnerOpts) *LocalOwner { return owner } +func (o *LocalOwner) channelTimeout() time.Duration { + if o.boardChanTimeout > 0 { + return o.boardChanTimeout + } + return time.Minute +} + +func (o *LocalOwner) sendDealBundle(bundle kyber_dkg.DealBundle) error { + ctx, cancel := context.WithTimeout(context.Background(), o.channelTimeout()) + defer cancel() + select { + case o.board.DealC <- bundle: + return nil + case <-ctx.Done(): + return fmt.Errorf("timed out sending deal bundle to dkg protocol: %w", ctx.Err()) + } +} + +func (o *LocalOwner) sendResponseBundle(bundle kyber_dkg.ResponseBundle) error { + ctx, cancel := context.WithTimeout(context.Background(), o.channelTimeout()) + defer cancel() + select { + case o.board.ResponseC <- bundle: + return nil + case <-ctx.Done(): + return fmt.Errorf("timed out sending response bundle to dkg protocol: %w", ctx.Err()) + } +} + +func (o *LocalOwner) sendJustificationBundle(bundle kyber_dkg.JustificationBundle) error { + ctx, cancel := context.WithTimeout(context.Background(), o.channelTimeout()) + defer cancel() + select { + case o.board.JustificationC <- bundle: + return nil + case <-ctx.Done(): + return fmt.Errorf("timed out sending justification bundle to dkg protocol: %w", ctx.Err()) + } +} + // StartDKG initializes and starts DKG protocol func (o *LocalOwner) StartDKG() error { o.Logger.Info("Starting DKG") @@ -332,21 +377,27 @@ func (o *LocalOwner) processDKG(from uint64, msg *wire.Transport) error { return err } o.Logger.Debug("operator: received deal bundle from", zap.Uint64("ID", from)) - o.board.DealC <- *b + if err := o.sendDealBundle(*b); err != nil { + return err + } case wire.KyberResponseBundleMessageType: b, err := wire.DecodeResponseBundle(kyberMsg.Data) if err != nil { return err } o.Logger.Debug("operator: received response bundle from", zap.Uint64("ID", from)) - o.board.ResponseC <- *b + if err := o.sendResponseBundle(*b); err != nil { + return err + } case wire.KyberJustificationBundleMessageType: b, err := wire.DecodeJustificationBundle(kyberMsg.Data, o.Suite.G1().(kyber_dkg.Suite)) if err != nil { return err } o.Logger.Debug("operator: received justification bundle from", zap.Uint64("ID", from)) - o.board.JustificationC <- *b + if err := o.sendJustificationBundle(*b); err != nil { + return err + } default: return fmt.Errorf("unknown kyber message type") } @@ -784,7 +835,9 @@ func (o *LocalOwner) StartReshareDKGNewNodes() error { } for _, b := range o.deals { o.Logger.Info("Pushing deal", zap.Any("Deal", *b)) - o.board.DealC <- *b + if err := o.sendDealBundle(*b); err != nil { + return err + } } go func(p *kyber_dkg.Protocol, postF func(res *kyber_dkg.OptionResult) error) { res := <-p.WaitEnd() @@ -799,7 +852,9 @@ func (o *LocalOwner) StartReshareDKGNewNodes() error { func (o *LocalOwner) PushDealsOldNodes() error { for _, b := range o.deals { - o.board.DealC <- *b + if err := o.sendDealBundle(*b); err != nil { + return err + } } return nil } diff --git a/pkgs/operator/instance.go b/pkgs/operator/instance.go index a5ff1167..365d04df 100644 --- a/pkgs/operator/instance.go +++ b/pkgs/operator/instance.go @@ -73,7 +73,7 @@ func (iw *instWrapper) ProcessMessages(msg *wire.MultipleSignedTransports) ([]by return nil, fmt.Errorf("process message: failed to process dkg message: %w", err) } } - ctx, cancel := context.WithTimeout(context.Background(), MaxInstanceTime) + ctx, cancel := context.WithTimeout(context.Background(), MaxInstancePhaseTimeout) defer cancel() select { case resp, ok := <-iw.respChan: diff --git a/pkgs/operator/instances_manager.go b/pkgs/operator/instances_manager.go index bd37b2e8..d7854ea9 100644 --- a/pkgs/operator/instances_manager.go +++ b/pkgs/operator/instances_manager.go @@ -41,6 +41,7 @@ func (s *Switch) CreateInstance(reqID [24]byte, operators []*spec.Operator, mess Signer: crypto.RSASigner(s.PrivateKey), EncryptFunc: s.Encrypt, DecryptFunc: s.Decrypt, + BoardChanTimeout: MaxInstancePhaseTimeout, Suite: kyber_bls12381.NewBLS12381Suite(), ID: operatorID, InitiatorPublicKey: initiatorPublicKey, @@ -77,7 +78,7 @@ func (s *Switch) CreateInstance(reqID [24]byte, operators []*spec.Operator, mess if err := owner.Broadcast(resp); err != nil { return nil, nil, err } - ctx, cancel := context.WithTimeout(context.Background(), MaxInstanceTime) + ctx, cancel := context.WithTimeout(context.Background(), MaxInstancePhaseTimeout) defer cancel() select { case res, ok := <-bchan: diff --git a/pkgs/operator/state.go b/pkgs/operator/state.go index 7013b066..4c8b9877 100644 --- a/pkgs/operator/state.go +++ b/pkgs/operator/state.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "sync" "time" @@ -28,6 +29,7 @@ import ( const MaxInstances = 1024 const MaxInstanceTime = 1 * time.Minute +const MaxInstancePhaseTimeout = MaxInstanceTime // InstanceID each new DKG ceremony has a unique random ID that we can identify messages and be able to process them in parallel type InstanceID [24]byte @@ -135,8 +137,12 @@ func (s *Switch) ProcessMessage(dkgMsg []byte) ([]byte, error) { resp, err := inst.ProcessMessages(st) if err != nil && errors.Is(err, context.DeadlineExceeded) { s.Mtx.Lock() - delete(s.Instances, id) - delete(s.InstanceInitTime, id) + current, ok := s.Instances[id] + instType := reflect.TypeOf(inst) + if ok && instType != nil && reflect.TypeOf(current) == instType && instType.Comparable() && current == inst { + delete(s.Instances, id) + delete(s.InstanceInitTime, id) + } s.Mtx.Unlock() } return resp, err From 285494ef1bce4021facbd6d576687978b3cabf3b Mon Sep 17 00:00:00 2001 From: Julien Harbulot Date: Mon, 6 Apr 2026 02:03:28 +1200 Subject: [PATCH 3/5] Simplify instance timeout cleanup identity check --- pkgs/operator/state.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkgs/operator/state.go b/pkgs/operator/state.go index 4c8b9877..852dac05 100644 --- a/pkgs/operator/state.go +++ b/pkgs/operator/state.go @@ -8,7 +8,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "sync" "time" @@ -138,8 +137,7 @@ func (s *Switch) ProcessMessage(dkgMsg []byte) ([]byte, error) { if err != nil && errors.Is(err, context.DeadlineExceeded) { s.Mtx.Lock() current, ok := s.Instances[id] - instType := reflect.TypeOf(inst) - if ok && instType != nil && reflect.TypeOf(current) == instType && instType.Comparable() && current == inst { + if ok && current == inst { delete(s.Instances, id) delete(s.InstanceInitTime, id) } From 6b035f8e656bfbd105ec210aa3e54830a89dd0a5 Mon Sep 17 00:00:00 2001 From: Julien Harbulot Date: Wed, 8 Apr 2026 19:43:23 +1200 Subject: [PATCH 4/5] Simplify timeout cleanup condition --- pkgs/operator/state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/operator/state.go b/pkgs/operator/state.go index 852dac05..fb0b7cb2 100644 --- a/pkgs/operator/state.go +++ b/pkgs/operator/state.go @@ -134,7 +134,7 @@ func (s *Switch) ProcessMessage(dkgMsg []byte) ([]byte, error) { return nil, utils.ErrMissingInstance } resp, err := inst.ProcessMessages(st) - if err != nil && errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, context.DeadlineExceeded) { s.Mtx.Lock() current, ok := s.Instances[id] if ok && current == inst { From b983922e1a7b1c1b5689217294893273d70eeeff Mon Sep 17 00:00:00 2001 From: Julien Harbulot Date: Mon, 13 Apr 2026 02:27:03 +1200 Subject: [PATCH 5/5] Upgrade Go toolchain to 1.25.9 --- .github/workflows/ci.yml | 2 +- .github/workflows/releases.yml | 2 +- Dockerfile | 2 +- README.md | 8 ++++---- go.mod | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6aad17b2..e5207528 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ concurrency: cancel-in-progress: true env: - GO_VERSION: "1.25.8" + GO_VERSION: "1.25.9" CGO_ENABLED: "1" jobs: diff --git a/.github/workflows/releases.yml b/.github/workflows/releases.yml index 9fba5794..2098efd2 100644 --- a/.github/workflows/releases.yml +++ b/.github/workflows/releases.yml @@ -13,7 +13,7 @@ concurrency: cancel-in-progress: false env: - GO_VERSION: "1.25.8" + GO_VERSION: "1.25.9" jobs: release: diff --git a/Dockerfile b/Dockerfile index 217b287d..3a8d2ce9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.25.8-alpine@sha256:8e02eb337d9e0ea459e041f1ee5eece41cbb61f1d83e7d883a3e2fb4862063fa AS build +FROM golang:1.25.9-alpine@sha256:7a00384194cf2cb68924bbb918d675f1517357433c8541bac0ab2f929b9d5447 AS build ARG VERSION=dev diff --git a/README.md b/README.md index 175ecdf3..a0337378 100644 --- a/README.md +++ b/README.md @@ -200,11 +200,11 @@ Due to Windows operating system's [limitation](https://learn.microsoft.com/en-us #### Build from source -To build from source you'll need to have Go version 1.25 installed on your system +To build from source you'll need to have Go version 1.25.9 installed on your system ##### Build -A prerequisite for this is to have `go` version 1.25 installed on the system, and an optional requirement is to have the `make` tool installed as well (alternatively you could run the corresponding command defined in the `Makefile`). +A prerequisite for this is to have `go` version 1.25.9 installed on the system, and an optional requirement is to have the `make` tool installed as well (alternatively you could run the corresponding command defined in the `Makefile`). ```sh make install @@ -433,11 +433,11 @@ You can, of course, change the configuration above to one that suits you better, #### Build from source -To build from source you'll need to have Go version 1.25 installed on your system +To build from source you'll need to have Go version 1.25.9 installed on your system ##### Build -A prerequisite for this is to have `go` version 1.25 installed on the system, and an optional requirement is to have the `make` tool installed as well (alternatively you could run the corresponding command defined in the `Makefile`). +A prerequisite for this is to have `go` version 1.25.9 installed on the system, and an optional requirement is to have the `make` tool installed as well (alternatively you could run the corresponding command defined in the `Makefile`). ```sh make install diff --git a/go.mod b/go.mod index d85f148a..fc0f3faa 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ssvlabs/ssv-dkg -go 1.25.8 +go 1.25.9 require ( github.com/aquasecurity/table v1.11.0