diff --git a/apps/evm/single/main.go b/apps/evm/single/main.go index a562ceff7c..a27872ea2e 100644 --- a/apps/evm/single/main.go +++ b/apps/evm/single/main.go @@ -23,11 +23,17 @@ func main() { // Add configuration flags to NetInfoCmd so it can read RPC address config.AddFlags(rollcmd.NetInfoCmd) + backupCmd := rollcmd.NewBackupCmd() + config.AddFlags(backupCmd) + restoreCmd := rollcmd.NewRestoreCmd() + config.AddFlags(restoreCmd) rootCmd.AddCommand( cmd.InitCmd(), cmd.RunCmd, cmd.NewRollbackCmd(), + backupCmd, + restoreCmd, rollcmd.VersionCmd, rollcmd.NetInfoCmd, rollcmd.StoreUnsafeCleanCmd, diff --git a/apps/testapp/main.go b/apps/testapp/main.go index cd8f019709..64ab9fc42e 100644 --- a/apps/testapp/main.go +++ b/apps/testapp/main.go @@ -6,6 +6,7 @@ import ( cmds "github.com/evstack/ev-node/apps/testapp/cmd" rollcmd "github.com/evstack/ev-node/pkg/cmd" + "github.com/evstack/ev-node/pkg/config" ) func main() { @@ -13,6 +14,12 @@ func main() { rootCmd := cmds.RootCmd initCmd := cmds.InitCmd() + // Add configuration flags to backup and restore commands + backupCmd := rollcmd.NewBackupCmd() + config.AddFlags(backupCmd) + restoreCmd := rollcmd.NewRestoreCmd() + config.AddFlags(restoreCmd) + // Add subcommands to the root command rootCmd.AddCommand( cmds.RunCmd, @@ -21,6 +28,8 @@ func main() { rollcmd.StoreUnsafeCleanCmd, rollcmd.KeysCmd(), cmds.NewRollbackCmd(), + backupCmd, + restoreCmd, initCmd, ) diff --git a/pkg/cmd/backup.go b/pkg/cmd/backup.go new file mode 100644 index 0000000000..cfc9eb62af --- /dev/null +++ b/pkg/cmd/backup.go @@ -0,0 +1,146 @@ +package cmd + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "github.com/spf13/cobra" + + clientrpc "github.com/evstack/ev-node/pkg/rpc/client" + pb "github.com/evstack/ev-node/types/pb/evnode/v1" +) + +// NewBackupCmd creates a cobra command that streams a datastore backup via the RPC client. +func NewBackupCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "backup", + Short: "Stream a datastore backup to a local file via RPC", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + nodeConfig, err := ParseConfig(cmd) + if err != nil { + return fmt.Errorf("error parsing config: %w", err) + } + + rpcAddress := strings.TrimSpace(nodeConfig.RPC.Address) + if rpcAddress == "" { + return fmt.Errorf("RPC address not found in node configuration") + } + + baseURL := rpcAddress + if !strings.HasPrefix(baseURL, "http://") && !strings.HasPrefix(baseURL, "https://") { + baseURL = fmt.Sprintf("http://%s", baseURL) + } + + outputPath, err := cmd.Flags().GetString("output") + if err != nil { + return err + } + + if outputPath == "" { + timestamp := time.Now().UTC().Format("20060102-150405") + outputPath = fmt.Sprintf("evnode-backup-%s.badger", timestamp) + } + + outputPath, err = filepath.Abs(outputPath) + if err != nil { + return fmt.Errorf("failed to resolve output path: %w", err) + } + + if err := os.MkdirAll(filepath.Dir(outputPath), 0o755); err != nil { + return fmt.Errorf("failed to create output directory: %w", err) + } + + force, err := cmd.Flags().GetBool("force") + if err != nil { + return err + } + + if !force { + if _, statErr := os.Stat(outputPath); statErr == nil { + return fmt.Errorf("output file %s already exists (use --force to overwrite)", outputPath) + } else if !errors.Is(statErr, os.ErrNotExist) { + return fmt.Errorf("failed to inspect output file: %w", statErr) + } + } + + file, err := os.OpenFile(outputPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + return fmt.Errorf("failed to open output file: %w", err) + } + defer file.Close() + + writer := bufio.NewWriterSize(file, 1<<20) // 1 MiB buffer for fewer syscalls. + bytesCount := &countingWriter{} + streamWriter := io.MultiWriter(writer, bytesCount) + + sinceVersion, err := cmd.Flags().GetUint64("since-version") + if err != nil { + return err + } + + ctx := cmd.Context() + if ctx == nil { + ctx = context.Background() + } + + client := clientrpc.NewClient(baseURL) + + metadata, backupErr := client.Backup(ctx, &pb.BackupRequest{ + SinceVersion: sinceVersion, + }, streamWriter) + if backupErr != nil { + // Remove the partial file on failure to avoid keeping corrupt snapshots. + _ = writer.Flush() + _ = file.Close() + _ = os.Remove(outputPath) + return fmt.Errorf("backup failed: %w", backupErr) + } + + if err := writer.Flush(); err != nil { + _ = file.Close() + _ = os.Remove(outputPath) + return fmt.Errorf("failed to flush backup data: %w", err) + } + + if !metadata.GetCompleted() { + _ = file.Close() + _ = os.Remove(outputPath) + return fmt.Errorf("backup stream ended without completion metadata") + } + + cmd.Printf("Backup saved to %s (%d bytes)\n", outputPath, bytesCount.Bytes()) + cmd.Printf("Current height: %d\n", metadata.GetCurrentHeight()) + cmd.Printf("Since version: %d\n", metadata.GetSinceVersion()) + cmd.Printf("Last version: %d\n", metadata.GetLastVersion()) + + return nil + }, + } + + cmd.Flags().String("output", "", "Path to the backup file (defaults to ./evnode-backup-.badger)") + cmd.Flags().Uint64("since-version", 0, "Generate an incremental backup starting from the provided version") + cmd.Flags().Bool("force", false, "Overwrite the output file if it already exists") + + return cmd +} + +type countingWriter struct { + total int64 +} + +func (c *countingWriter) Write(p []byte) (int, error) { + c.total += int64(len(p)) + return len(p), nil +} + +func (c *countingWriter) Bytes() int64 { + return c.total +} diff --git a/pkg/cmd/backup_test.go b/pkg/cmd/backup_test.go new file mode 100644 index 0000000000..3875b3f64e --- /dev/null +++ b/pkg/cmd/backup_test.go @@ -0,0 +1,126 @@ +package cmd + +import ( + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/rs/zerolog" + "github.com/spf13/cobra" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/rpc/server" + "github.com/evstack/ev-node/test/mocks" + "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" +) + +func TestBackupCmd_Success(t *testing.T) { + t.Parallel() + + mockStore := mocks.NewMockStore(t) + + mockStore.On("Height", mock.Anything).Return(uint64(15), nil) + mockStore.On("Backup", mock.Anything, mock.Anything, uint64(9)).Run(func(args mock.Arguments) { + writer := args.Get(1).(io.Writer) + _, _ = writer.Write([]byte("chunk-1")) + _, _ = writer.Write([]byte("chunk-2")) + }).Return(uint64(21), nil) + + logger := zerolog.Nop() + storeServer := server.NewStoreServer(mockStore, logger) + mux := http.NewServeMux() + storePath, storeHandler := v1connect.NewStoreServiceHandler(storeServer) + mux.Handle(storePath, storeHandler) + + httpServer := httptest.NewServer(h2c.NewHandler(mux, &http2.Server{})) + defer httpServer.Close() + + tempDir, err := os.MkdirTemp("", "evnode-backup-*") + require.NoError(t, err) + t.Cleanup(func() { + _ = os.RemoveAll(tempDir) + }) + + backupCmd := NewBackupCmd() + config.AddFlags(backupCmd) + + rootCmd := &cobra.Command{Use: "root"} + config.AddGlobalFlags(rootCmd, "test") + rootCmd.AddCommand(backupCmd) + + outPath := filepath.Join(tempDir, "snapshot.badger") + rpcAddr := strings.TrimPrefix(httpServer.URL, "http://") + + output, err := executeCommandC( + rootCmd, + "backup", + "--home="+tempDir, + "--evnode.rpc.address="+rpcAddr, + "--output", outPath, + "--since-version", "9", + ) + + require.NoError(t, err, "command failed: %s", output) + + data, readErr := os.ReadFile(outPath) + require.NoError(t, readErr) + require.Equal(t, "chunk-1chunk-2", string(data)) + + require.Contains(t, output, "Backup saved to") + require.Contains(t, output, "Current height: 15") + require.Contains(t, output, "Since version: 9") + require.Contains(t, output, "Last version: 21") + + mockStore.AssertExpectations(t) +} + +func TestBackupCmd_ExistingFileWithoutForce(t *testing.T) { + t.Parallel() + + mockStore := mocks.NewMockStore(t) + logger := zerolog.Nop() + storeServer := server.NewStoreServer(mockStore, logger) + mux := http.NewServeMux() + storePath, storeHandler := v1connect.NewStoreServiceHandler(storeServer) + mux.Handle(storePath, storeHandler) + + httpServer := httptest.NewServer(h2c.NewHandler(mux, &http2.Server{})) + defer httpServer.Close() + + tempDir, err := os.MkdirTemp("", "evnode-backup-existing-*") + require.NoError(t, err) + t.Cleanup(func() { + _ = os.RemoveAll(tempDir) + }) + + outPath := filepath.Join(tempDir, "snapshot.badger") + require.NoError(t, os.WriteFile(outPath, []byte("existing"), 0o600)) + + backupCmd := NewBackupCmd() + config.AddFlags(backupCmd) + + rootCmd := &cobra.Command{Use: "root"} + config.AddGlobalFlags(rootCmd, "test") + rootCmd.AddCommand(backupCmd) + + rpcAddr := strings.TrimPrefix(httpServer.URL, "http://") + + output, err := executeCommandC( + rootCmd, + "backup", + "--home="+tempDir, + "--evnode.rpc.address="+rpcAddr, + "--output", outPath, + ) + + require.Error(t, err) + require.Contains(t, output, "already exists (use --force to overwrite)") +} diff --git a/pkg/cmd/restore.go b/pkg/cmd/restore.go new file mode 100644 index 0000000000..ff531833d6 --- /dev/null +++ b/pkg/cmd/restore.go @@ -0,0 +1,123 @@ +package cmd + +import ( + "bufio" + "context" + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + + "github.com/evstack/ev-node/pkg/store" +) + +// NewRestoreCmd creates a cobra command that restores a datastore from a Badger backup file. +func NewRestoreCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "restore", + Short: "Restore a datastore from a Badger backup file", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + nodeConfig, err := ParseConfig(cmd) + if err != nil { + return fmt.Errorf("error parsing config: %w", err) + } + + inputPath, err := cmd.Flags().GetString("input") + if err != nil { + return err + } + + if inputPath == "" { + return fmt.Errorf("--input flag is required") + } + + inputPath, err = filepath.Abs(inputPath) + if err != nil { + return fmt.Errorf("failed to resolve input path: %w", err) + } + + // Check if input file exists + if _, err := os.Stat(inputPath); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("backup file not found: %s", inputPath) + } + return fmt.Errorf("failed to access backup file: %w", err) + } + + force, err := cmd.Flags().GetBool("force") + if err != nil { + return err + } + + // Check if datastore already exists + dbPath := filepath.Join(nodeConfig.RootDir, nodeConfig.DBPath) + if _, err := os.Stat(dbPath); err == nil && !force { + return fmt.Errorf("datastore already exists at %s (use --force to overwrite)", dbPath) + } + + // Remove existing datastore if force is enabled + if force { + if err := os.RemoveAll(dbPath); err != nil { + return fmt.Errorf("failed to remove existing datastore: %w", err) + } + } + + // Create the datastore directory + if err := os.MkdirAll(dbPath, 0o755); err != nil { + return fmt.Errorf("failed to create datastore directory: %w", err) + } + + appName, err := cmd.Flags().GetString("app-name") + if err != nil { + return err + } + + if appName == "" { + appName = "ev-node" + } + + // Open the datastore + kvStore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, appName) + if err != nil { + return fmt.Errorf("failed to open datastore: %w", err) + } + defer kvStore.Close() + + evStore := store.New(kvStore) + + // Open the backup file + file, err := os.Open(inputPath) + if err != nil { + return fmt.Errorf("failed to open backup file: %w", err) + } + defer file.Close() + + reader := bufio.NewReaderSize(file, 1<<20) // 1 MiB buffer + + ctx := cmd.Context() + if ctx == nil { + ctx = context.Background() + } + + cmd.Println("Restoring datastore from backup...") + + // Perform the restore + if err := evStore.Restore(ctx, reader); err != nil { + return fmt.Errorf("restore failed: %w", err) + } + + cmd.Printf("Restore completed successfully\n") + cmd.Printf("Datastore restored to: %s\n", dbPath) + + return nil + }, + } + + cmd.Flags().String("input", "", "Path to the backup file (required)") + cmd.Flags().String("app-name", "", "Application name for the datastore (default: ev-node)") + cmd.Flags().Bool("force", false, "Overwrite existing datastore if it exists") + + return cmd +} diff --git a/pkg/rpc/client/client.go b/pkg/rpc/client/client.go index 316b028f66..34cce46140 100644 --- a/pkg/rpc/client/client.go +++ b/pkg/rpc/client/client.go @@ -2,6 +2,8 @@ package client import ( "context" + "fmt" + "io" "net/http" "connectrpc.com/connect" @@ -92,6 +94,54 @@ func (c *Client) GetMetadata(ctx context.Context, key string) ([]byte, error) { return resp.Msg.Value, nil } +// Backup streams a datastore backup into the provided writer and returns the final metadata emitted by the server. +// The writer is not closed by this method. +func (c *Client) Backup(ctx context.Context, params *pb.BackupRequest, dst io.Writer) (*pb.BackupMetadata, error) { + if dst == nil { + return nil, fmt.Errorf("backup destination writer cannot be nil") + } + + if params == nil { + params = &pb.BackupRequest{} + } + + stream, err := c.storeClient.Backup(ctx, connect.NewRequest(params)) + if err != nil { + return nil, err + } + defer stream.Close() // Best effort; ignore close error to preserve primary result. + + var lastMetadata *pb.BackupMetadata + for stream.Receive() { + msg := stream.Msg() + if metadata := msg.GetMetadata(); metadata != nil { + lastMetadata = metadata + continue + } + + if chunk := msg.GetChunk(); chunk != nil { + if _, err := dst.Write(chunk); err != nil { + _ = stream.Close() + return lastMetadata, fmt.Errorf("failed to write backup chunk: %w", err) + } + } + } + + if err := stream.Err(); err != nil { + return lastMetadata, err + } + + if lastMetadata == nil { + return nil, fmt.Errorf("backup stream completed without metadata") + } + + if !lastMetadata.GetCompleted() { + return lastMetadata, fmt.Errorf("backup stream ended without completion metadata") + } + + return lastMetadata, nil +} + // GetPeerInfo returns information about the connected peers func (c *Client) GetPeerInfo(ctx context.Context) ([]*pb.PeerInfo, error) { req := connect.NewRequest(&emptypb.Empty{}) diff --git a/pkg/rpc/client/client_test.go b/pkg/rpc/client/client_test.go index 97c2475137..c5a49ca464 100644 --- a/pkg/rpc/client/client_test.go +++ b/pkg/rpc/client/client_test.go @@ -1,7 +1,9 @@ package client import ( + "bytes" "context" + "io" "net/http" "net/http/httptest" "testing" @@ -20,6 +22,7 @@ import ( "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/types" + pb "github.com/evstack/ev-node/types/pb/evnode/v1" rpc "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" ) @@ -173,6 +176,33 @@ func TestClientGetBlockByHash(t *testing.T) { mockStore.AssertExpectations(t) } +func TestClientBackup(t *testing.T) { + mockStore := mocks.NewMockStore(t) + mockP2P := mocks.NewMockP2PRPC(t) + + mockStore.On("Height", mock.Anything).Return(uint64(15), nil) + mockStore.On("Backup", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + writer := args.Get(1).(io.Writer) + _, _ = writer.Write([]byte("chunk-1")) + _, _ = writer.Write([]byte("chunk-2")) + }).Return(uint64(42), nil) + + testServer, client := setupTestServer(t, mockStore, mockP2P) + defer testServer.Close() + + var buf bytes.Buffer + metadata, err := client.Backup(context.Background(), &pb.BackupRequest{SinceVersion: 5}, &buf) + require.NoError(t, err) + require.NotNil(t, metadata) + require.Equal(t, "chunk-1chunk-2", buf.String()) + require.True(t, metadata.GetCompleted()) + require.Equal(t, uint64(15), metadata.GetCurrentHeight()) + require.Equal(t, uint64(5), metadata.GetSinceVersion()) + require.Equal(t, uint64(42), metadata.GetLastVersion()) + + mockStore.AssertExpectations(t) +} + func TestClientGetPeerInfo(t *testing.T) { // Create mocks mockStore := mocks.NewMockStore(t) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 290994be33..7f0ae5fffd 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "io" "net/http" "time" @@ -188,6 +189,156 @@ func (s *StoreServer) GetMetadata( }), nil } +// Backup streams a Badger backup of the datastore so it can be persisted externally. +func (s *StoreServer) Backup( + ctx context.Context, + req *connect.Request[pb.BackupRequest], + stream *connect.ServerStream[pb.BackupResponse], +) error { + since := req.Msg.GetSinceVersion() + + currentHeight, err := s.store.Height(ctx) + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to get current height: %w", err)) + } + + initialMetadata := &pb.BackupMetadata{ + CurrentHeight: currentHeight, + SinceVersion: since, + Completed: false, + LastVersion: 0, + } + + if err := stream.Send(&pb.BackupResponse{ + Response: &pb.BackupResponse_Metadata{ + Metadata: initialMetadata, + }, + }); err != nil { + return err + } + + writer := newBackupStreamWriter(stream, defaultBackupChunkSize) + version, err := s.store.Backup(ctx, writer, since) + if err != nil { + var connectErr *connect.Error + if errors.As(err, &connectErr) { + return connectErr + } + if errors.Is(err, context.Canceled) { + return connect.NewError(connect.CodeCanceled, err) + } + if errors.Is(err, context.DeadlineExceeded) { + return connect.NewError(connect.CodeDeadlineExceeded, err) + } + return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to execute backup: %w", err)) + } + + if err := writer.Flush(); err != nil { + var connectErr *connect.Error + if errors.As(err, &connectErr) { + return connectErr + } + if errors.Is(err, context.Canceled) { + return connect.NewError(connect.CodeCanceled, err) + } + if errors.Is(err, context.DeadlineExceeded) { + return connect.NewError(connect.CodeDeadlineExceeded, err) + } + return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to flush backup stream: %w", err)) + } + + completedMetadata := &pb.BackupMetadata{ + CurrentHeight: currentHeight, + SinceVersion: since, + LastVersion: version, + Completed: true, + } + + if err := stream.Send(&pb.BackupResponse{ + Response: &pb.BackupResponse_Metadata{ + Metadata: completedMetadata, + }, + }); err != nil { + return err + } + + return nil +} + +const defaultBackupChunkSize = 128 * 1024 + +var _ io.Writer = (*backupStreamWriter)(nil) + +type backupStreamWriter struct { + stream *connect.ServerStream[pb.BackupResponse] + buf []byte + chunkSize int +} + +func newBackupStreamWriter(stream *connect.ServerStream[pb.BackupResponse], chunkSize int) *backupStreamWriter { + if chunkSize <= 0 { + chunkSize = defaultBackupChunkSize + } + + return &backupStreamWriter{ + stream: stream, + buf: make([]byte, 0, chunkSize), + chunkSize: chunkSize, + } +} + +func (w *backupStreamWriter) Write(p []byte) (int, error) { + written := 0 + for len(p) > 0 { + space := w.chunkSize - len(w.buf) + if space == 0 { + if err := w.flush(); err != nil { + return written, err + } + space = w.chunkSize - len(w.buf) + } + + if space > len(p) { + space = len(p) + } + + w.buf = append(w.buf, p[:space]...) + p = p[space:] + written += space + + if len(w.buf) == w.chunkSize { + if err := w.flush(); err != nil { + return written, err + } + } + } + return written, nil +} + +func (w *backupStreamWriter) Flush() error { + return w.flush() +} + +func (w *backupStreamWriter) flush() error { + if len(w.buf) == 0 { + return nil + } + + chunk := make([]byte, len(w.buf)) + copy(chunk, w.buf) + + if err := w.stream.Send(&pb.BackupResponse{ + Response: &pb.BackupResponse_Chunk{ + Chunk: chunk, + }, + }); err != nil { + return err + } + + w.buf = w.buf[:0] + return nil +} + type ConfigServer struct { config config.Config signer []byte diff --git a/pkg/store/backup.go b/pkg/store/backup.go new file mode 100644 index 0000000000..361b90aad4 --- /dev/null +++ b/pkg/store/backup.go @@ -0,0 +1,45 @@ +package store + +import ( + "context" + "fmt" + "io" + + ds "github.com/ipfs/go-datastore" + badger4 "github.com/ipfs/go-ds-badger4" +) + +// Backup streams the underlying Badger datastore snapshot into the provided writer. +// The returned uint64 corresponds to the last version contained in the backup stream, +// which can be re-used to generate incremental backups via the since parameter. +func (s *DefaultStore) Backup(ctx context.Context, writer io.Writer, since uint64) (uint64, error) { + if err := ctx.Err(); err != nil { + return 0, err + } + + // Try direct badger4 cast first + if badgerDatastore, ok := s.db.(*badger4.Datastore); ok { + return backupBadger(badgerDatastore, writer, since) + } + + // Try to unwrap one level (e.g., PrefixTransform wrapper) + if shim, ok := s.db.(ds.Shim); ok { + children := shim.Children() + if len(children) > 0 { + if badgerDatastore, ok := children[0].(*badger4.Datastore); ok { + return backupBadger(badgerDatastore, writer, since) + } + } + } + + return 0, fmt.Errorf("backup is only supported for badger4 datastore") +} + +func backupBadger(badgerDatastore *badger4.Datastore, writer io.Writer, since uint64) (uint64, error) { + // `badger.DB.Backup` internally orchestrates a consistent snapshot without pausing writes. + version, err := badgerDatastore.DB.Backup(writer, since) + if err != nil { + return 0, fmt.Errorf("badger backup failed: %w", err) + } + return version, nil +} diff --git a/pkg/store/restore.go b/pkg/store/restore.go new file mode 100644 index 0000000000..96b8e4366b --- /dev/null +++ b/pkg/store/restore.go @@ -0,0 +1,66 @@ +package store + +import ( + "context" + "fmt" + "io" + + ds "github.com/ipfs/go-datastore" + badger4 "github.com/ipfs/go-ds-badger4" +) + +// Restore loads a Badger backup from the provided reader into the datastore. +// This operation will fail if the datastore already contains data, unless the datastore +// supports merging backups. The restore process is atomic and will either complete +// fully or leave the datastore unchanged. +func (s *DefaultStore) Restore(ctx context.Context, reader io.Reader) error { + if err := ctx.Err(); err != nil { + return err + } + + visited := make(map[ds.Datastore]struct{}) + current, ok := any(s.db).(ds.Datastore) + if !ok { + return fmt.Errorf("restore is not supported by the configured datastore") + } + + for { + // Try to leverage a native restore implementation if the underlying datastore exposes one. + type restorable interface { + Load(io.Reader) error + } + if dsRestore, ok := current.(restorable); ok { + if err := dsRestore.Load(reader); err != nil { + return fmt.Errorf("datastore restore failed: %w", err) + } + return nil + } + + // Default Badger datastore used across ev-node. + if badgerDatastore, ok := current.(*badger4.Datastore); ok { + // `badger.DB.Load` internally restores the backup atomically. + if err := badgerDatastore.DB.Load(reader, 16); err != nil { + return fmt.Errorf("badger restore failed: %w", err) + } + return nil + } + + // Attempt to unwrap shimmed datastores (e.g., prefix or mutex wrappers) to reach the backing store. + if _, seen := visited[current]; seen { + break + } + visited[current] = struct{}{} + + shim, ok := current.(ds.Shim) + if !ok { + break + } + children := shim.Children() + if len(children) == 0 { + break + } + current = children[0] + } + + return fmt.Errorf("restore is not supported by the configured datastore") +} diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go index 93f299306e..f24e48f928 100644 --- a/pkg/store/store_test.go +++ b/pkg/store/store_test.go @@ -1,6 +1,7 @@ package store import ( + "bytes" "context" "encoding/binary" "errors" @@ -1192,3 +1193,24 @@ func TestRollbackDAIncludedHeightGetMetadataError(t *testing.T) { require.Contains(err.Error(), "failed to get DA included height") require.Contains(err.Error(), "metadata retrieval failed") } + +func TestDefaultStoreBackup(t *testing.T) { + t.Parallel() + + ctx := context.Background() + kv, err := NewDefaultInMemoryKVStore() + require.NoError(t, err) + + s := New(kv) + t.Cleanup(func() { + require.NoError(t, s.Close()) + }) + + require.NoError(t, s.SetMetadata(ctx, "backup-test", []byte("value"))) + + var buf bytes.Buffer + version, err := s.Backup(ctx, &buf, 0) + require.NoError(t, err) + require.NotZero(t, buf.Len()) + require.NotZero(t, version) +} diff --git a/pkg/store/types.go b/pkg/store/types.go index bf1cb6ced8..6e1ec07725 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -2,6 +2,7 @@ package store import ( "context" + "io" ds "github.com/ipfs/go-datastore" @@ -32,6 +33,7 @@ type Batch interface { // Store is minimal interface for storing and retrieving blocks, commits and state. type Store interface { Rollback + Backup Reader // SetMetadata saves arbitrary value in the store. @@ -70,6 +72,18 @@ type Reader interface { GetMetadata(ctx context.Context, key string) ([]byte, error) } +type Backup interface { + // Backup writes a consistent backup stream to writer. The returned version can be used + // as the starting point for incremental backups. + Backup(ctx context.Context, writer io.Writer, since uint64) (uint64, error) + + // Restore loads a backup stream from reader into the datastore. + Restore(ctx context.Context, reader io.Reader) error + + // Close safely closes underlying data storage, to ensure that data is actually saved. + Close() error +} + type Rollback interface { // Rollback deletes x height from the ev-node store. // Aggregator is used to determine if the rollback is performed on the aggregator node. diff --git a/proto/evnode/v1/state_rpc.proto b/proto/evnode/v1/state_rpc.proto index 1b468b6ef8..b41b502684 100644 --- a/proto/evnode/v1/state_rpc.proto +++ b/proto/evnode/v1/state_rpc.proto @@ -1,10 +1,9 @@ syntax = "proto3"; package evnode.v1; -import "google/protobuf/empty.proto"; -import "google/protobuf/timestamp.proto"; import "evnode/v1/evnode.proto"; import "evnode/v1/state.proto"; +import "google/protobuf/empty.proto"; option go_package = "github.com/evstack/ev-node/types/pb/evnode/v1"; @@ -21,12 +20,15 @@ service StoreService { // GetGenesisDaHeight returns the DA height at which the first Evolve block was included. rpc GetGenesisDaHeight(google.protobuf.Empty) returns (GetGenesisDaHeightResponse) {} + + // Backup streams a Badger backup of the datastore so it can be persisted externally. + rpc Backup(BackupRequest) returns (stream BackupResponse) {} } // Block contains all the components of a complete block message Block { SignedHeader header = 1; - Data data = 2; + Data data = 2; } // GetBlockRequest defines the request for retrieving a block @@ -34,15 +36,15 @@ message GetBlockRequest { // The height or hash of the block to retrieve oneof identifier { uint64 height = 1; - bytes hash = 2; + bytes hash = 2; } } // GetBlockResponse defines the response for retrieving a block message GetBlockResponse { - Block block = 1; + Block block = 1; uint64 header_da_height = 2; - uint64 data_da_height = 3; + uint64 data_da_height = 3; } // GetStateResponse defines the response for retrieving the current state @@ -62,5 +64,28 @@ message GetMetadataResponse { // GetGenesisDaHeightResponse defines the DA height at which the first Evolve block was included. message GetGenesisDaHeightResponse { - uint64 height = 3; + uint64 height = 3; +} + +// BackupRequest defines the parameters for requesting a datastore backup. +message BackupRequest { + // since_version allows incremental backups. 0 produces a full backup. + uint64 since_version = 1; +} + +// BackupMetadata contains progress or completion details emitted during a backup stream. +message BackupMetadata { + uint64 current_height = 1; + uint64 since_version = 2; + uint64 last_version = 3; + bool completed = 4; +} + +// BackupResponse multiplexes metadata and raw backup data chunks in the stream. +message BackupResponse { + // response contains either metadata about the backup progress or a chunk of backup data. + oneof response { + BackupMetadata metadata = 1; + bytes chunk = 2; + } } diff --git a/test/mocks/store.go b/test/mocks/store.go index 7f2aa180d0..64f69e6c5f 100644 --- a/test/mocks/store.go +++ b/test/mocks/store.go @@ -6,6 +6,7 @@ package mocks import ( "context" + "io" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" @@ -39,6 +40,78 @@ func (_m *MockStore) EXPECT() *MockStore_Expecter { return &MockStore_Expecter{mock: &_m.Mock} } +// Backup provides a mock function for the type MockStore +func (_mock *MockStore) Backup(ctx context.Context, writer io.Writer, since uint64) (uint64, error) { + ret := _mock.Called(ctx, writer, since) + + if len(ret) == 0 { + panic("no return value specified for Backup") + } + + var r0 uint64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer, uint64) (uint64, error)); ok { + return returnFunc(ctx, writer, since) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer, uint64) uint64); ok { + r0 = returnFunc(ctx, writer, since) + } else { + r0 = ret.Get(0).(uint64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, io.Writer, uint64) error); ok { + r1 = returnFunc(ctx, writer, since) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStore_Backup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Backup' +type MockStore_Backup_Call struct { + *mock.Call +} + +// Backup is a helper method to define mock.On call +// - ctx context.Context +// - writer io.Writer +// - since uint64 +func (_e *MockStore_Expecter) Backup(ctx interface{}, writer interface{}, since interface{}) *MockStore_Backup_Call { + return &MockStore_Backup_Call{Call: _e.mock.On("Backup", ctx, writer, since)} +} + +func (_c *MockStore_Backup_Call) Run(run func(ctx context.Context, writer io.Writer, since uint64)) *MockStore_Backup_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 io.Writer + if args[1] != nil { + arg1 = args[1].(io.Writer) + } + var arg2 uint64 + if args[2] != nil { + arg2 = args[2].(uint64) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockStore_Backup_Call) Return(v uint64, err error) *MockStore_Backup_Call { + _c.Call.Return(v, err) + return _c +} + +func (_c *MockStore_Backup_Call) RunAndReturn(run func(ctx context.Context, writer io.Writer, since uint64) (uint64, error)) *MockStore_Backup_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function for the type MockStore func (_mock *MockStore) Close() error { ret := _mock.Called() @@ -755,6 +828,63 @@ func (_c *MockStore_NewBatch_Call) RunAndReturn(run func(ctx context.Context) (s return _c } +// Restore provides a mock function for the type MockStore +func (_mock *MockStore) Restore(ctx context.Context, reader io.Reader) error { + ret := _mock.Called(ctx, reader) + + if len(ret) == 0 { + panic("no return value specified for Restore") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, io.Reader) error); ok { + r0 = returnFunc(ctx, reader) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockStore_Restore_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Restore' +type MockStore_Restore_Call struct { + *mock.Call +} + +// Restore is a helper method to define mock.On call +// - ctx context.Context +// - reader io.Reader +func (_e *MockStore_Expecter) Restore(ctx interface{}, reader interface{}) *MockStore_Restore_Call { + return &MockStore_Restore_Call{Call: _e.mock.On("Restore", ctx, reader)} +} + +func (_c *MockStore_Restore_Call) Run(run func(ctx context.Context, reader io.Reader)) *MockStore_Restore_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 io.Reader + if args[1] != nil { + arg1 = args[1].(io.Reader) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStore_Restore_Call) Return(err error) *MockStore_Restore_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockStore_Restore_Call) RunAndReturn(run func(ctx context.Context, reader io.Reader) error) *MockStore_Restore_Call { + _c.Call.Return(run) + return _c +} + // Rollback provides a mock function for the type MockStore func (_mock *MockStore) Rollback(ctx context.Context, height uint64, aggregator bool) error { ret := _mock.Called(ctx, height, aggregator) diff --git a/types/pb/evnode/v1/batch.pb.go b/types/pb/evnode/v1/batch.pb.go index 77b26c00a6..576f1edfcc 100644 --- a/types/pb/evnode/v1/batch.pb.go +++ b/types/pb/evnode/v1/batch.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/batch.proto diff --git a/types/pb/evnode/v1/config.pb.go b/types/pb/evnode/v1/config.pb.go index 1cab00fd1b..23eb58d56a 100644 --- a/types/pb/evnode/v1/config.pb.go +++ b/types/pb/evnode/v1/config.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/config.proto diff --git a/types/pb/evnode/v1/evnode.pb.go b/types/pb/evnode/v1/evnode.pb.go index 700b2182ef..acfdeccafd 100644 --- a/types/pb/evnode/v1/evnode.pb.go +++ b/types/pb/evnode/v1/evnode.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/evnode.proto diff --git a/types/pb/evnode/v1/execution.pb.go b/types/pb/evnode/v1/execution.pb.go index 0fc0f125b1..cd243512e1 100644 --- a/types/pb/evnode/v1/execution.pb.go +++ b/types/pb/evnode/v1/execution.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/execution.proto diff --git a/types/pb/evnode/v1/health.pb.go b/types/pb/evnode/v1/health.pb.go index aafe1640f9..1a76c46c7e 100644 --- a/types/pb/evnode/v1/health.pb.go +++ b/types/pb/evnode/v1/health.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/health.proto diff --git a/types/pb/evnode/v1/p2p_rpc.pb.go b/types/pb/evnode/v1/p2p_rpc.pb.go index c1c45e17ab..9325f84920 100644 --- a/types/pb/evnode/v1/p2p_rpc.pb.go +++ b/types/pb/evnode/v1/p2p_rpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/p2p_rpc.proto diff --git a/types/pb/evnode/v1/signer.pb.go b/types/pb/evnode/v1/signer.pb.go index b1f6ec3217..1b72b62a93 100644 --- a/types/pb/evnode/v1/signer.pb.go +++ b/types/pb/evnode/v1/signer.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/signer.proto diff --git a/types/pb/evnode/v1/state.pb.go b/types/pb/evnode/v1/state.pb.go index 18386afe78..301666ee88 100644 --- a/types/pb/evnode/v1/state.pb.go +++ b/types/pb/evnode/v1/state.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/state.proto diff --git a/types/pb/evnode/v1/state_rpc.pb.go b/types/pb/evnode/v1/state_rpc.pb.go index 0250f1921e..06e1fa8d37 100644 --- a/types/pb/evnode/v1/state_rpc.pb.go +++ b/types/pb/evnode/v1/state_rpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/state_rpc.proto @@ -10,7 +10,6 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" emptypb "google.golang.org/protobuf/types/known/emptypb" - _ "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -402,11 +401,211 @@ func (x *GetGenesisDaHeightResponse) GetHeight() uint64 { return 0 } +// BackupRequest defines the parameters for requesting a datastore backup. +type BackupRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // since_version allows incremental backups. 0 produces a full backup. + SinceVersion uint64 `protobuf:"varint,1,opt,name=since_version,json=sinceVersion,proto3" json:"since_version,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BackupRequest) Reset() { + *x = BackupRequest{} + mi := &file_evnode_v1_state_rpc_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BackupRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BackupRequest) ProtoMessage() {} + +func (x *BackupRequest) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_state_rpc_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BackupRequest.ProtoReflect.Descriptor instead. +func (*BackupRequest) Descriptor() ([]byte, []int) { + return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{7} +} + +func (x *BackupRequest) GetSinceVersion() uint64 { + if x != nil { + return x.SinceVersion + } + return 0 +} + +// BackupMetadata contains progress or completion details emitted during a backup stream. +type BackupMetadata struct { + state protoimpl.MessageState `protogen:"open.v1"` + CurrentHeight uint64 `protobuf:"varint,1,opt,name=current_height,json=currentHeight,proto3" json:"current_height,omitempty"` + SinceVersion uint64 `protobuf:"varint,2,opt,name=since_version,json=sinceVersion,proto3" json:"since_version,omitempty"` + LastVersion uint64 `protobuf:"varint,3,opt,name=last_version,json=lastVersion,proto3" json:"last_version,omitempty"` + Completed bool `protobuf:"varint,4,opt,name=completed,proto3" json:"completed,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BackupMetadata) Reset() { + *x = BackupMetadata{} + mi := &file_evnode_v1_state_rpc_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BackupMetadata) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BackupMetadata) ProtoMessage() {} + +func (x *BackupMetadata) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_state_rpc_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BackupMetadata.ProtoReflect.Descriptor instead. +func (*BackupMetadata) Descriptor() ([]byte, []int) { + return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{8} +} + +func (x *BackupMetadata) GetCurrentHeight() uint64 { + if x != nil { + return x.CurrentHeight + } + return 0 +} + +func (x *BackupMetadata) GetSinceVersion() uint64 { + if x != nil { + return x.SinceVersion + } + return 0 +} + +func (x *BackupMetadata) GetLastVersion() uint64 { + if x != nil { + return x.LastVersion + } + return 0 +} + +func (x *BackupMetadata) GetCompleted() bool { + if x != nil { + return x.Completed + } + return false +} + +// BackupResponse multiplexes metadata and raw backup data chunks in the stream. +type BackupResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // response contains either metadata about the backup progress or a chunk of backup data. + // + // Types that are valid to be assigned to Response: + // + // *BackupResponse_Metadata + // *BackupResponse_Chunk + Response isBackupResponse_Response `protobuf_oneof:"response"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BackupResponse) Reset() { + *x = BackupResponse{} + mi := &file_evnode_v1_state_rpc_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BackupResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BackupResponse) ProtoMessage() {} + +func (x *BackupResponse) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_state_rpc_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BackupResponse.ProtoReflect.Descriptor instead. +func (*BackupResponse) Descriptor() ([]byte, []int) { + return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{9} +} + +func (x *BackupResponse) GetResponse() isBackupResponse_Response { + if x != nil { + return x.Response + } + return nil +} + +func (x *BackupResponse) GetMetadata() *BackupMetadata { + if x != nil { + if x, ok := x.Response.(*BackupResponse_Metadata); ok { + return x.Metadata + } + } + return nil +} + +func (x *BackupResponse) GetChunk() []byte { + if x != nil { + if x, ok := x.Response.(*BackupResponse_Chunk); ok { + return x.Chunk + } + } + return nil +} + +type isBackupResponse_Response interface { + isBackupResponse_Response() +} + +type BackupResponse_Metadata struct { + Metadata *BackupMetadata `protobuf:"bytes,1,opt,name=metadata,proto3,oneof"` +} + +type BackupResponse_Chunk struct { + Chunk []byte `protobuf:"bytes,2,opt,name=chunk,proto3,oneof"` +} + +func (*BackupResponse_Metadata) isBackupResponse_Response() {} + +func (*BackupResponse_Chunk) isBackupResponse_Response() {} + var File_evnode_v1_state_rpc_proto protoreflect.FileDescriptor const file_evnode_v1_state_rpc_proto_rawDesc = "" + "\n" + - "\x19evnode/v1/state_rpc.proto\x12\tevnode.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16evnode/v1/evnode.proto\x1a\x15evnode/v1/state.proto\"]\n" + + "\x19evnode/v1/state_rpc.proto\x12\tevnode.v1\x1a\x16evnode/v1/evnode.proto\x1a\x15evnode/v1/state.proto\x1a\x1bgoogle/protobuf/empty.proto\"]\n" + "\x05Block\x12/\n" + "\x06header\x18\x01 \x01(\v2\x17.evnode.v1.SignedHeaderR\x06header\x12#\n" + "\x04data\x18\x02 \x01(\v2\x0f.evnode.v1.DataR\x04data\"O\n" + @@ -426,12 +625,25 @@ const file_evnode_v1_state_rpc_proto_rawDesc = "" + "\x13GetMetadataResponse\x12\x14\n" + "\x05value\x18\x01 \x01(\fR\x05value\"4\n" + "\x1aGetGenesisDaHeightResponse\x12\x16\n" + - "\x06height\x18\x03 \x01(\x04R\x06height2\xbf\x02\n" + + "\x06height\x18\x03 \x01(\x04R\x06height\"4\n" + + "\rBackupRequest\x12#\n" + + "\rsince_version\x18\x01 \x01(\x04R\fsinceVersion\"\x9d\x01\n" + + "\x0eBackupMetadata\x12%\n" + + "\x0ecurrent_height\x18\x01 \x01(\x04R\rcurrentHeight\x12#\n" + + "\rsince_version\x18\x02 \x01(\x04R\fsinceVersion\x12!\n" + + "\flast_version\x18\x03 \x01(\x04R\vlastVersion\x12\x1c\n" + + "\tcompleted\x18\x04 \x01(\bR\tcompleted\"m\n" + + "\x0eBackupResponse\x127\n" + + "\bmetadata\x18\x01 \x01(\v2\x19.evnode.v1.BackupMetadataH\x00R\bmetadata\x12\x16\n" + + "\x05chunk\x18\x02 \x01(\fH\x00R\x05chunkB\n" + + "\n" + + "\bresponse2\x82\x03\n" + "\fStoreService\x12E\n" + "\bGetBlock\x12\x1a.evnode.v1.GetBlockRequest\x1a\x1b.evnode.v1.GetBlockResponse\"\x00\x12A\n" + "\bGetState\x12\x16.google.protobuf.Empty\x1a\x1b.evnode.v1.GetStateResponse\"\x00\x12N\n" + "\vGetMetadata\x12\x1d.evnode.v1.GetMetadataRequest\x1a\x1e.evnode.v1.GetMetadataResponse\"\x00\x12U\n" + - "\x12GetGenesisDaHeight\x12\x16.google.protobuf.Empty\x1a%.evnode.v1.GetGenesisDaHeightResponse\"\x00B/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + "\x12GetGenesisDaHeight\x12\x16.google.protobuf.Empty\x1a%.evnode.v1.GetGenesisDaHeightResponse\"\x00\x12A\n" + + "\x06Backup\x12\x18.evnode.v1.BackupRequest\x1a\x19.evnode.v1.BackupResponse\"\x000\x01B/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" var ( file_evnode_v1_state_rpc_proto_rawDescOnce sync.Once @@ -445,7 +657,7 @@ func file_evnode_v1_state_rpc_proto_rawDescGZIP() []byte { return file_evnode_v1_state_rpc_proto_rawDescData } -var file_evnode_v1_state_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_evnode_v1_state_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_evnode_v1_state_rpc_proto_goTypes = []any{ (*Block)(nil), // 0: evnode.v1.Block (*GetBlockRequest)(nil), // 1: evnode.v1.GetBlockRequest @@ -454,29 +666,35 @@ var file_evnode_v1_state_rpc_proto_goTypes = []any{ (*GetMetadataRequest)(nil), // 4: evnode.v1.GetMetadataRequest (*GetMetadataResponse)(nil), // 5: evnode.v1.GetMetadataResponse (*GetGenesisDaHeightResponse)(nil), // 6: evnode.v1.GetGenesisDaHeightResponse - (*SignedHeader)(nil), // 7: evnode.v1.SignedHeader - (*Data)(nil), // 8: evnode.v1.Data - (*State)(nil), // 9: evnode.v1.State - (*emptypb.Empty)(nil), // 10: google.protobuf.Empty + (*BackupRequest)(nil), // 7: evnode.v1.BackupRequest + (*BackupMetadata)(nil), // 8: evnode.v1.BackupMetadata + (*BackupResponse)(nil), // 9: evnode.v1.BackupResponse + (*SignedHeader)(nil), // 10: evnode.v1.SignedHeader + (*Data)(nil), // 11: evnode.v1.Data + (*State)(nil), // 12: evnode.v1.State + (*emptypb.Empty)(nil), // 13: google.protobuf.Empty } var file_evnode_v1_state_rpc_proto_depIdxs = []int32{ - 7, // 0: evnode.v1.Block.header:type_name -> evnode.v1.SignedHeader - 8, // 1: evnode.v1.Block.data:type_name -> evnode.v1.Data + 10, // 0: evnode.v1.Block.header:type_name -> evnode.v1.SignedHeader + 11, // 1: evnode.v1.Block.data:type_name -> evnode.v1.Data 0, // 2: evnode.v1.GetBlockResponse.block:type_name -> evnode.v1.Block - 9, // 3: evnode.v1.GetStateResponse.state:type_name -> evnode.v1.State - 1, // 4: evnode.v1.StoreService.GetBlock:input_type -> evnode.v1.GetBlockRequest - 10, // 5: evnode.v1.StoreService.GetState:input_type -> google.protobuf.Empty - 4, // 6: evnode.v1.StoreService.GetMetadata:input_type -> evnode.v1.GetMetadataRequest - 10, // 7: evnode.v1.StoreService.GetGenesisDaHeight:input_type -> google.protobuf.Empty - 2, // 8: evnode.v1.StoreService.GetBlock:output_type -> evnode.v1.GetBlockResponse - 3, // 9: evnode.v1.StoreService.GetState:output_type -> evnode.v1.GetStateResponse - 5, // 10: evnode.v1.StoreService.GetMetadata:output_type -> evnode.v1.GetMetadataResponse - 6, // 11: evnode.v1.StoreService.GetGenesisDaHeight:output_type -> evnode.v1.GetGenesisDaHeightResponse - 8, // [8:12] is the sub-list for method output_type - 4, // [4:8] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 12, // 3: evnode.v1.GetStateResponse.state:type_name -> evnode.v1.State + 8, // 4: evnode.v1.BackupResponse.metadata:type_name -> evnode.v1.BackupMetadata + 1, // 5: evnode.v1.StoreService.GetBlock:input_type -> evnode.v1.GetBlockRequest + 13, // 6: evnode.v1.StoreService.GetState:input_type -> google.protobuf.Empty + 4, // 7: evnode.v1.StoreService.GetMetadata:input_type -> evnode.v1.GetMetadataRequest + 13, // 8: evnode.v1.StoreService.GetGenesisDaHeight:input_type -> google.protobuf.Empty + 7, // 9: evnode.v1.StoreService.Backup:input_type -> evnode.v1.BackupRequest + 2, // 10: evnode.v1.StoreService.GetBlock:output_type -> evnode.v1.GetBlockResponse + 3, // 11: evnode.v1.StoreService.GetState:output_type -> evnode.v1.GetStateResponse + 5, // 12: evnode.v1.StoreService.GetMetadata:output_type -> evnode.v1.GetMetadataResponse + 6, // 13: evnode.v1.StoreService.GetGenesisDaHeight:output_type -> evnode.v1.GetGenesisDaHeightResponse + 9, // 14: evnode.v1.StoreService.Backup:output_type -> evnode.v1.BackupResponse + 10, // [10:15] is the sub-list for method output_type + 5, // [5:10] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_evnode_v1_state_rpc_proto_init() } @@ -490,13 +708,17 @@ func file_evnode_v1_state_rpc_proto_init() { (*GetBlockRequest_Height)(nil), (*GetBlockRequest_Hash)(nil), } + file_evnode_v1_state_rpc_proto_msgTypes[9].OneofWrappers = []any{ + (*BackupResponse_Metadata)(nil), + (*BackupResponse_Chunk)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_evnode_v1_state_rpc_proto_rawDesc), len(file_evnode_v1_state_rpc_proto_rawDesc)), NumEnums: 0, - NumMessages: 7, + NumMessages: 10, NumExtensions: 0, NumServices: 1, }, diff --git a/types/pb/evnode/v1/v1connect/config.connect.go b/types/pb/evnode/v1/v1connect/config.connect.go index 0677dd1ca5..e7fb47c1d4 100644 --- a/types/pb/evnode/v1/v1connect/config.connect.go +++ b/types/pb/evnode/v1/v1connect/config.connect.go @@ -46,7 +46,7 @@ const ( type ConfigServiceClient interface { // GetNamespace returns the namespace for this network GetNamespace(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetNamespaceResponse], error) - // GetSequencerInfo returns information about the sequencer + // GetSignerInfo returns information about the signer GetSignerInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetSignerInfoResponse], error) } @@ -96,7 +96,7 @@ func (c *configServiceClient) GetSignerInfo(ctx context.Context, req *connect.Re type ConfigServiceHandler interface { // GetNamespace returns the namespace for this network GetNamespace(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetNamespaceResponse], error) - // GetSequencerInfo returns information about the sequencer + // GetSignerInfo returns information about the signer GetSignerInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetSignerInfoResponse], error) } diff --git a/types/pb/evnode/v1/v1connect/state_rpc.connect.go b/types/pb/evnode/v1/v1connect/state_rpc.connect.go index 1fe049fa11..0e18b9f5c5 100644 --- a/types/pb/evnode/v1/v1connect/state_rpc.connect.go +++ b/types/pb/evnode/v1/v1connect/state_rpc.connect.go @@ -44,6 +44,8 @@ const ( // StoreServiceGetGenesisDaHeightProcedure is the fully-qualified name of the StoreService's // GetGenesisDaHeight RPC. StoreServiceGetGenesisDaHeightProcedure = "/evnode.v1.StoreService/GetGenesisDaHeight" + // StoreServiceBackupProcedure is the fully-qualified name of the StoreService's Backup RPC. + StoreServiceBackupProcedure = "/evnode.v1.StoreService/Backup" ) // StoreServiceClient is a client for the evnode.v1.StoreService service. @@ -56,6 +58,8 @@ type StoreServiceClient interface { GetMetadata(context.Context, *connect.Request[v1.GetMetadataRequest]) (*connect.Response[v1.GetMetadataResponse], error) // GetGenesisDaHeight returns the DA height at which the first Evolve block was included. GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error) + // Backup streams a Badger backup of the datastore so it can be persisted externally. + Backup(context.Context, *connect.Request[v1.BackupRequest]) (*connect.ServerStreamForClient[v1.BackupResponse], error) } // NewStoreServiceClient constructs a client for the evnode.v1.StoreService service. By default, it @@ -93,6 +97,12 @@ func NewStoreServiceClient(httpClient connect.HTTPClient, baseURL string, opts . connect.WithSchema(storeServiceMethods.ByName("GetGenesisDaHeight")), connect.WithClientOptions(opts...), ), + backup: connect.NewClient[v1.BackupRequest, v1.BackupResponse]( + httpClient, + baseURL+StoreServiceBackupProcedure, + connect.WithSchema(storeServiceMethods.ByName("Backup")), + connect.WithClientOptions(opts...), + ), } } @@ -102,6 +112,7 @@ type storeServiceClient struct { getState *connect.Client[emptypb.Empty, v1.GetStateResponse] getMetadata *connect.Client[v1.GetMetadataRequest, v1.GetMetadataResponse] getGenesisDaHeight *connect.Client[emptypb.Empty, v1.GetGenesisDaHeightResponse] + backup *connect.Client[v1.BackupRequest, v1.BackupResponse] } // GetBlock calls evnode.v1.StoreService.GetBlock. @@ -124,6 +135,11 @@ func (c *storeServiceClient) GetGenesisDaHeight(ctx context.Context, req *connec return c.getGenesisDaHeight.CallUnary(ctx, req) } +// Backup calls evnode.v1.StoreService.Backup. +func (c *storeServiceClient) Backup(ctx context.Context, req *connect.Request[v1.BackupRequest]) (*connect.ServerStreamForClient[v1.BackupResponse], error) { + return c.backup.CallServerStream(ctx, req) +} + // StoreServiceHandler is an implementation of the evnode.v1.StoreService service. type StoreServiceHandler interface { // GetBlock returns a block by height or hash @@ -134,6 +150,8 @@ type StoreServiceHandler interface { GetMetadata(context.Context, *connect.Request[v1.GetMetadataRequest]) (*connect.Response[v1.GetMetadataResponse], error) // GetGenesisDaHeight returns the DA height at which the first Evolve block was included. GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error) + // Backup streams a Badger backup of the datastore so it can be persisted externally. + Backup(context.Context, *connect.Request[v1.BackupRequest], *connect.ServerStream[v1.BackupResponse]) error } // NewStoreServiceHandler builds an HTTP handler from the service implementation. It returns the @@ -167,6 +185,12 @@ func NewStoreServiceHandler(svc StoreServiceHandler, opts ...connect.HandlerOpti connect.WithSchema(storeServiceMethods.ByName("GetGenesisDaHeight")), connect.WithHandlerOptions(opts...), ) + storeServiceBackupHandler := connect.NewServerStreamHandler( + StoreServiceBackupProcedure, + svc.Backup, + connect.WithSchema(storeServiceMethods.ByName("Backup")), + connect.WithHandlerOptions(opts...), + ) return "/evnode.v1.StoreService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case StoreServiceGetBlockProcedure: @@ -177,6 +201,8 @@ func NewStoreServiceHandler(svc StoreServiceHandler, opts ...connect.HandlerOpti storeServiceGetMetadataHandler.ServeHTTP(w, r) case StoreServiceGetGenesisDaHeightProcedure: storeServiceGetGenesisDaHeightHandler.ServeHTTP(w, r) + case StoreServiceBackupProcedure: + storeServiceBackupHandler.ServeHTTP(w, r) default: http.NotFound(w, r) } @@ -201,3 +227,7 @@ func (UnimplementedStoreServiceHandler) GetMetadata(context.Context, *connect.Re func (UnimplementedStoreServiceHandler) GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("evnode.v1.StoreService.GetGenesisDaHeight is not implemented")) } + +func (UnimplementedStoreServiceHandler) Backup(context.Context, *connect.Request[v1.BackupRequest], *connect.ServerStream[v1.BackupResponse]) error { + return connect.NewError(connect.CodeUnimplemented, errors.New("evnode.v1.StoreService.Backup is not implemented")) +}