Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions apps/evm/single/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"errors"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -36,9 +38,24 @@ func InitCmd() *cobra.Command {
return fmt.Errorf("error validating config: %w", err)
}

passphrase, err := cmd.Flags().GetString(rollconf.FlagSignerPassphrase)
// Get passphrase file path
passphraseFile, err := cmd.Flags().GetString(rollconf.FlagSignerPassphraseFile)
if err != nil {
return fmt.Errorf("error reading passphrase flag: %w", err)
return fmt.Errorf("failed to get '%s' flag: %w", rollconf.FlagSignerPassphraseFile, err)
}

var passphrase string
if passphraseFile != "" {
// Read passphrase from file
passphraseBytes, err := os.ReadFile(passphraseFile)
if err != nil {
return fmt.Errorf("failed to read passphrase from file '%s': %w", passphraseFile, err)
}
passphrase = string(strings.TrimSpace(string(passphraseBytes)))

if passphrase == "" {
return fmt.Errorf("passphrase file '%s' is empty", passphraseFile)
}
}

proposerAddress, err := rollcmd.CreateSigner(&cfg, homePath, passphrase)
Expand Down
26 changes: 23 additions & 3 deletions apps/evm/single/cmd/run.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cmd

import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"

"github.com/evstack/ev-node/core/da"
Expand Down Expand Up @@ -119,10 +121,28 @@ func createExecutionClient(cmd *cobra.Command) (execution.Executor, error) {
if err != nil {
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEngineURL, err)
}
jwtSecret, err := cmd.Flags().GetString(evm.FlagEvmJWTSecret)

// Get JWT secret file path
jwtSecretFile, err := cmd.Flags().GetString(evm.FlagEvmJWTSecretFile)
if err != nil {
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmJWTSecret, err)
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmJWTSecretFile, err)
}

if jwtSecretFile == "" {
return nil, fmt.Errorf("JWT secret file must be provided via --evm.jwt-secret-file")
}

// Read JWT secret from file
secretBytes, err := os.ReadFile(jwtSecretFile)
if err != nil {
return nil, fmt.Errorf("failed to read JWT secret from file '%s': %w", jwtSecretFile, err)
}
jwtSecret := string(bytes.TrimSpace(secretBytes))

if jwtSecret == "" {
return nil, fmt.Errorf("JWT secret file '%s' is empty", jwtSecretFile)
}

genesisHashStr, err := cmd.Flags().GetString(evm.FlagEvmGenesisHash)
if err != nil {
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmGenesisHash, err)
Expand All @@ -143,7 +163,7 @@ func createExecutionClient(cmd *cobra.Command) (execution.Executor, error) {
func addFlags(cmd *cobra.Command) {
cmd.Flags().String(evm.FlagEvmEthURL, "http://localhost:8545", "URL of the Ethereum JSON-RPC endpoint")
cmd.Flags().String(evm.FlagEvmEngineURL, "http://localhost:8551", "URL of the Engine API endpoint")
cmd.Flags().String(evm.FlagEvmJWTSecret, "", "The JWT secret for authentication with the execution client")
cmd.Flags().String(evm.FlagEvmJWTSecretFile, "", "Path to file containing the JWT secret for authentication")
cmd.Flags().String(evm.FlagEvmGenesisHash, "", "Hash of the genesis block")
cmd.Flags().String(evm.FlagEvmFeeRecipient, "", "Address that will receive transaction fees")
}
21 changes: 19 additions & 2 deletions apps/grpc/single/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"errors"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -38,9 +40,24 @@ This will create the necessary configuration structure in the specified root dir
return fmt.Errorf("error validating config: %w", err)
}

passphrase, err := cmd.Flags().GetString(rollconf.FlagSignerPassphrase)
// Get passphrase file path
passphraseFile, err := cmd.Flags().GetString(rollconf.FlagSignerPassphraseFile)
if err != nil {
return fmt.Errorf("error reading passphrase flag: %w", err)
return fmt.Errorf("failed to get '%s' flag: %w", rollconf.FlagSignerPassphraseFile, err)
}

var passphrase string
if passphraseFile != "" {
// Read passphrase from file
passphraseBytes, err := os.ReadFile(passphraseFile)
if err != nil {
return fmt.Errorf("failed to read passphrase from file '%s': %w", passphraseFile, err)
}
passphrase = string(strings.TrimSpace(string(passphraseBytes)))

if passphrase == "" {
return fmt.Errorf("passphrase file '%s' is empty", passphraseFile)
}
}

proposerAddress, err := rollcmd.CreateSigner(&cfg, homePath, passphrase)
Expand Down
21 changes: 19 additions & 2 deletions apps/testapp/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"errors"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -36,9 +38,24 @@ func InitCmd() *cobra.Command {
return fmt.Errorf("error validating config: %w", err)
}

passphrase, err := cmd.Flags().GetString(rollconf.FlagSignerPassphrase)
// Get passphrase file path
passphraseFile, err := cmd.Flags().GetString(rollconf.FlagSignerPassphraseFile)
if err != nil {
return fmt.Errorf("error reading passphrase flag: %w", err)
return fmt.Errorf("failed to get '%s' flag: %w", rollconf.FlagSignerPassphraseFile, err)
}

var passphrase string
if passphraseFile != "" {
// Read passphrase from file
passphraseBytes, err := os.ReadFile(passphraseFile)
if err != nil {
return fmt.Errorf("failed to read passphrase from file '%s': %w", passphraseFile, err)
}
passphrase = string(strings.TrimSpace(string(passphraseBytes)))

if passphrase == "" {
return fmt.Errorf("passphrase file '%s' is empty", passphraseFile)
}
}

proposerAddress, err := rollcmd.CreateSigner(&cfg, homePath, passphrase)
Expand Down
6 changes: 5 additions & 1 deletion apps/testapp/cmd/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ func TestInitCommand(t *testing.T) {
// Register all persistent flags from root command
rollconf.AddGlobalFlags(cmd, "testapp")

// Create passphrase file
passphraseFile := filepath.Join(dir, "passphrase")
require.NoError(t, os.WriteFile(passphraseFile, []byte("test"), 0600))

// Set home flag to the test directory
cmd.SetArgs([]string{"init", "--home", dir, "--rollkit.node.aggregator", "--rollkit.signer.passphrase", "test"})
cmd.SetArgs([]string{"init", "--home", dir, "--evnode.node.aggregator", "--evnode.signer.passphrase_file", passphraseFile})

// Execute the command
err = cmd.Execute()
Expand Down
3 changes: 3 additions & 0 deletions block/internal/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ var (

// ErrHeightFromFutureStr is the error message for height from future returned by da
ErrHeightFromFutureStr = errors.New("given height is from the future")

// ErrOversizedItem is an unrecoverable error indicating a single item exceeds DA blob size limit
ErrOversizedItem = errors.New("single item exceeds DA blob size limit")
)
28 changes: 26 additions & 2 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ import (
"github.com/evstack/ev-node/types"
)

const defaultMaxBlobSize = 1.5 * 1024 * 1024 // 1.5MB fallback blob size limit

var (
// errBlockValidation indicates a permanent validation error that won't resolve on retry
errBlockValidation = errors.New("block validation failed")
)

// Executor handles block production, transaction processing, and state management
type Executor struct {
// Core components
Expand Down Expand Up @@ -281,6 +288,14 @@ func (e *Executor) executionLoop() {

if err := e.produceBlock(); err != nil {
e.logger.Error().Err(err).Msg("failed to produce block")
// Check for permanent validation errors that won't resolve on retry
if errors.Is(err, errBlockValidation) {
e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production")
if e.cancel != nil {
e.cancel()
}
return
}
}
txsAvailable = false
// Always reset block timer to keep ticking
Expand All @@ -290,6 +305,14 @@ func (e *Executor) executionLoop() {
e.logger.Debug().Msg("Lazy timer triggered block production")
if err := e.produceBlock(); err != nil {
e.logger.Error().Err(err).Msg("failed to produce block from lazy timer")
// Check for permanent validation errors that won't resolve on retry
if errors.Is(err, errBlockValidation) {
e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production")
if e.cancel != nil {
e.cancel()
}
return
}
}
// Reset lazy timer
lazyTimer.Reset(e.config.Node.LazyBlockInterval.Duration)
Expand Down Expand Up @@ -389,7 +412,7 @@ func (e *Executor) produceBlock() error {

if err := e.validateBlock(currentState, header, data); err != nil {
e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err))
return fmt.Errorf("failed to validate block: %w", err)
return fmt.Errorf("%w: %w", errBlockValidation, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that looks strange, wrapping 2 errors into one. You can use errors join.

}

batch, err := e.store.NewBatch(e.ctx)
Expand Down Expand Up @@ -439,7 +462,8 @@ func (e *Executor) produceBlock() error {
// retrieveBatch gets the next batch of transactions from the sequencer
func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) {
req := coresequencer.GetNextBatchRequest{
Id: []byte(e.genesis.ChainID),
Id: []byte(e.genesis.ChainID),
MaxBytes: defaultMaxBlobSize,
}

res, err := e.sequencer.GetNextBatch(ctx, req)
Expand Down
57 changes: 48 additions & 9 deletions block/internal/reaping/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ import (
"github.com/evstack/ev-node/pkg/store"
)

// DefaultInterval is the default reaper interval
const DefaultInterval = 1 * time.Second
const (
// DefaultInterval is the default reaper interval
DefaultInterval = 1 * time.Second
// MaxBackoffInterval is the maximum backoff interval for retries
MaxBackoffInterval = 30 * time.Second
// BackoffMultiplier is the multiplier for exponential backoff
BackoffMultiplier = 2
)

// Reaper is responsible for periodically retrieving transactions from the executor,
// filtering out already seen transactions, and submitting new transactions to the sequencer.
Expand Down Expand Up @@ -76,7 +82,7 @@ func NewReaper(
func (r *Reaper) Start(ctx context.Context) error {
r.ctx, r.cancel = context.WithCancel(ctx)

// Start repear loop
// Start reaper loop
r.wg.Add(1)
go func() {
defer r.wg.Done()
Expand All @@ -91,12 +97,35 @@ func (r *Reaper) reaperLoop() {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()

consecutiveFailures := 0

for {
select {
case <-r.ctx.Done():
return
case <-ticker.C:
r.SubmitTxs()
err := r.SubmitTxs()
if err != nil {
// Increment failure counter and apply exponential backoff
consecutiveFailures++
backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5)) // Cap at 2^5 = 32x
backoff = min(backoff, MaxBackoffInterval)
r.logger.Warn().
Err(err).
Int("consecutive_failures", consecutiveFailures).
Dur("next_retry_in", backoff).
Msg("reaper encountered error, applying backoff")

// Reset ticker with backoff interval
ticker.Reset(backoff)
} else {
// Reset failure counter and backoff on success
if consecutiveFailures > 0 {
r.logger.Info().Msg("reaper recovered from errors, resetting backoff")
consecutiveFailures = 0
ticker.Reset(r.interval)
}
}
}
}
}
Expand All @@ -113,34 +142,42 @@ func (r *Reaper) Stop() error {
}

// SubmitTxs retrieves transactions from the executor and submits them to the sequencer.
func (r *Reaper) SubmitTxs() {
// Returns an error if any critical operation fails.
func (r *Reaper) SubmitTxs() error {
txs, err := r.exec.GetTxs(r.ctx)
if err != nil {
r.logger.Error().Err(err).Msg("failed to get txs from executor")
return
return fmt.Errorf("failed to get txs from executor: %w", err)
}
if len(txs) == 0 {
r.logger.Debug().Msg("no new txs")
return
return nil
}

var newTxs [][]byte
var seenStoreErrors int
for _, tx := range txs {
txHash := hashTx(tx)
key := ds.NewKey(txHash)
has, err := r.seenStore.Has(r.ctx, key)
if err != nil {
r.logger.Error().Err(err).Msg("failed to check seenStore")
seenStoreErrors++
continue
}
if !has {
newTxs = append(newTxs, tx)
}
}

// If all transactions failed seenStore check, return error
if seenStoreErrors > 0 && len(newTxs) == 0 {
return fmt.Errorf("failed to check seenStore for all %d transactions", seenStoreErrors)
}

if len(newTxs) == 0 {
r.logger.Debug().Msg("no new txs to submit")
return
return nil
}

r.logger.Debug().Int("txCount", len(newTxs)).Msg("submitting txs to sequencer")
Expand All @@ -151,13 +188,14 @@ func (r *Reaper) SubmitTxs() {
})
if err != nil {
r.logger.Error().Err(err).Msg("failed to submit txs to sequencer")
return
return fmt.Errorf("failed to submit txs to sequencer: %w", err)
}

for _, tx := range newTxs {
txHash := hashTx(tx)
key := ds.NewKey(txHash)
if err := r.seenStore.Put(r.ctx, key, []byte{1}); err != nil {
// Log but don't fail on persistence errors
r.logger.Error().Err(err).Str("txHash", txHash).Msg("failed to persist seen tx")
}
}
Expand All @@ -169,6 +207,7 @@ func (r *Reaper) SubmitTxs() {
}

r.logger.Debug().Msg("successfully submitted txs")
return nil
}

// SeenStore returns the datastore used to track seen transactions.
Expand Down
Loading
Loading