diff --git a/go.mod b/go.mod index 0ddc979..f049914 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/cosmos/cosmos-sdk v0.50.13 github.com/cosmos/gogoproto v1.7.0 github.com/go-kit/kit v0.13.0 - github.com/gorilla/rpc v1.2.1 github.com/hashicorp/go-metrics v0.5.4 github.com/ipfs/go-datastore v0.8.2 github.com/rollkit/rollkit v0.14.2-0.20250422111549-9f2f92ea5c6e @@ -108,7 +107,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.1 // indirect - github.com/gorilla/websocket v1.5.3 + github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect diff --git a/go.sum b/go.sum index ecde107..2a5dd12 100644 --- a/go.sum +++ b/go.sum @@ -425,8 +425,6 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/gorilla/rpc v1.2.1 h1:yC+LMV5esttgpVvNORL/xX4jvTTEUE30UZhZ5JF7K9k= -github.com/gorilla/rpc v1.2.1/go.mod h1:uNpOihAlF5xRFLuTYhfR0yfCTm0WTQSQttkMSptRfGk= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= diff --git a/pkg/p2p/errors.go b/pkg/p2p/errors.go new file mode 100644 index 0000000..d162540 --- /dev/null +++ b/pkg/p2p/errors.go @@ -0,0 +1,5 @@ +package p2p + +import "errors" + +var ErrNotReady = errors.New("tx gossiper is not ready") diff --git a/pkg/rpc/core/abci.go b/pkg/rpc/core/abci.go new file mode 100644 index 0000000..9c07ecd --- /dev/null +++ b/pkg/rpc/core/abci.go @@ -0,0 +1,41 @@ +package core + +import ( + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/libs/bytes" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" +) + +// ABCIQuery queries the application for some information. +// More: https://docs.cometbft.com/v0.37/rpc/#/ABCI/abci_query +func ABCIQuery( + ctx *rpctypes.Context, + path string, + data bytes.HexBytes, + height int64, + prove bool, +) (*ctypes.ResultABCIQuery, error) { + resp, err := env.Adapter.App.Query(ctx.Context(), &abci.RequestQuery{ + Data: data, + Path: path, + }) + if err != nil { + return nil, err + } + return &ctypes.ResultABCIQuery{ + Response: *resp, + }, nil +} + +// ABCIInfo gets some info about the application. +// More: https://docs.cometbft.com/v0.37/rpc/#/ABCI/abci_info +func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { + info, err := env.Adapter.App.Info(&abci.RequestInfo{}) + if err != nil { + return nil, err + } + return &ctypes.ResultABCIInfo{ + Response: *info, + }, nil +} diff --git a/pkg/rpc/provider/blockchain.go b/pkg/rpc/core/blocks.go similarity index 52% rename from pkg/rpc/provider/blockchain.go rename to pkg/rpc/core/blocks.go index 3709e22..c8f744e 100644 --- a/pkg/rpc/provider/blockchain.go +++ b/pkg/rpc/core/blocks.go @@ -1,7 +1,6 @@ -package provider +package core import ( - "context" "errors" "fmt" "sort" @@ -9,39 +8,112 @@ import ( cmtbytes "github.com/cometbft/cometbft/libs/bytes" cmtmath "github.com/cometbft/cometbft/libs/math" cmtquery "github.com/cometbft/cometbft/libs/pubsub/query" - coretypes "github.com/cometbft/cometbft/rpc/core/types" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" blockidxnull "github.com/cometbft/cometbft/state/indexer/block/null" cmttypes "github.com/cometbft/cometbft/types" rlktypes "github.com/rollkit/rollkit/types" ) -// Genesis implements client.Client. -func (p *RpcProvider) Genesis(context.Context) (*coretypes.ResultGenesis, error) { - // Returning unimplemented as per the original code. - // Consider implementing or returning a more specific error if needed. - panic("unimplemented") -} +// BlockSearch searches for a paginated set of blocks matching BeginBlock and +// EndBlock event search criteria. +func BlockSearch( + ctx *rpctypes.Context, + query string, + pagePtr, perPagePtr *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + // skip if block indexing is disabled + if env.BlockIndexer == nil { + return nil, errors.New("block indexer is not available") + } + if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok { + return nil, errors.New("block indexing is disabled") + } + + // Use the locally defined maxQueryLength from provider_utils.go + if len(query) > maxQueryLength { + return nil, errors.New("maximum query length exceeded") + } + + q, err := cmtquery.New(query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) + } + + results, err := env.BlockIndexer.Search(ctx.Context(), q) + if err != nil { + return nil, fmt.Errorf("block search failed: %w", err) + } + + // sort results (must be done before pagination) + switch orderBy { + case "desc", "": + sort.Slice(results, func(i, j int) bool { return results[i] > results[j] }) + + case "asc": + sort.Slice(results, func(i, j int) bool { return results[i] < results[j] }) + + default: + return nil, errors.New("expected order_by to be either `asc` or `desc` or empty") + } + + // paginate results + totalCount := len(results) + perPage := validatePerPage(perPagePtr) // Use local function + + page, err := validatePage(pagePtr, perPage, totalCount) // Use local function + if err != nil { + return nil, err + } + + skipCount := validateSkipCount(page, perPage) // Use local function + pageSize := cmtmath.MinInt(perPage, totalCount-skipCount) + + apiResults := make([]*ctypes.ResultBlock, 0, pageSize) + for i := skipCount; i < skipCount+pageSize; i++ { + height := uint64(results[i]) + header, data, err := env.Adapter.RollkitStore.GetBlockData(ctx.Context(), height) + if err != nil { + // If a block referenced by indexer is missing, should we error out or just skip? + // For now, error out. + return nil, fmt.Errorf("failed to get block data for height %d from store: %w", height, err) + } + if header == nil || data == nil { + return nil, fmt.Errorf("nil header or data for height %d from store", height) + } + block, err := ToABCIBlock(header, data) // Use local function + if err != nil { + return nil, fmt.Errorf("failed to convert block at height %d to ABCI block: %w", height, err) + } + apiResults = append(apiResults, &ctypes.ResultBlock{ + Block: block, + BlockID: cmttypes.BlockID{ + Hash: block.Hash(), + }, + }) + } -// GenesisChunked implements client.Client. -func (p *RpcProvider) GenesisChunked(context.Context, uint) (*coretypes.ResultGenesisChunk, error) { - return nil, errors.New("GenesisChunked RPC method is not yet implemented") + return &ctypes.ResultBlockSearch{Blocks: apiResults, TotalCount: totalCount}, nil } -// Block implements client.CometRPC. -func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error) { +// Block gets block at a given height. +// If no height is provided, it will fetch the latest block. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/block +func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { var heightValue uint64 switch { // block tag = included - case height != nil && *height == -1: + case heightPtr != nil && *heightPtr == -1: // heightValue = p.adapter.store.GetDAIncludedHeight() // TODO: implement return nil, errors.New("DA included height not implemented") default: - heightValue = p.normalizeHeight(height) + heightValue = normalizeHeight(heightPtr) } - header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue) + header, data, err := env.Adapter.RollkitStore.GetBlockData(ctx.Context(), heightValue) if err != nil { return nil, err } @@ -51,7 +123,7 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu if err != nil { return nil, err } - return &coretypes.ResultBlock{ + return &ctypes.ResultBlock{ BlockID: cmttypes.BlockID{ Hash: cmtbytes.HexBytes(hash), PartSetHeader: cmttypes.PartSetHeader{ @@ -63,9 +135,10 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu }, nil } -// BlockByHash implements client.CometRPC. -func (p *RpcProvider) BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error) { - header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types +// BlockByHash gets block by hash. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/block_by_hash +func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { + header, data, err := env.Adapter.RollkitStore.GetBlockByHash(ctx.Context(), rlktypes.Hash(hash)) // Used types.Hash from rollkit/types if err != nil { return nil, err } @@ -74,7 +147,7 @@ func (p *RpcProvider) BlockByHash(ctx context.Context, hash []byte) (*coretypes. if err != nil { return nil, err } - return &coretypes.ResultBlock{ + return &ctypes.ResultBlock{ BlockID: cmttypes.BlockID{ Hash: cmtbytes.HexBytes(hash), PartSetHeader: cmttypes.PartSetHeader{ @@ -86,12 +159,36 @@ func (p *RpcProvider) BlockByHash(ctx context.Context, hash []byte) (*coretypes. }, nil } -// BlockResults implements client.CometRPC. -func (p *RpcProvider) BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error) { +// Commit gets block commit at a given height. +// If no height is provided, it will fetch the commit for the latest block. +// More: https://docs.cometbft.com/main/rpc/#/Info/commit +func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { + heightValue := normalizeHeight(heightPtr) + header, data, err := env.Adapter.RollkitStore.GetBlockData(ctx.Context(), heightValue) + if err != nil { + return nil, err + } + + // we should have a single validator + if len(header.ProposerAddress) == 0 { + return nil, errors.New("empty validator set found in block") + } + + commit := getABCICommit(heightValue, header.Hash(), header.ProposerAddress, header.Time(), header.Signature) // Use local function + + block, err := ToABCIBlock(header, data) // Use local function + if err != nil { + return nil, err + } + + return ctypes.NewResultCommit(&block.Header, commit, true), nil +} + +func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { // Currently, this method returns an empty result as the original implementation was commented out. // If block results become available, this implementation should be updated. - _ = p.normalizeHeight(height) // Use height to avoid unused variable error, logic depends on future implementation - return &coretypes.ResultBlockResults{}, nil + _ = normalizeHeight(heightPtr) // Use height to avoid unused variable error, logic depends on future implementation + return &ctypes.ResultBlockResults{}, nil // Original commented-out logic: // var h uint64 // if height == nil { @@ -117,46 +214,26 @@ func (p *RpcProvider) BlockResults(ctx context.Context, height *int64) (*coretyp // }, nil } -// Commit implements client.CometRPC. -func (p *RpcProvider) Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error) { - heightValue := p.normalizeHeight(height) - header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue) - if err != nil { - return nil, err - } - - // we should have a single validator - if len(header.ProposerAddress) == 0 { - return nil, errors.New("empty validator set found in block") - } - - commit := getABCICommit(heightValue, header.Hash(), header.ProposerAddress, header.Time(), header.Signature) // Use local function - - block, err := ToABCIBlock(header, data) // Use local function - if err != nil { - return nil, err - } - - return coretypes.NewResultCommit(&block.Header, commit, true), nil -} - -// Header implements client.Client. -func (p *RpcProvider) Header(ctx context.Context, heightPtr *int64) (*coretypes.ResultHeader, error) { - height := p.normalizeHeight(heightPtr) - blockMeta := p.getBlockMeta(ctx, height) +// Header gets block header at a given height. +// If no height is provided, it will fetch the latest header. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/header +func Header(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultHeader, error) { + height := normalizeHeight(heightPtr) + blockMeta := getBlockMeta(ctx.Context(), height) if blockMeta == nil { return nil, fmt.Errorf("block at height %d not found", height) } - return &coretypes.ResultHeader{Header: &blockMeta.Header}, nil + return &ctypes.ResultHeader{Header: &blockMeta.Header}, nil } -// HeaderByHash implements client.Client. -func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes) (*coretypes.ResultHeader, error) { +// HeaderByHash gets header by hash. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/header_by_hash +func HeaderByHash(ctx *rpctypes.Context, hash cmtbytes.HexBytes) (*ctypes.ResultHeader, error) { // N.B. The hash parameter is HexBytes so that the reflective parameter // decoding logic in the HTTP service will correctly translate from JSON. // See https://github.com/cometbft/cometbft/issues/6802 for context. - header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types + header, data, err := env.Adapter.RollkitStore.GetBlockByHash(ctx.Context(), rlktypes.Hash(hash)) // Used types.Hash from rollkit/types if err != nil { return nil, err } @@ -169,17 +246,19 @@ func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes) if blockMeta == nil { // Return empty result without error if block not found by hash, consistent with original behaviour? // Or return an error? fmt.Errorf("block with hash %X not found", hash) - return &coretypes.ResultHeader{}, nil // Current behaviour matches original code + return &ctypes.ResultHeader{}, nil // Current behaviour matches original code } - return &coretypes.ResultHeader{Header: &blockMeta.Header}, nil + return &ctypes.ResultHeader{Header: &blockMeta.Header}, nil } -// BlockchainInfo implements client.CometRPC. -func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) { +// BlockchainInfo gets block headers for minHeight <= height <= maxHeight. +// Block headers are returned in descending order (highest first). +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/blockchain +func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { const limit int64 = 20 // Default limit used in the original code - height, err := p.adapter.RollkitStore.Height(ctx) + height, err := env.Adapter.RollkitStore.Height(ctx.Context()) if err != nil { return nil, fmt.Errorf("failed to get current height: %w", err) } @@ -201,7 +280,7 @@ func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHe blocks := make([]*cmttypes.BlockMeta, 0, maxHeight-minHeight+1) for h := maxHeight; h >= minHeight; h-- { // Use getBlockMeta which handles errors and nil checks internally - bMeta := p.getBlockMeta(ctx, uint64(h)) + bMeta := getBlockMeta(ctx.Context(), uint64(h)) if bMeta != nil { blocks = append(blocks, bMeta) } @@ -211,141 +290,13 @@ func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHe // Re-fetch height in case new blocks were added during the loop? // The original code did this. - finalHeight, err := p.adapter.RollkitStore.Height(ctx) + finalHeight, err := env.Adapter.RollkitStore.Height(ctx.Context()) if err != nil { return nil, fmt.Errorf("failed to get final height: %w", err) } - return &coretypes.ResultBlockchainInfo{ + return &ctypes.ResultBlockchainInfo{ LastHeight: int64(finalHeight), //nolint:gosec BlockMetas: blocks, }, nil } - -// BlockSearch implements client.CometRPC. -func (p *RpcProvider) BlockSearch(ctx context.Context, query string, pagePtr *int, perPagePtr *int, orderBy string) (*coretypes.ResultBlockSearch, error) { - // skip if block indexing is disabled - if p.blockIndexer == nil { - return nil, errors.New("block indexer is not available") - } - if _, ok := p.blockIndexer.(*blockidxnull.BlockerIndexer); ok { - return nil, errors.New("block indexing is disabled") - } - - // Use the locally defined maxQueryLength from provider_utils.go - if len(query) > maxQueryLength { - return nil, errors.New("maximum query length exceeded") - } - - q, err := cmtquery.New(query) - if err != nil { - return nil, fmt.Errorf("failed to parse query: %w", err) - } - - results, err := p.blockIndexer.Search(ctx, q) - if err != nil { - return nil, fmt.Errorf("block search failed: %w", err) - } - - // sort results (must be done before pagination) - switch orderBy { - case "desc", "": - sort.Slice(results, func(i, j int) bool { return results[i] > results[j] }) - - case "asc": - sort.Slice(results, func(i, j int) bool { return results[i] < results[j] }) - - default: - return nil, errors.New("expected order_by to be either `asc` or `desc` or empty") - } - - // paginate results - totalCount := len(results) - perPage := validatePerPage(perPagePtr) // Use local function - - page, err := validatePage(pagePtr, perPage, totalCount) // Use local function - if err != nil { - return nil, err - } - - skipCount := validateSkipCount(page, perPage) // Use local function - pageSize := cmtmath.MinInt(perPage, totalCount-skipCount) - - apiResults := make([]*coretypes.ResultBlock, 0, pageSize) - for i := skipCount; i < skipCount+pageSize; i++ { - height := uint64(results[i]) - header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, height) - if err != nil { - // If a block referenced by indexer is missing, should we error out or just skip? - // For now, error out. - return nil, fmt.Errorf("failed to get block data for height %d from store: %w", height, err) - } - if header == nil || data == nil { - return nil, fmt.Errorf("nil header or data for height %d from store", height) - } - block, err := ToABCIBlock(header, data) // Use local function - if err != nil { - return nil, fmt.Errorf("failed to convert block at height %d to ABCI block: %w", height, err) - } - apiResults = append(apiResults, &coretypes.ResultBlock{ - Block: block, - BlockID: cmttypes.BlockID{ - Hash: block.Hash(), - }, - }) - } - - return &coretypes.ResultBlockSearch{Blocks: apiResults, TotalCount: totalCount}, nil -} - -// Validators implements client.CometRPC. -func (p *RpcProvider) Validators(ctx context.Context, heightPtr *int64, pagePtr *int, perPagePtr *int) (*coretypes.ResultValidators, error) { - // Determine the height to query validators for. - // If height is nil or latest, use the current block height. - // Otherwise, use the specified height (if state for that height is available). - // Note: Loading state for arbitrary past heights might not be supported - // depending on state pruning. The current implementation implicitly loads latest state. - height := p.normalizeHeight(heightPtr) - - s, err := p.adapter.Store.LoadState(ctx) // Loads the *latest* state - if err != nil { - return nil, fmt.Errorf("failed to load current state: %w", err) - } - - // Check if the requested height matches the loaded state height if a specific height was requested. - // If state history is not kept, this check might be necessary or always fail for past heights. - if heightPtr != nil && int64(height) != s.LastBlockHeight { - // This implies state for the requested height is not available with the current LoadState method. - // Adjust implementation if historical state access is possible and needed. - return nil, fmt.Errorf("validator set for height %d is not available, latest height is %d", height, s.LastBlockHeight) - } - - validators := s.Validators.Validators - totalCount := len(validators) - - // Handle pagination - perPage := validatePerPage(perPagePtr) // Use local function - page, err := validatePage(pagePtr, perPage, totalCount) // Use local function - if err != nil { - return nil, err - } - - start := validateSkipCount(page, perPage) // Use local function - end := start + perPage - if end > totalCount { - end = totalCount - } - - // Ensure start index is not out of bounds, can happen if page * perPage > totalCount - if start >= totalCount { - validators = []*cmttypes.Validator{} // Return empty slice if page is out of range - } else { - validators = validators[start:end] - } - - return &coretypes.ResultValidators{ - BlockHeight: s.LastBlockHeight, // Return the height for which the validator set is valid (latest) - Validators: validators, - Total: totalCount, // Total number of validators *before* pagination - }, nil -} diff --git a/pkg/rpc/core/consensus.go b/pkg/rpc/core/consensus.go new file mode 100644 index 0000000..fd60303 --- /dev/null +++ b/pkg/rpc/core/consensus.go @@ -0,0 +1,116 @@ +package core + +import ( + "fmt" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + cmttypes "github.com/cometbft/cometbft/types" +) + +// Validators gets the validator set at the given block height. +// +// If no height is provided, it will fetch the latest validator set. Note the +// validators are sorted by their voting power - this is the canonical order +// for the validators in the set as used in computing their Merkle root. +// +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/validators +func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*coretypes.ResultValidators, error) { + // Determine the height to query validators for. + // If height is nil or latest, use the current block height. + // Otherwise, use the specified height (if state for that height is available). + // Note: Loading state for arbitrary past heights might not be supported + // depending on state pruning. The current implementation implicitly loads latest state. + height := normalizeHeight(heightPtr) + + s, err := env.Adapter.Store.LoadState(ctx.Context()) // Loads the *latest* state + if err != nil { + return nil, fmt.Errorf("failed to load current state: %w", err) + } + + // Check if the requested height matches the loaded state height if a specific height was requested. + // If state history is not kept, this check might be necessary or always fail for past heights. + if heightPtr != nil && int64(height) != s.LastBlockHeight { + // This implies state for the requested height is not available with the current LoadState method. + // Adjust implementation if historical state access is possible and needed. + return nil, fmt.Errorf("validator set for height %d is not available, latest height is %d", height, s.LastBlockHeight) + } + + validators := s.Validators.Validators + totalCount := len(validators) + + // Handle pagination + perPage := validatePerPage(perPagePtr) // Use local function + page, err := validatePage(pagePtr, perPage, totalCount) // Use local function + if err != nil { + return nil, err + } + + start := validateSkipCount(page, perPage) // Use local function + end := start + perPage + if end > totalCount { + end = totalCount + } + + // Ensure start index is not out of bounds, can happen if page * perPage > totalCount + if start >= totalCount { + validators = []*cmttypes.Validator{} // Return empty slice if page is out of range + } else { + validators = validators[start:end] + } + + return &coretypes.ResultValidators{ + BlockHeight: s.LastBlockHeight, // Return the height for which the validator set is valid (latest) + Validators: validators, + Total: totalCount, // Total number of validators *before* pagination + }, nil +} + +// DumpConsensusState dumps consensus state. +// UNSTABLE +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/dump_consensus_state +func DumpConsensusState(ctx *rpctypes.Context) (*coretypes.ResultDumpConsensusState, error) { + // Rollkit doesn't have Tendermint consensus state. + return nil, ErrConsensusStateNotAvailable +} + +// ConsensusState returns a concise summary of the consensus state. +// UNSTABLE +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/consensus_state +func ConsensusState(ctx *rpctypes.Context) (*coretypes.ResultConsensusState, error) { + // Rollkit doesn't have Tendermint consensus state. + return nil, ErrConsensusStateNotAvailable +} + +// ConsensusParams gets the consensus parameters at the given block height. +// If no height is provided, it will fetch the latest consensus params. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/consensus_params +func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*coretypes.ResultConsensusParams, error) { + state, err := env.Adapter.Store.LoadState(ctx.Context()) + if err != nil { + return nil, err + } + params := state.ConsensusParams + // Use normalizeHeight which should be moved to rpcProvider as well + normalizedHeight := normalizeHeight(heightPtr) // Changed r.normalizeHeight to p.normalizeHeight + return &coretypes.ResultConsensusParams{ + BlockHeight: int64(normalizedHeight), //nolint:gosec + ConsensusParams: cmttypes.ConsensusParams{ + Block: cmttypes.BlockParams{ + MaxBytes: params.Block.MaxBytes, + MaxGas: params.Block.MaxGas, + }, + Evidence: cmttypes.EvidenceParams{ + MaxAgeNumBlocks: params.Evidence.MaxAgeNumBlocks, + MaxAgeDuration: params.Evidence.MaxAgeDuration, + MaxBytes: params.Evidence.MaxBytes, + }, + Validator: cmttypes.ValidatorParams{ + PubKeyTypes: params.Validator.PubKeyTypes, + }, + Version: cmttypes.VersionParams{ + App: params.Version.App, + }, + }, + }, nil +} diff --git a/pkg/rpc/core/const.go b/pkg/rpc/core/const.go new file mode 100644 index 0000000..c0b8721 --- /dev/null +++ b/pkg/rpc/core/const.go @@ -0,0 +1,10 @@ +package core + +const ( + // maxQueryLength is the maximum length of a query string that will be + // accepted. This is just a safety check to avoid outlandish queries. + maxQueryLength = 512 + + defaultPerPage = 30 + maxPerPage = 100 +) diff --git a/pkg/rpc/core/env.go b/pkg/rpc/core/env.go new file mode 100644 index 0000000..19285da --- /dev/null +++ b/pkg/rpc/core/env.go @@ -0,0 +1,75 @@ +package core + +import ( + "fmt" + + cmtlog "github.com/cometbft/cometbft/libs/log" + "github.com/cometbft/cometbft/state/indexer" + "github.com/cometbft/cometbft/state/txindex" + + "github.com/rollkit/go-execution-abci/pkg/adapter" +) + +var ( + // set by Node + env *Environment +) + +// SetEnvironment sets up the given Environment. +// It will race if multiple Node call SetEnvironment. +func SetEnvironment(e *Environment) { + env = e +} + +// Environment contains objects and interfaces used by the RPC. It is expected +// to be setup once during startup. +type Environment struct { + Adapter *adapter.Adapter + TxIndexer txindex.TxIndexer + BlockIndexer indexer.BlockIndexer + Logger cmtlog.Logger +} + +func validateSkipCount(page, perPage int) int { + skipCount := (page - 1) * perPage + if skipCount < 0 { + return 0 + } + + return skipCount +} + +func validatePage(pagePtr *int, perPage, totalCount int) (int, error) { + if perPage < 1 { + panic(fmt.Sprintf("zero or negative perPage: %d", perPage)) + } + + if pagePtr == nil { // no page parameter + return 1, nil + } + + pages := ((totalCount - 1) / perPage) + 1 + if pages == 0 { + pages = 1 // one page (even if it's empty) + } + page := *pagePtr + if page <= 0 || page > pages { + return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page) + } + + return page, nil +} + +func validatePerPage(perPagePtr *int) int { + if perPagePtr == nil { // no per_page parameter + return defaultPerPage + } + + perPage := *perPagePtr + if perPage < 1 { + return defaultPerPage + } else if perPage > maxPerPage { + return maxPerPage + } + return perPage +} diff --git a/pkg/rpc/core/errors.go b/pkg/rpc/core/errors.go new file mode 100644 index 0000000..71faf17 --- /dev/null +++ b/pkg/rpc/core/errors.go @@ -0,0 +1,7 @@ +package core + +import "errors" + +// ErrConsensusStateNotAvailable is returned because Rollkit doesn't use Tendermint consensus. +// Exported error. +var ErrConsensusStateNotAvailable = errors.New("consensus state not available in Rollkit") // Changed to exported diff --git a/pkg/rpc/provider/events.go b/pkg/rpc/core/events.go similarity index 56% rename from pkg/rpc/provider/events.go rename to pkg/rpc/core/events.go index 85f6885..f884184 100644 --- a/pkg/rpc/provider/events.go +++ b/pkg/rpc/core/events.go @@ -1,17 +1,17 @@ -package provider +package core import ( - "context" "errors" - coretypes "github.com/cometbft/cometbft/rpc/core/types" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" ) -// Subscribe implements client.Client. -// This functionality is currently not implemented. -func (p *RpcProvider) Subscribe(ctx context.Context, subscriber string, query string, outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) { +// Subscribe for events via WebSocket. +// More: https://docs.cometbft.com/v0.37/rpc/#/Websocket/subscribe +func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { // Check if EventBus is available - if p.adapter.EventBus == nil { + if env.Adapter.EventBus == nil { return nil, errors.New("event bus is not configured, cannot subscribe to events") } // TODO: Implement subscription logic using p.adapter.EventBus @@ -37,11 +37,11 @@ func (p *RpcProvider) Subscribe(ctx context.Context, subscriber string, query st return nil, errors.New("event subscription functionality is not yet implemented") } -// Unsubscribe implements client.Client. -// This functionality is currently not implemented. -func (p *RpcProvider) Unsubscribe(ctx context.Context, subscriber string, query string) error { - if p.adapter.EventBus == nil { - return errors.New("event bus is not configured, cannot unsubscribe") +// Unsubscribe from events via WebSocket. +// More: https://docs.cometbft.com/v0.37/rpc/#/Websocket/unsubscribe +func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { + if env.Adapter.EventBus == nil { + return nil, errors.New("event bus is not configured, cannot unsubscribe") } // TODO: Implement unsubscription logic using p.adapter.EventBus // Example structure: @@ -50,16 +50,14 @@ func (p *RpcProvider) Unsubscribe(ctx context.Context, subscriber string, query // return fmt.Errorf("failed to parse query: %w", err) // } // return p.adapter.EventBus.Unsubscribe(ctx, subscriber, q) - return errors.New("event unsubscription functionality is not yet implemented") + return nil, errors.New("event unsubscription functionality is not yet implemented") } -// UnsubscribeAll implements client.Client. -// This functionality is currently not implemented. -func (p *RpcProvider) UnsubscribeAll(ctx context.Context, subscriber string) error { - if p.adapter.EventBus == nil { - return errors.New("event bus is not configured, cannot unsubscribe") +func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { + if env.Adapter.EventBus == nil { + return nil, errors.New("event bus is not configured, cannot unsubscribe") } // TODO: Implement unsubscribe all logic using p.adapter.EventBus // return p.adapter.EventBus.UnsubscribeAll(ctx, subscriber) - return errors.New("event unsubscribe all functionality is not yet implemented") + return nil, errors.New("event unsubscribe all functionality is not yet implemented") } diff --git a/pkg/rpc/provider/evidence.go b/pkg/rpc/core/evidence.go similarity index 51% rename from pkg/rpc/provider/evidence.go rename to pkg/rpc/core/evidence.go index de1f793..18098f4 100644 --- a/pkg/rpc/provider/evidence.go +++ b/pkg/rpc/core/evidence.go @@ -1,10 +1,8 @@ -package provider +package core import ( - "context" - - // Check if logger is used - coretypes "github.com/cometbft/cometbft/rpc/core/types" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" cmttypes "github.com/cometbft/cometbft/types" ) @@ -12,10 +10,10 @@ import ( // as Rollkit doesn't handle evidence in the same way as Tendermint. // It returns a successful response with the evidence hash, mimicking Tendermint's behaviour // without actually processing or storing the evidence. -func (p *RpcProvider) BroadcastEvidence(_ context.Context, evidence cmttypes.Evidence) (*coretypes.ResultBroadcastEvidence, error) { +func BroadcastEvidence(ctx *rpctypes.Context, ev cmttypes.Evidence) (*ctypes.ResultBroadcastEvidence, error) { // Log that evidence broadcasting is not supported or is a no-op? - p.logger.Debug("BroadcastEvidence called, but evidence handling is not implemented in Rollkit RPC.") - return &coretypes.ResultBroadcastEvidence{ - Hash: evidence.Hash(), + env.Logger.Debug("BroadcastEvidence called, but evidence handling is not implemented in Rollkit RPC.") + return &ctypes.ResultBroadcastEvidence{ + Hash: ev.Hash(), }, nil } diff --git a/pkg/rpc/core/health.go b/pkg/rpc/core/health.go new file mode 100644 index 0000000..71607dc --- /dev/null +++ b/pkg/rpc/core/health.go @@ -0,0 +1,13 @@ +package core + +import ( + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" +) + +// Health gets node health. Returns empty result (200 OK) on success, no +// response - in case of an error. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/health +func Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) { + return &ctypes.ResultHealth{}, nil +} diff --git a/pkg/rpc/provider/mempool.go b/pkg/rpc/core/mempool.go similarity index 54% rename from pkg/rpc/provider/mempool.go rename to pkg/rpc/core/mempool.go index 6c3625f..07e9250 100644 --- a/pkg/rpc/provider/mempool.go +++ b/pkg/rpc/core/mempool.go @@ -1,4 +1,4 @@ -package provider +package core import ( "context" @@ -6,22 +6,32 @@ import ( "fmt" "time" - abci "github.com/cometbft/cometbft/abci/types" // Check if logger is used here + abci "github.com/cometbft/cometbft/abci/types" cmtmath "github.com/cometbft/cometbft/libs/math" "github.com/cometbft/cometbft/mempool" - coretypes "github.com/cometbft/cometbft/rpc/core/types" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" cmttypes "github.com/cometbft/cometbft/types" + + execp2p "github.com/rollkit/go-execution-abci/pkg/p2p" ) // Define timeout for waiting for TX commit event const subscribeTimeout = 5 * time.Second // TODO: Make configurable? -// BroadcastTxAsync implements client.CometRPC. -func (p *RpcProvider) BroadcastTxAsync(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error) { +//----------------------------------------------------------------------------- +// NOTE: tx should be signed, but this is only checked at the app level (not by CometBFT!) + +// BroadcastTxAsync returns right away, with no response. Does not wait for +// CheckTx nor DeliverTx results. +// More: https://docs.cometbft.com/v0.37/rpc/#/Tx/broadcast_tx_async +func BroadcastTxAsync(ctx *rpctypes.Context, tx cmttypes.Tx) (*ctypes.ResultBroadcastTx, error) { + unwrappedCtx := ctx.Context() + var res *abci.ResponseCheckTx responseCh := make(chan *abci.ResponseCheckTx, 1) - err := p.adapter.Mempool.CheckTx(tx, func(response *abci.ResponseCheckTx) { + err := env.Adapter.Mempool.CheckTx(tx, func(response *abci.ResponseCheckTx) { responseCh <- response }, mempool.TxInfo{}) if err != nil { @@ -32,14 +42,14 @@ func (p *RpcProvider) BroadcastTxAsync(ctx context.Context, tx cmttypes.Tx) (*co select { case res = <-responseCh: // Successfully received CheckTx response - case <-ctx.Done(): - return nil, fmt.Errorf("context cancelled while waiting for CheckTx response: %w", ctx.Err()) + case <-unwrappedCtx.Done(): + return nil, fmt.Errorf("context cancelled while waiting for CheckTx response: %w", unwrappedCtx.Err()) } // Note: Original code didn't gossip on async. If gossiping is desired here, // it should be added similarly to BroadcastTxSync, potentially after checking res.Code. - return &coretypes.ResultBroadcastTx{ + return &ctypes.ResultBroadcastTx{ Code: res.Code, Data: res.Data, Log: res.Log, @@ -48,59 +58,116 @@ func (p *RpcProvider) BroadcastTxAsync(ctx context.Context, tx cmttypes.Tx) (*co }, nil } -// BroadcastTxCommit implements client.CometRPC. -func (p *RpcProvider) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTxCommit, error) { +// BroadcastTxSync returns with the response from CheckTx. Does not wait for +// DeliverTx result. +// More: https://docs.cometbft.com/v0.37/rpc/#/Tx/broadcast_tx_sync +func BroadcastTxSync(ctx *rpctypes.Context, tx cmttypes.Tx) (*ctypes.ResultBroadcastTx, error) { + unwrappedCtx := ctx.Context() + + resCh := make(chan *abci.ResponseCheckTx, 1) + err := env.Adapter.Mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { + select { + case <-unwrappedCtx.Done(): + return + case resCh <- res: + } + }, mempool.TxInfo{}) + if err != nil { + return nil, fmt.Errorf("error during CheckTx: %w", err) + } + + var res *abci.ResponseCheckTx + select { + case res = <-resCh: + // Got response + case <-unwrappedCtx.Done(): + return nil, fmt.Errorf("context cancelled waiting for CheckTx: %w", unwrappedCtx.Err()) + } + + // Gossip the transaction if it passed CheckTx. + if res.Code == abci.CodeTypeOK { + if env.Adapter.TxGossiper == nil { + return nil, execp2p.ErrNotReady // Cannot gossip if gossiper is not ready + } + + err = env.Adapter.TxGossiper.Publish(unwrappedCtx, tx) + if err != nil { + // If gossiping fails, remove the tx from the mempool to allow resubmission. + // This matches the behaviour described in the original comments. + rmErr := env.Adapter.Mempool.RemoveTxByKey(tx.Key()) + if rmErr != nil { + // Log if removal also failed, but return the gossip error primarily. + env.Logger.Error("Failed to remove tx from mempool after gossip failure", "tx_key", tx.Key(), "removal_error", rmErr) + } + return nil, fmt.Errorf("failed to gossip tx: %w", err) + } + } + + return &ctypes.ResultBroadcastTx{ + Code: res.Code, + Data: res.Data, + Log: res.Log, + Codespace: res.Codespace, + Hash: tx.Hash(), + }, nil +} + +// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. +// More: https://docs.cometbft.com/v0.37/rpc/#/Tx/broadcast_tx_commit +func BroadcastTxCommit(ctx *rpctypes.Context, tx cmttypes.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + unwrappedCtx := ctx.Context() + // This implementation corresponds to Tendermint's implementation from rpc/core/mempool.go. // ctx.RemoteAddr godoc: If neither HTTPReq nor WSConn is set, an empty string is returned. // This code is a local client, so we can assume that subscriber is "" // TODO: Check if assuming empty subscriber is always correct for this context. subscriber := "" // ctx.RemoteAddr() - if p.adapter.EventBus == nil { + if env.Adapter.EventBus == nil { return nil, errors.New("event bus is not configured, cannot subscribe to events") } // Use CometBFT config values directly if available // TODO: Access these config values properly, perhaps via RpcProvider struct if needed - maxSubs := 100 // Placeholder - maxClients := 100 // Placeholder - commitTimeout := p.adapter.CometCfg.RPC.TimeoutBroadcastTxCommit // Assuming CometCfg is accessible + maxSubs := 100 // Placeholder + maxClients := 100 // Placeholder + commitTimeout := env.Adapter.CometCfg.RPC.TimeoutBroadcastTxCommit // Assuming CometCfg is accessible - if p.adapter.EventBus.NumClients() >= maxClients { + if env.Adapter.EventBus.NumClients() >= maxClients { return nil, fmt.Errorf("max_subscription_clients %d reached", maxClients) - } else if p.adapter.EventBus.NumClientSubscriptions(subscriber) >= maxSubs { + } else if env.Adapter.EventBus.NumClientSubscriptions(subscriber) >= maxSubs { return nil, fmt.Errorf("max_subscriptions_per_client %d reached", maxSubs) } // Subscribe to tx being committed in block. - subCtx, cancel := context.WithTimeout(ctx, subscribeTimeout) + subCtx, cancel := context.WithTimeout(unwrappedCtx, subscribeTimeout) defer cancel() q := cmttypes.EventQueryTxFor(tx) - deliverTxSub, err := p.adapter.EventBus.Subscribe(subCtx, subscriber, q) + deliverTxSub, err := env.Adapter.EventBus.Subscribe(subCtx, subscriber, q) if err != nil { err = fmt.Errorf("failed to subscribe to tx: %w", err) - p.logger.Error("Error on broadcast_tx_commit", "err", err) + env.Logger.Error("Error on broadcast_tx_commit", "err", err) return nil, err } defer func() { - if err := p.adapter.EventBus.Unsubscribe(ctx, subscriber, q); err != nil { - p.logger.Error("Error unsubscribing from eventBus", "err", err) + if err := env.Adapter.EventBus.Unsubscribe(ctx.Context(), subscriber, q); err != nil { + env.Logger.Error("Error unsubscribing from eventBus", "err", err) } }() // Add to mempool and wait for CheckTx result checkTxResCh := make(chan *abci.ResponseCheckTx, 1) - err = p.adapter.Mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { + err = env.Adapter.Mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { select { - case <-ctx.Done(): + case <-unwrappedCtx.Done(): // Context cancelled before CheckTx completed - p.logger.Error("Context cancelled during CheckTx in BroadcastTxCommit") + env.Logger.Error("Context cancelled during CheckTx in BroadcastTxCommit") return case checkTxResCh <- res: } }, mempool.TxInfo{}) if err != nil { - p.logger.Error("Error on CheckTx in BroadcastTxCommit", "err", err) + env.Logger.Error("Error on CheckTx in BroadcastTxCommit", "err", err) return nil, fmt.Errorf("error on CheckTx in broadcastTxCommit: %w", err) } @@ -108,12 +175,12 @@ func (p *RpcProvider) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*c select { case checkTxRes = <-checkTxResCh: // Got CheckTx response - case <-ctx.Done(): - return nil, fmt.Errorf("context cancelled while waiting for CheckTx response: %w", ctx.Err()) + case <-unwrappedCtx.Done(): + return nil, fmt.Errorf("context cancelled while waiting for CheckTx response: %w", unwrappedCtx.Err()) } if checkTxRes.Code != abci.CodeTypeOK { - return &coretypes.ResultBroadcastTxCommit{ + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, // Included for consistency, though tx didn't make it to a block Hash: tx.Hash(), @@ -121,14 +188,14 @@ func (p *RpcProvider) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*c } // Broadcast tx gossip only if CheckTx passed - if p.adapter.TxGossiper == nil { - return nil, errors.New("tx gossiper is not ready") + if env.Adapter.TxGossiper == nil { + return nil, execp2p.ErrNotReady // Cannot gossip if gossiper is nil } - err = p.adapter.TxGossiper.Publish(ctx, tx) + err = env.Adapter.TxGossiper.Publish(unwrappedCtx, tx) if err != nil { // Note: If gossiping fails, the tx is still in the local mempool and subscribed. // Tendermint's original behaviour might differ here. Consider if tx should be removed from mempool. - p.logger.Error("tx added to local mempool but failed to broadcast", "err", err) + env.Logger.Error("tx added to local mempool but failed to broadcast", "err", err) return nil, fmt.Errorf("tx added to local mempool but failure to broadcast: %w", err) } @@ -138,9 +205,9 @@ func (p *RpcProvider) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*c if !ok { // Channel closed, possibly due to unsubscribe or EventBus shutdown err = fmt.Errorf("subscription channel closed unexpectedly: %w", deliverTxSub.Err()) - p.logger.Error("Error on broadcastTxCommit", "err", err) + env.Logger.Error("Error on broadcastTxCommit", "err", err) // Return the CheckTx result as the tx wasn't confirmed in a block - return &coretypes.ResultBroadcastTxCommit{ + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, Hash: tx.Hash(), @@ -149,14 +216,14 @@ func (p *RpcProvider) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*c deliverTxRes, ok := msg.Data().(cmttypes.EventDataTx) if !ok { err = fmt.Errorf("unexpected event data type: got %T, expected %T", msg.Data(), cmttypes.EventDataTx{}) - p.logger.Error("Error on broadcastTxCommit", "err", err) - return &coretypes.ResultBroadcastTxCommit{ + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, Hash: tx.Hash(), }, err } - return &coretypes.ResultBroadcastTxCommit{ + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: deliverTxRes.Result, Hash: tx.Hash(), @@ -170,25 +237,25 @@ func (p *RpcProvider) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*c reason = deliverTxSub.Err().Error() } err = fmt.Errorf("subscription was cancelled (reason: %s)", reason) - p.logger.Error("Error on broadcastTxCommit", "err", err) - return &coretypes.ResultBroadcastTxCommit{ + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, Hash: tx.Hash(), }, err case <-time.After(commitTimeout): err = errors.New("timed out waiting for tx to be included in a block") - p.logger.Error("Error on broadcastTxCommit", "err", err) - return &coretypes.ResultBroadcastTxCommit{ + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, Hash: tx.Hash(), }, err - case <-ctx.Done(): + case <-unwrappedCtx.Done(): // Parent context cancelled - err = fmt.Errorf("context cancelled while waiting for tx commit event: %w", ctx.Err()) - p.logger.Error("Error on broadcastTxCommit", "err", err) - return &coretypes.ResultBroadcastTxCommit{ + err = fmt.Errorf("context cancelled while waiting for tx commit event: %w", unwrappedCtx.Err()) + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, Hash: tx.Hash(), @@ -196,63 +263,51 @@ func (p *RpcProvider) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*c } } -// BroadcastTxSync implements client.CometRPC. -func (p *RpcProvider) BroadcastTxSync(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error) { - resCh := make(chan *abci.ResponseCheckTx, 1) - err := p.adapter.Mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { - select { - case <-ctx.Done(): - return - case resCh <- res: - } - }, mempool.TxInfo{}) - if err != nil { - return nil, fmt.Errorf("error during CheckTx: %w", err) - } - - var res *abci.ResponseCheckTx - select { - case res = <-resCh: - case <-ctx.Done(): - return nil, fmt.Errorf("context cancelled waiting for CheckTx: %w", ctx.Err()) - } +// UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries) +// including their number. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/unconfirmed_txs +func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { + txs := env.Adapter.Mempool.ReapMaxTxs(-1) // Reap all transactions - // Gossip the transaction if it passed CheckTx. - if res.Code == abci.CodeTypeOK { - if p.adapter.TxGossiper == nil { - return nil, errors.New("tx gossiper is not ready") - } - - err = p.adapter.TxGossiper.Publish(ctx, tx) - if err != nil { - // If gossiping fails, remove the tx from the mempool to allow resubmission. - // This matches the behaviour described in the original comments. - rmErr := p.adapter.Mempool.RemoveTxByKey(tx.Key()) - if rmErr != nil { - // Log if removal also failed, but return the gossip error primarily. - p.logger.Error("Failed to remove tx from mempool after gossip failure", "tx_key", tx.Key(), "removal_error", rmErr) - } - return nil, fmt.Errorf("failed to gossip tx: %w", err) + limit := len(txs) + if limitPtr != nil { + limit = cmtmath.MinInt(*limitPtr, limit) + if limit < 0 { + limit = 0 } } + paginatedTxs := txs[:limit] - return &coretypes.ResultBroadcastTx{ - Code: res.Code, - Data: res.Data, - Log: res.Log, - Codespace: res.Codespace, - Hash: tx.Hash(), + return &ctypes.ResultUnconfirmedTxs{ + Count: len(paginatedTxs), + Total: env.Adapter.Mempool.Size(), + TotalBytes: env.Adapter.Mempool.SizeBytes(), + Txs: paginatedTxs, }, nil } -// CheckTx implements client.Client. -func (p *RpcProvider) CheckTx(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultCheckTx, error) { +// NumUnconfirmedTxs gets number of unconfirmed transactions. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/num_unconfirmed_txs +func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { + return &ctypes.ResultUnconfirmedTxs{ + Count: env.Adapter.Mempool.Size(), + Total: env.Adapter.Mempool.Size(), // Assuming Total means current size + TotalBytes: env.Adapter.Mempool.SizeBytes(), + }, nil +} + +// CheckTx checks the transaction without executing it. The transaction won't +// be added to the mempool either. +// More: https://docs.cometbft.com/v0.37/rpc/#/Tx/check_tx +func CheckTx(ctx *rpctypes.Context, tx cmttypes.Tx) (*ctypes.ResultCheckTx, error) { + unwrappedCtx := ctx.Context() + var res *abci.ResponseCheckTx responseCh := make(chan *abci.ResponseCheckTx, 1) - err := p.adapter.Mempool.CheckTx(tx, func(response *abci.ResponseCheckTx) { + err := env.Adapter.Mempool.CheckTx(tx, func(response *abci.ResponseCheckTx) { select { - case <-ctx.Done(): + case <-unwrappedCtx.Done(): return case responseCh <- response: } @@ -264,38 +319,8 @@ func (p *RpcProvider) CheckTx(ctx context.Context, tx cmttypes.Tx) (*coretypes.R // Wait for the callback to be called or context cancellation select { case res = <-responseCh: - return &coretypes.ResultCheckTx{ResponseCheckTx: *res}, nil - case <-ctx.Done(): - return nil, fmt.Errorf("context cancelled while waiting for CheckTx response: %w", ctx.Err()) + return &ctypes.ResultCheckTx{ResponseCheckTx: *res}, nil + case <-unwrappedCtx.Done(): + return nil, fmt.Errorf("context cancelled while waiting for CheckTx response: %w", unwrappedCtx.Err()) } } - -// NumUnconfirmedTxs implements client.Client. -func (p *RpcProvider) NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error) { - return &coretypes.ResultUnconfirmedTxs{ - Count: p.adapter.Mempool.Size(), - Total: p.adapter.Mempool.Size(), // Assuming Total means current size - TotalBytes: p.adapter.Mempool.SizeBytes(), - }, nil -} - -// UnconfirmedTxs implements client.CometRPC. -func (p *RpcProvider) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error) { - txs := p.adapter.Mempool.ReapMaxTxs(-1) // Reap all transactions - - limit := len(txs) - if limitPtr != nil { - limit = cmtmath.MinInt(*limitPtr, limit) - if limit < 0 { - limit = 0 - } - } - paginatedTxs := txs[:limit] - - return &coretypes.ResultUnconfirmedTxs{ - Count: len(paginatedTxs), - Total: p.adapter.Mempool.Size(), - TotalBytes: p.adapter.Mempool.SizeBytes(), - Txs: paginatedTxs, - }, nil -} diff --git a/pkg/rpc/core/net.go b/pkg/rpc/core/net.go new file mode 100644 index 0000000..4601b89 --- /dev/null +++ b/pkg/rpc/core/net.go @@ -0,0 +1,56 @@ +package core + +import ( + "errors" + + "github.com/cometbft/cometbft/p2p" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" +) + +func NetInfo(ctx *rpctypes.Context) (*coretypes.ResultNetInfo, error) { + res := coretypes.ResultNetInfo{ + Listening: true, // Assuming node is always listening if RPC is up + Listeners: []string{}, // TODO: Populate with actual listener addresses if available + } + + // Access P2P client from adapter to get peer info + if env.Adapter.P2PClient != nil { + for _, ma := range env.Adapter.P2PClient.Addrs() { + res.Listeners = append(res.Listeners, ma.String()) + } + peers := env.Adapter.P2PClient.Peers() + res.NPeers = len(peers) + for _, peer := range peers { + // Convert peer info to coretypes.Peer + // Ensure p2p.DefaultNodeInfo is correctly populated from peer.NodeInfo + res.Peers = append(res.Peers, coretypes.Peer{ + NodeInfo: p2p.DefaultNodeInfo{ // Adapt this based on actual available PeerInfo structure + // Access fields via peer.NodeInfo + DefaultNodeID: p2p.ID(peer.NodeInfo.NodeID), + ListenAddr: peer.NodeInfo.ListenAddr, + Network: peer.NodeInfo.Network, + // Other fields like Moniker, Version might be available too + }, + IsOutbound: peer.IsOutbound, + RemoteIP: peer.RemoteIP, + }) + } + } else { + // Handle case where P2P client is not available or initialized + res.NPeers = 0 + res.Peers = []coretypes.Peer{} + } + + return &res, nil +} + +func Genesis(ctx *rpctypes.Context) (*coretypes.ResultGenesis, error) { + // Returning unimplemented as per the original code. + // Consider implementing or returning a more specific error if needed. + panic("unimplemented") +} + +func GenesisChunked(ctx *rpctypes.Context, chunk uint) (*coretypes.ResultGenesisChunk, error) { + return nil, errors.New("GenesisChunked RPC method is not yet implemented") +} diff --git a/pkg/rpc/core/routes.go b/pkg/rpc/core/routes.go new file mode 100644 index 0000000..9750263 --- /dev/null +++ b/pkg/rpc/core/routes.go @@ -0,0 +1,48 @@ +package core + +import ( + "github.com/cometbft/cometbft/rpc/jsonrpc/server" +) + +var Routes = map[string]*server.RPCFunc{ + // subscribe/unsubscribe are reserved for websocket events. + "subscribe": server.NewWSRPCFunc(Subscribe, "query"), + "unsubscribe": server.NewWSRPCFunc(Unsubscribe, "query"), + "unsubscribe_all": server.NewWSRPCFunc(UnsubscribeAll, ""), + + // // info API + "health": server.NewRPCFunc(Health, ""), + "status": server.NewRPCFunc(Status, ""), + "net_info": server.NewRPCFunc(NetInfo, ""), + "blockchain": server.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight", server.Cacheable()), + "genesis": server.NewRPCFunc(Genesis, "", server.Cacheable()), + "genesis_chunked": server.NewRPCFunc(GenesisChunked, "chunk", server.Cacheable()), + "block": server.NewRPCFunc(Block, "height", server.Cacheable("height")), + "block_by_hash": server.NewRPCFunc(BlockByHash, "hash", server.Cacheable()), + "block_results": server.NewRPCFunc(BlockResults, "height", server.Cacheable("height")), + "commit": server.NewRPCFunc(Commit, "height", server.Cacheable("height")), + "header": server.NewRPCFunc(Header, "height", server.Cacheable("height")), + "header_by_hash": server.NewRPCFunc(HeaderByHash, "hash", server.Cacheable()), + "check_tx": server.NewRPCFunc(CheckTx, "tx"), + "tx": server.NewRPCFunc(Tx, "hash,prove", server.Cacheable()), + "tx_search": server.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), + "block_search": server.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), + "validators": server.NewRPCFunc(Validators, "height,page,per_page", server.Cacheable("height")), + "dump_consensus_state": server.NewRPCFunc(DumpConsensusState, ""), + "consensus_state": server.NewRPCFunc(ConsensusState, ""), + "consensus_params": server.NewRPCFunc(ConsensusParams, "height", server.Cacheable("height")), + "unconfirmed_txs": server.NewRPCFunc(UnconfirmedTxs, "limit"), + "num_unconfirmed_txs": server.NewRPCFunc(NumUnconfirmedTxs, ""), + + // // tx broadcast API + "broadcast_tx_commit": server.NewRPCFunc(BroadcastTxCommit, "tx"), + "broadcast_tx_sync": server.NewRPCFunc(BroadcastTxSync, "tx"), + "broadcast_tx_async": server.NewRPCFunc(BroadcastTxAsync, "tx"), + + // // abci API + "abci_query": server.NewRPCFunc(ABCIQuery, "path,data,height,prove"), + "abci_info": server.NewRPCFunc(ABCIInfo, "", server.Cacheable()), + + // // evidence API + "broadcast_evidence": server.NewRPCFunc(BroadcastEvidence, "evidence"), +} diff --git a/pkg/rpc/core/status.go b/pkg/rpc/core/status.go new file mode 100644 index 0000000..e70bae0 --- /dev/null +++ b/pkg/rpc/core/status.go @@ -0,0 +1,63 @@ +package core + +import ( + abci "github.com/cometbft/cometbft/abci/types" + cmtbytes "github.com/cometbft/cometbft/libs/bytes" + "github.com/cometbft/cometbft/p2p" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" +) + +// Status returns CometBFT status including node info, pubkey, latest block +// hash, app hash, block height and time. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/status +func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { + info, err := env.Adapter.App.Info(&abci.RequestInfo{}) + if err != nil { + return nil, err + } + + s, err := env.Adapter.Store.LoadState(ctx.Context()) + if err != nil { + return nil, err + } + + // TODO: Populate NodeInfo properly + nodeInfo := p2p.DefaultNodeInfo{ + // ProtocolVersion: p2p.NewProtocolVersion( + // // Assuming these are accessible or definable constants/config values + // version.P2PProtocol, // Assuming 'version' package or similar imported + // version.BlockProtocol, + // version.AppProtocol, + // ), + // DefaultNodeID: // Needs node key -> p.nodeKey.ID() ? Requires passing nodeKey to RpcProvider + // ListenAddr: // Needs listener address -> listener.Addr().String() ? Requires access to listener + // Network: // Needs network/chain ID -> p.config.ChainID ? Requires access to config + // Version: // Needs application version -> version.TMCoreSemVer ? + // Channels: // Needs channel info -> p.channels // Requires access to p2p channels + // Moniker: // Needs moniker -> p.config.Moniker ? + // Other fields like Moniker, Version might be available too + } + // Check if the node key info is readily available in adapter or needs to be passed separately. + // If p.adapter.P2PClient is accessible and has NodeInfo: + // nodeKey := p.adapter.P2PClient.NodeInfo() // Hypothetical method + // if nodeKey != nil { + // nodeInfo = *nodeKey + // } + + return &ctypes.ResultStatus{ + NodeInfo: nodeInfo, // Use the populated or default NodeInfo + SyncInfo: ctypes.SyncInfo{ + // LatestBlockHash: // Need block meta -> latestBlockMeta.BlockID.Hash + LatestAppHash: cmtbytes.HexBytes(info.LastBlockAppHash), + LatestBlockHeight: info.LastBlockHeight, + LatestBlockTime: s.LastBlockTime, + // CatchingUp: // Requires sync status logic + }, + ValidatorInfo: ctypes.ValidatorInfo{ // Assumes single validator/sequencer model + Address: s.Validators.Proposer.Address, + PubKey: s.Validators.Proposer.PubKey, + VotingPower: s.Validators.Proposer.VotingPower, + }, + }, nil +} diff --git a/pkg/rpc/provider/tx.go b/pkg/rpc/core/tx.go similarity index 62% rename from pkg/rpc/provider/tx.go rename to pkg/rpc/core/tx.go index 048e0e2..3d77394 100644 --- a/pkg/rpc/provider/tx.go +++ b/pkg/rpc/core/tx.go @@ -1,47 +1,38 @@ -package provider +package core import ( - "context" "errors" "fmt" "sort" cmtmath "github.com/cometbft/cometbft/libs/math" cmtquery "github.com/cometbft/cometbft/libs/pubsub/query" - coretypes "github.com/cometbft/cometbft/rpc/core/types" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" "github.com/cometbft/cometbft/state/txindex/null" - cmttypes "github.com/cometbft/cometbft/types" + "github.com/cometbft/cometbft/types" ) -// Tx implements client.CometRPC. -func (p *RpcProvider) Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error) { - // Check if tx indexing is disabled - if p.txIndexer == nil { - return nil, fmt.Errorf("transaction indexing is disabled") - } - if _, ok := p.txIndexer.(*null.TxIndex); ok { +// Tx allows you to query the transaction results. `nil` could mean the +// transaction is in the mempool, invalidated, or was not sent in the first +// place. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/tx +func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + // if index is disabled, return error + if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, fmt.Errorf("transaction indexing is disabled") } - txResult, err := p.txIndexer.Get(hash) + r, err := env.TxIndexer.Get(hash) if err != nil { - // If the tx is not found, return nil result without error, maintaining behaviour. - // This differs from some Tendermint versions that might return an error. - // TODO: Consider aligning error handling with target Tendermint version if necessary. - // A more robust check might involve type asserting the error if txIndexer returns a specific "not found" error type. - if txResult == nil { // Heuristic check if error implies not found - return nil, nil - } - // Return other errors encountered during Get - return nil, fmt.Errorf("error getting tx from indexer: %w", err) + return nil, err } - if txResult == nil { - // Tx not found - return nil, nil + if r == nil { + return nil, fmt.Errorf("tx (%X) not found", hash) } - var proof cmttypes.TxProof + var proof types.TxProof if prove { // Proof generation is currently not supported. // When supported, logic to retrieve block and compute proof would go here. @@ -58,25 +49,32 @@ func (p *RpcProvider) Tx(ctx context.Context, hash []byte, prove bool) (*coretyp proof = blockRes.Block.Data.Txs.Proof(int(txResult.Index)) */ return nil, errors.New("transaction proof generation is not supported") // Return error as proofs aren't supported + // block := env.BlockStore.LoadBlock(r.Height) + // proof = block.Data.Txs.Proof(int(r.Index)) } - return &coretypes.ResultTx{ + return &ctypes.ResultTx{ Hash: hash, - Height: txResult.Height, - Index: txResult.Index, - TxResult: txResult.Result, - Tx: txResult.Tx, - Proof: proof, // Will be empty if prove is false or unsupported + Height: r.Height, + Index: r.Index, + TxResult: r.Result, + Tx: r.Tx, + Proof: proof, }, nil } -// TxSearch implements client.CometRPC. -func (p *RpcProvider) TxSearch(ctx context.Context, query string, prove bool, pagePtr *int, perPagePtr *int, orderBy string) (*coretypes.ResultTxSearch, error) { +// TxSearch allows you to query for multiple transactions results. It returns a +// list of transactions (maximum ?per_page entries) and the total count. +// More: https://docs.cometbft.com/v0.37/rpc/#/Info/tx_search +func TxSearch( + ctx *rpctypes.Context, + query string, + prove bool, + pagePtr, perPagePtr *int, + orderBy string, +) (*ctypes.ResultTxSearch, error) { // Check if tx indexing is disabled - if p.txIndexer == nil { - return nil, fmt.Errorf("transaction indexing is disabled") - } - if _, ok := p.txIndexer.(*null.TxIndex); ok { + if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, fmt.Errorf("transaction indexing is disabled") } @@ -89,7 +87,7 @@ func (p *RpcProvider) TxSearch(ctx context.Context, query string, prove bool, pa return nil, fmt.Errorf("failed to parse query: %w", err) } - txResults, err := p.txIndexer.Search(ctx, q) + txResults, err := env.TxIndexer.Search(ctx.Context(), q) if err != nil { return nil, fmt.Errorf("error searching txs: %w", err) } @@ -133,17 +131,17 @@ func (p *RpcProvider) TxSearch(ctx context.Context, query string, prove bool, pa skipCount := validateSkipCount(page, perPage) // Removed rpc. prefix pageSize := cmtmath.MinInt(perPage, totalCount-skipCount) - apiResults := make([]*coretypes.ResultTx, 0, pageSize) + apiResults := make([]*ctypes.ResultTx, 0, pageSize) for i := skipCount; i < skipCount+pageSize; i++ { result := txResults[i] - var proof cmttypes.TxProof + var proof types.TxProof if prove { // Proof generation is currently not supported. return nil, errors.New("transaction proof generation is not supported") } - apiResults = append(apiResults, &coretypes.ResultTx{ - Hash: cmttypes.Tx(result.Tx).Hash(), // Correctly calculate hash from []byte + apiResults = append(apiResults, &ctypes.ResultTx{ + Hash: types.Tx(result.Tx).Hash(), // Correctly calculate hash from []byte Height: result.Height, Index: result.Index, TxResult: result.Result, @@ -152,7 +150,7 @@ func (p *RpcProvider) TxSearch(ctx context.Context, query string, prove bool, pa }) } - return &coretypes.ResultTxSearch{ + return &ctypes.ResultTxSearch{ Txs: apiResults, TotalCount: totalCount, }, nil diff --git a/pkg/rpc/provider/utils.go b/pkg/rpc/core/utils.go similarity index 66% rename from pkg/rpc/provider/utils.go rename to pkg/rpc/core/utils.go index 39bf3de..d349ee0 100644 --- a/pkg/rpc/provider/utils.go +++ b/pkg/rpc/core/utils.go @@ -1,57 +1,18 @@ -package provider +package core import ( + "context" "errors" "fmt" "time" cmbytes "github.com/cometbft/cometbft/libs/bytes" - cmproto "github.com/cometbft/cometbft/proto/tendermint/types" cmversion "github.com/cometbft/cometbft/proto/tendermint/version" cmtypes "github.com/cometbft/cometbft/types" "github.com/rollkit/rollkit/types" ) -const ( - // Constants for pagination - // TODO: Make these configurable or derive from CometBFT config? - defaultPerPage = 30 - maxPerPage = 100 - // Define query length limit here - maxQueryLength = 256 -) - -// ToABCIHeaderPB converts Rollkit header to Header format defined in ABCI. -// Caller should fill all the fields that are not available in Rollkit header (like ChainID). -func ToABCIHeaderPB(header *types.Header) (cmproto.Header, error) { - return cmproto.Header{ - Version: cmversion.Consensus{ - Block: header.Version.Block, - App: header.Version.App, - }, - Height: int64(header.Height()), //nolint:gosec - Time: header.Time(), - LastBlockId: cmproto.BlockID{ - Hash: header.LastHeaderHash[:], - PartSetHeader: cmproto.PartSetHeader{ - Total: 0, - Hash: nil, - }, - }, - LastCommitHash: header.LastCommitHash[:], - DataHash: header.DataHash[:], - ConsensusHash: header.ConsensusHash[:], - AppHash: header.AppHash[:], - LastResultsHash: header.LastResultsHash[:], - EvidenceHash: new(cmtypes.EvidenceData).Hash(), - ProposerAddress: header.ProposerAddress, - ChainID: header.ChainID(), - ValidatorsHash: header.ValidatorHash, - NextValidatorsHash: header.ValidatorHash, - }, nil -} - // ToABCIHeader converts Rollkit header to Header format defined in ABCI. // Caller should fill all the fields that are not available in Rollkit header (like ChainID). func ToABCIHeader(header *types.Header) (cmtypes.Header, error) { @@ -156,6 +117,54 @@ func getABCICommit(height uint64, hash []byte, val cmtypes.Address, time time.Ti return &tmCommit } +func normalizeHeight(height *int64) uint64 { + var heightValue uint64 + if height == nil { + var err error + // TODO: Decide how to handle context here. Using background for now. + heightValue, err = env.Adapter.RollkitStore.Height(context.Background()) + if err != nil { + // TODO: Consider logging or returning error + env.Logger.Error("Failed to get current height in normalizeHeight", "err", err) + return 0 + } + } else if *height < 0 { + // Handle negative heights if they have special meaning (e.g., -1 for latest) + // Currently, just treat them as 0 or latest, adjust as needed. + // For now, let's assume negative height means latest valid height. + var err error + heightValue, err = env.Adapter.RollkitStore.Height(context.Background()) + if err != nil { + env.Logger.Error("Failed to get current height for negative height in normalizeHeight", "err", err) + return 0 + } + } else { + heightValue = uint64(*height) + } + + return heightValue +} + +func getBlockMeta(ctx context.Context, n uint64) *cmtypes.BlockMeta { + header, data, err := env.Adapter.RollkitStore.GetBlockData(ctx, n) + if err != nil { + env.Logger.Error("Failed to get block data in getBlockMeta", "height", n, "err", err) + return nil + } + if header == nil || data == nil { + env.Logger.Error("Nil header or data returned from GetBlockData", "height", n) + return nil + } + // Assuming ToABCIBlockMeta is now in pkg/rpc/provider/provider_utils.go + bmeta, err := ToABCIBlockMeta(header, data) // Removed rpc. prefix + if err != nil { + env.Logger.Error("Failed to convert block to ABCI block meta", "height", n, "err", err) + return nil + } + + return bmeta +} + func filterMinMax(base, height, mini, maxi, limit int64) (int64, int64, error) { // filter negatives if mini < 0 || maxi < 0 { @@ -186,47 +195,3 @@ func filterMinMax(base, height, mini, maxi, limit int64) (int64, int64, error) { } return mini, maxi, nil } - -func validateSkipCount(page, perPage int) int { - skipCount := (page - 1) * perPage - if skipCount < 0 { - return 0 - } - - return skipCount -} - -func validatePerPage(perPagePtr *int) int { - if perPagePtr == nil { // no per_page parameter - return defaultPerPage - } - - perPage := *perPagePtr - if perPage < 1 { - return defaultPerPage - } else if perPage > maxPerPage { - return maxPerPage - } - return perPage -} - -func validatePage(pagePtr *int, perPage, totalCount int) (int, error) { - if perPage < 1 { - return 0, fmt.Errorf("invalid perPage parameter: %d (must be positive)", perPage) - } - - if pagePtr == nil { // no page parameter - return 1, nil - } - - pages := ((totalCount - 1) / perPage) + 1 - if pages == 0 { - pages = 1 // one page (even if it's empty) - } - page := *pagePtr - if page <= 0 || page > pages { - return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page) - } - - return page, nil -} diff --git a/pkg/rpc/json/handler.go b/pkg/rpc/json/handler.go deleted file mode 100644 index 58dbf9f..0000000 --- a/pkg/rpc/json/handler.go +++ /dev/null @@ -1,284 +0,0 @@ -package json - -import ( - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "net/http" - "net/url" - "reflect" - "strconv" - - "cosmossdk.io/log" - "github.com/cometbft/cometbft/libs/bytes" - cmjson "github.com/cometbft/cometbft/libs/json" - "github.com/gorilla/rpc/v2" - "github.com/gorilla/rpc/v2/json2" -) - -type handler struct { - srv *service - mux *http.ServeMux - codec rpc.Codec - logger log.Logger -} - -func newHandler(s *service, codec rpc.Codec, logger log.Logger) *handler { - mux := http.NewServeMux() - h := &handler{ - srv: s, - mux: mux, - codec: codec, - logger: logger, - } - - mux.HandleFunc("/", h.serveJSONRPC) - mux.HandleFunc("/websocket", h.wsHandler) - for name, method := range s.methods { - logger.Debug("registering method", "name", name) - mux.HandleFunc("/"+name, h.newHandler(method)) - } - - return h -} -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.mux.ServeHTTP(w, r) -} - -// serveJSONRPC serves HTTP request -func (h *handler) serveJSONRPC(w http.ResponseWriter, r *http.Request) { - h.serveJSONRPCforWS(w, r, nil) -} - -// serveJSONRPC serves HTTP request -// implementation is highly inspired by Gorilla RPC v2 (but simplified a lot) -func (h *handler) serveJSONRPCforWS(w http.ResponseWriter, r *http.Request, wsConn *wsConn) { - // Create a new codec request. - codecReq := h.codec.NewRequest(r) - if wsConn != nil { - wsConn.codecReq = codecReq - } - // Get service method to be called. - method, err := codecReq.Method() - if err != nil { - var e *json2.Error - if method == "" && errors.As(err, &e) && e.Message == "EOF" { - // just serve empty page if request is empty - return - } - codecReq.WriteError(w, http.StatusBadRequest, err) - return - } - methodSpec, ok := h.srv.methods[method] - if !ok { - codecReq.WriteError(w, int(json2.E_NO_METHOD), err) - return - } - - // Decode the args. - args := reflect.New(methodSpec.argsType) - if errRead := codecReq.ReadRequest(args.Interface()); errRead != nil { - codecReq.WriteError(w, http.StatusBadRequest, errRead) - return - } - - callArgs := []reflect.Value{ - reflect.ValueOf(r), - args, - } - if methodSpec.ws { - callArgs = append(callArgs, reflect.ValueOf(wsConn)) - } - rets := methodSpec.m.Call(callArgs) - - // Extract the result to error if needed. - var errResult error - statusCode := http.StatusOK - errInter := rets[1].Interface() - if errInter != nil { - statusCode = http.StatusBadRequest - errResult = errInter.(error) - } - - // Prevents Internet Explorer from MIME-sniffing a response away - // from the declared content-type - w.Header().Set("x-content-type-options", "nosniff") - - // Encode the response. - if errResult == nil { - var raw json.RawMessage - raw, err = cmjson.Marshal(rets[0].Interface()) - if err != nil { - codecReq.WriteError(w, http.StatusInternalServerError, err) - return - } - codecReq.WriteResponse(w, raw) - } else { - codecReq.WriteError(w, statusCode, errResult) - } -} - -func (h *handler) newHandler(methodSpec *method) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - args := reflect.New(methodSpec.argsType) - values, err := url.ParseQuery(r.URL.RawQuery) - if err != nil { - h.encodeAndWriteResponse(w, nil, err, int(json2.E_PARSE)) - return - } - for i := 0; i < methodSpec.argsType.NumField(); i++ { - field := methodSpec.argsType.Field(i) - name := field.Tag.Get("json") - kind := field.Type.Kind() - // pointers can be skipped from required check - if kind != reflect.Pointer && !values.Has(name) { - h.encodeAndWriteResponse(w, nil, fmt.Errorf("missing param '%s'", name), int(json2.E_INVALID_REQ)) - return - } - rawVal := values.Get(name) - var err error - switch kind { - case reflect.Pointer: - err = setPointerParam(rawVal, &args, i) - case reflect.Bool: - err = setBoolParam(rawVal, &args, i) - case reflect.Int, reflect.Int64: - err = setIntParam(rawVal, &args, i) - case reflect.String: - args.Elem().Field(i).SetString(rawVal) - case reflect.Slice: - // []byte is a reflect.Slice of reflect.Uint8's - if field.Type.Elem().Kind() == reflect.Uint8 { - err = setByteSliceParam(rawVal, &args, i) - } - default: - err = errors.New("unknown type") - } - if err != nil { - err = fmt.Errorf("failed to parse param '%s': %w", name, err) - h.encodeAndWriteResponse(w, nil, err, int(json2.E_PARSE)) - return - } - } - rets := methodSpec.m.Call([]reflect.Value{ - reflect.ValueOf(r), - args, - }) - - // Extract the result to error if needed. - statusCode := http.StatusOK - errInter := rets[1].Interface() - if errInter != nil { - statusCode = int(json2.E_INTERNAL) - err = errInter.(error) - } - - h.encodeAndWriteResponse(w, rets[0].Interface(), err, statusCode) - } -} - -func (h *handler) encodeAndWriteResponse(w http.ResponseWriter, result interface{}, errResult error, statusCode int) { - // Prevents Internet Explorer from MIME-sniffing a response away - // from the declared content-type - w.Header().Set("x-content-type-options", "nosniff") - w.Header().Set("Content-Type", "application/json; charset=utf-8") - - resp := response{ - Version: "2.0", - ID: []byte("-1"), - } - - if errResult != nil { - resp.Error = &json2.Error{Code: json2.ErrorCode(statusCode), Data: errResult.Error()} - } else { - bytes, err := cmjson.Marshal(result) - if err != nil { - resp.Error = &json2.Error{Code: json2.E_INTERNAL, Data: err.Error()} - } else { - resp.Result = bytes - } - } - - encoder := json.NewEncoder(w) - err := encoder.Encode(resp) - if err != nil { - h.logger.Error("failed to encode RPC response", "error", err) - } -} - -func setPointerParam(rawVal string, args *reflect.Value, i int) error { - if rawVal == "" { - return nil - } - field := args.Elem().Field(i) - switch field.Type() { - case reflect.TypeOf((*BlockNumber)(nil)): - var bn BlockNumber - err := bn.UnmarshalJSON([]byte(rawVal)) - if err != nil { - return err - } - args.Elem().Field(i).Set(reflect.ValueOf(&bn)) - return nil - case reflect.TypeOf((*StrInt64)(nil)): - val, err := strconv.ParseInt(rawVal, 10, 64) - if err != nil { - return err - } - strInt64Val := StrInt64(val) - field.Set(reflect.ValueOf(&strInt64Val)) - case reflect.TypeOf((*StrInt)(nil)): - val, err := strconv.Atoi(rawVal) - if err != nil { - return err - } - strIntVal := StrInt(val) - field.Set(reflect.ValueOf(&strIntVal)) - case reflect.TypeOf((*string)(nil)): - field.Set(reflect.ValueOf(&rawVal)) - case reflect.TypeOf((*bool)(nil)): - val, err := strconv.ParseBool(rawVal) - if err != nil { - return err - } - field.Set(reflect.ValueOf(&val)) - case reflect.TypeOf((*bytes.HexBytes)(nil)): - hexBytes, err := hex.DecodeString(rawVal) - if err != nil { - return err - } - hb := bytes.HexBytes(hexBytes) - field.Set(reflect.ValueOf(&hb)) - default: - return fmt.Errorf("unsupported pointer type: %v", field.Type()) - } - return nil -} - -func setBoolParam(rawVal string, args *reflect.Value, i int) error { - v, err := strconv.ParseBool(rawVal) - if err != nil { - return err - } - args.Elem().Field(i).SetBool(v) - return nil -} - -func setIntParam(rawVal string, args *reflect.Value, i int) error { - v, err := strconv.ParseInt(rawVal, 10, 64) - if err != nil { - return err - } - args.Elem().Field(i).SetInt(v) - return nil -} - -func setByteSliceParam(rawVal string, args *reflect.Value, i int) error { - b, err := hex.DecodeString(rawVal) - if err != nil { - return err - } - args.Elem().Field(i).SetBytes(b) - return nil -} diff --git a/pkg/rpc/json/service.go b/pkg/rpc/json/service.go deleted file mode 100644 index 544cbfa..0000000 --- a/pkg/rpc/json/service.go +++ /dev/null @@ -1,373 +0,0 @@ -package json - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/http" - "reflect" - "time" - - "cosmossdk.io/log" - cmjson "github.com/cometbft/cometbft/libs/json" - cometrpc "github.com/cometbft/cometbft/rpc/client" - ctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/gorilla/rpc/v2" - "github.com/gorilla/rpc/v2/json2" - - abcirpc "github.com/rollkit/go-execution-abci/pkg/rpc" -) - -// GetRPCHandler returns handler configured to serve Tendermint-compatible RPC. -func GetRPCHandler(l abcirpc.RpcProvider, logger log.Logger) (http.Handler, error) { - return newHandler(newService(l, logger), json2.NewCodec(), logger), nil -} - -type method struct { - m reflect.Value - argsType reflect.Type - returnType reflect.Type - ws bool -} - -func newMethod(m interface{}) *method { - mType := reflect.TypeOf(m) - - return &method{ - m: reflect.ValueOf(m), - argsType: mType.In(1).Elem(), - returnType: mType.Out(0).Elem(), - ws: mType.NumIn() == 3, - } -} - -type service struct { - client abcirpc.RpcProvider - methods map[string]*method - logger log.Logger -} - -func newService(c abcirpc.RpcProvider, l log.Logger) *service { - s := service{ - client: c, - logger: l, - } - s.methods = map[string]*method{ - "subscribe": newMethod(s.Subscribe), - "unsubscribe": newMethod(s.Unsubscribe), - "unsubscribe_all": newMethod(s.UnsubscribeAll), - "health": newMethod(s.Health), - "status": newMethod(s.Status), - "net_info": newMethod(s.NetInfo), - "blockchain": newMethod(s.BlockchainInfo), - "genesis": newMethod(s.Genesis), - "genesis_chunked": newMethod(s.GenesisChunked), - "block": newMethod(s.Block), - "block_by_hash": newMethod(s.BlockByHash), - "block_results": newMethod(s.BlockResults), - "commit": newMethod(s.Commit), - "header": newMethod(s.Header), - "header_by_hash": newMethod(s.HeaderByHash), - "check_tx": newMethod(s.CheckTx), - "tx": newMethod(s.Tx), - "tx_search": newMethod(s.TxSearch), - "block_search": newMethod(s.BlockSearch), - "validators": newMethod(s.Validators), - "dump_consensus_state": newMethod(s.DumpConsensusState), - "consensus_state": newMethod(s.GetConsensusState), - "consensus_params": newMethod(s.ConsensusParams), - "unconfirmed_txs": newMethod(s.UnconfirmedTxs), - "num_unconfirmed_txs": newMethod(s.NumUnconfirmedTxs), - "broadcast_tx_commit": newMethod(s.BroadcastTxCommit), - "broadcast_tx_sync": newMethod(s.BroadcastTxSync), - "broadcast_tx_async": newMethod(s.BroadcastTxAsync), - "abci_query": newMethod(s.ABCIQuery), - "abci_info": newMethod(s.ABCIInfo), - "broadcast_evidence": newMethod(s.BroadcastEvidence), - } - return &s -} - -func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsConn) (*ctypes.ResultSubscribe, error) { - // TODO(tzdybal): pass config and check subscriptions limits - // TODO(tzdybal): extract consts or configs - const SubscribeTimeout = 5 * time.Second - const subBufferSize = 100 - - addr := req.RemoteAddr - var query string - if args.Query != nil { - query = *args.Query - } - - ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout) - defer cancel() - - sub, err := s.client.Subscribe(ctx, addr, query, subBufferSize) - if err != nil { - return nil, fmt.Errorf("failed to subscribe: %w", err) - } - - go func() { - var codecReq rpc.CodecRequest - if wsConn != nil { - codecReq = wsConn.codecReq - } else { - codecReq = json2.NewCodec().NewRequest(req) - } - - for msg := range sub { - var raw json.RawMessage - raw, err = cmjson.Marshal(msg.Data) - btz := new(bytes.Buffer) - w := newResponseWriter(btz) - if err != nil { - codecReq.WriteError(w, http.StatusInternalServerError, err) - return - } - codecReq.WriteResponse(w, raw) - - data := btz.Bytes() - if wsConn != nil { - wsConn.queue <- data - } - } - }() - - return &ctypes.ResultSubscribe{}, nil -} - -func (s *service) Unsubscribe(req *http.Request, args *unsubscribeArgs) (*emptyResult, error) { - s.logger.Debug("unsubscribe from query", "remote", req.RemoteAddr, "query", args.Query) - - var query string - if args.Query != nil { - query = *args.Query - } - - err := s.client.Unsubscribe(context.Background(), req.RemoteAddr, query) - if err != nil { - return nil, fmt.Errorf("failed to unsubscribe: %w", err) - } - return &emptyResult{}, nil -} - -func (s *service) UnsubscribeAll(req *http.Request, args *unsubscribeAllArgs) (*emptyResult, error) { - s.logger.Debug("unsubscribe from all queries", "remote", req.RemoteAddr) - err := s.client.UnsubscribeAll(context.Background(), req.RemoteAddr) - if err != nil { - return nil, fmt.Errorf("failed to unsubscribe all: %w", err) - } - return &emptyResult{}, nil -} - -// info API -func (s *service) Health(req *http.Request, args *healthArgs) (*ctypes.ResultHealth, error) { - return s.client.Health(req.Context()) -} - -func (s *service) Status(req *http.Request, args *statusArgs) (*ctypes.ResultStatus, error) { - return s.client.Status(req.Context()) -} - -func (s *service) NetInfo(req *http.Request, args *netInfoArgs) (*ctypes.ResultNetInfo, error) { - return s.client.NetInfo(req.Context()) -} - -func (s *service) BlockchainInfo(req *http.Request, args *blockchainInfoArgs) (*ctypes.ResultBlockchainInfo, error) { - var minHeight, maxHeight int64 - if args.MinHeight != nil { - h := int64(*args.MinHeight) - minHeight = h - } - if args.MaxHeight != nil { - h := int64(*args.MaxHeight) - maxHeight = h - } - return s.client.BlockchainInfo(req.Context(), minHeight, maxHeight) -} - -func (s *service) Genesis(req *http.Request, args *genesisArgs) (*ctypes.ResultGenesis, error) { - return s.client.Genesis(req.Context()) -} - -func (s *service) GenesisChunked(req *http.Request, args *genesisChunkedArgs) (*ctypes.ResultGenesisChunk, error) { - return s.client.GenesisChunked(req.Context(), uint(args.ID)) -} - -func (s *service) Block(req *http.Request, args *blockArgs) (*ctypes.ResultBlock, error) { - var height *int64 - if args.Height != nil { - h := int64(*args.Height) - height = &h - } - return s.client.Block(req.Context(), height) -} - -func (s *service) BlockByHash(req *http.Request, args *blockByHashArgs) (*ctypes.ResultBlock, error) { - return s.client.BlockByHash(req.Context(), args.Hash) -} - -func (s *service) BlockResults(req *http.Request, args *blockResultsArgs) (*ctypes.ResultBlockResults, error) { - var height *int64 - if args.Height != nil { - h := int64(*args.Height) - height = &h - } - return s.client.BlockResults(req.Context(), height) -} - -func (s *service) Commit(req *http.Request, args *commitArgs) (*ctypes.ResultCommit, error) { - var height *int64 - if args.Height != nil { - h := int64(*args.Height) - height = &h - } - return s.client.Commit(req.Context(), height) -} - -func (s *service) Header(req *http.Request, args *headerArgs) (*ctypes.ResultHeader, error) { - var height *int64 - if args.Height != nil { - h := int64(*args.Height) - height = &h - } - return s.client.Header(req.Context(), height) -} - -func (s *service) HeaderByHash(req *http.Request, args *headerByHashArgs) (*ctypes.ResultHeader, error) { - return s.client.HeaderByHash(req.Context(), args.Hash) -} - -func (s *service) CheckTx(req *http.Request, args *checkTxArgs) (*ctypes.ResultCheckTx, error) { - return s.client.CheckTx(req.Context(), args.Tx) -} - -func (s *service) Tx(req *http.Request, args *txArgs) (*ctypes.ResultTx, error) { - return s.client.Tx(req.Context(), args.Hash, args.Prove) -} - -func (s *service) TxSearch(req *http.Request, args *txSearchArgs) (*ctypes.ResultTxSearch, error) { - var page, perPage *int - var orderBy string - - if args.Page != nil { - p := int(*args.Page) - page = &p - } - if args.PerPage != nil { - pp := int(*args.PerPage) - perPage = &pp - } - if args.OrderBy != nil { - orderBy = *args.OrderBy - } - - return s.client.TxSearch(req.Context(), args.Query, args.Prove, page, perPage, orderBy) -} - -func (s *service) BlockSearch(req *http.Request, args *blockSearchArgs) (*ctypes.ResultBlockSearch, error) { - var page, perPage *int - var orderBy string - - if args.Page != nil { - p := int(*args.Page) - page = &p - } - if args.PerPage != nil { - pp := int(*args.PerPage) - perPage = &pp - } - if args.OrderBy != nil { - orderBy = *args.OrderBy - } - - return s.client.BlockSearch(req.Context(), args.Query, page, perPage, orderBy) -} - -func (s *service) Validators(req *http.Request, args *validatorsArgs) (*ctypes.ResultValidators, error) { - var height *int64 - var page, perPage *int - - if args.Height != nil { - h := int64(*args.Height) - height = &h - } - if args.Page != nil { - p := int(*args.Page) - page = &p - } - if args.PerPage != nil { - pp := int(*args.PerPage) - perPage = &pp - } - - return s.client.Validators(req.Context(), height, page, perPage) -} - -func (s *service) DumpConsensusState(req *http.Request, args *dumpConsensusStateArgs) (*ctypes.ResultDumpConsensusState, error) { - return s.client.DumpConsensusState(req.Context()) -} - -func (s *service) GetConsensusState(req *http.Request, args *getConsensusStateArgs) (*ctypes.ResultConsensusState, error) { - return s.client.ConsensusState(req.Context()) -} - -func (s *service) ConsensusParams(req *http.Request, args *consensusParamsArgs) (*ctypes.ResultConsensusParams, error) { - var height *int64 - if args.Height != nil { - h := int64(*args.Height) - height = &h - } - return s.client.ConsensusParams(req.Context(), height) -} - -func (s *service) UnconfirmedTxs(req *http.Request, args *unconfirmedTxsArgs) (*ctypes.ResultUnconfirmedTxs, error) { - var limit *int - if args.Limit != nil { - l := int(*args.Limit) - limit = &l - } - return s.client.UnconfirmedTxs(req.Context(), limit) -} - -func (s *service) NumUnconfirmedTxs(req *http.Request, args *numUnconfirmedTxsArgs) (*ctypes.ResultUnconfirmedTxs, error) { - return s.client.NumUnconfirmedTxs(req.Context()) -} - -// tx broadcast API -func (s *service) BroadcastTxCommit(req *http.Request, args *broadcastTxCommitArgs) (*ctypes.ResultBroadcastTxCommit, error) { - return s.client.BroadcastTxCommit(req.Context(), args.Tx) -} - -func (s *service) BroadcastTxSync(req *http.Request, args *broadcastTxSyncArgs) (*ctypes.ResultBroadcastTx, error) { - return s.client.BroadcastTxSync(req.Context(), args.Tx) -} - -func (s *service) BroadcastTxAsync(req *http.Request, args *broadcastTxAsyncArgs) (*ctypes.ResultBroadcastTx, error) { - return s.client.BroadcastTxAsync(req.Context(), args.Tx) -} - -// abci API -func (s *service) ABCIQuery(req *http.Request, args *ABCIQueryArgs) (*ctypes.ResultABCIQuery, error) { - options := cometrpc.ABCIQueryOptions{} - - if args.Height != nil { - options.Height = int64(*args.Height) - } - if args.Prove != nil { - options.Prove = *args.Prove - } - - return s.client.ABCIQueryWithOptions(req.Context(), args.Path, args.Data, options) -} - -func (s *service) ABCIInfo(req *http.Request, args *ABCIInfoArgs) (*ctypes.ResultABCIInfo, error) { - return s.client.ABCIInfo(req.Context()) -} - -// evidence API -func (s *service) BroadcastEvidence(req *http.Request, args *broadcastEvidenceArgs) (*ctypes.ResultBroadcastEvidence, error) { - return s.client.BroadcastEvidence(req.Context(), args.Evidence) -} diff --git a/pkg/rpc/json/types.go b/pkg/rpc/json/types.go deleted file mode 100644 index cead7e2..0000000 --- a/pkg/rpc/json/types.go +++ /dev/null @@ -1,240 +0,0 @@ -package json - -import ( - "encoding/json" - "reflect" - "strconv" - "strings" - - "github.com/cometbft/cometbft/libs/bytes" - "github.com/cometbft/cometbft/types" - "github.com/gorilla/rpc/v2/json2" -) - -type subscribeArgs struct { - Query *string `json:"query"` -} - -type unsubscribeArgs struct { - Query *string `json:"query"` -} - -type unsubscribeAllArgs struct{} - -// info API -type healthArgs struct { -} -type statusArgs struct { -} -type netInfoArgs struct { -} -type blockchainInfoArgs struct { - MinHeight *StrInt64 `json:"minHeight"` - MaxHeight *StrInt64 `json:"maxHeight"` -} - -type genesisArgs struct{} - -type genesisChunkedArgs struct { - ID StrInt `json:"chunk"` -} - -type blockArgs struct { - Height *BlockNumber `json:"height"` -} - -type blockByHashArgs struct { - Hash []byte `json:"hash"` -} - -type blockResultsArgs struct { - Height *StrInt64 `json:"height"` -} - -type commitArgs struct { - Height *StrInt64 `json:"height"` -} - -type headerArgs struct { - Height *StrInt64 `json:"height"` -} - -type headerByHashArgs struct { - Hash []byte `json:"hash"` -} - -type checkTxArgs struct { - Tx types.Tx `json:"tx"` -} - -type txArgs struct { - Hash []byte `json:"hash"` - Prove bool `json:"prove"` -} - -type txSearchArgs struct { - Query string `json:"query"` - Prove bool `json:"prove"` - Page *StrInt `json:"page"` - PerPage *StrInt `json:"per_page"` - OrderBy *string `json:"order_by"` -} - -type blockSearchArgs struct { - Query string `json:"query"` - Page *StrInt `json:"page"` - PerPage *StrInt `json:"per_page"` - OrderBy *string `json:"order_by"` -} - -type validatorsArgs struct { - Height *StrInt64 `json:"height"` - Page *StrInt `json:"page"` - PerPage *StrInt `json:"per_page"` -} - -type dumpConsensusStateArgs struct{} - -type getConsensusStateArgs struct{} - -type consensusParamsArgs struct { - Height *StrInt64 `json:"height"` -} - -type unconfirmedTxsArgs struct { - Limit *StrInt `json:"limit"` -} - -type numUnconfirmedTxsArgs struct{} - -// tx broadcast API -type broadcastTxCommitArgs struct { - Tx types.Tx `json:"tx"` -} -type broadcastTxSyncArgs struct { - Tx types.Tx `json:"tx"` -} -type broadcastTxAsyncArgs struct { - Tx types.Tx `json:"tx"` -} - -// abci API - -// ABCIQueryArgs defines args for ABCI Query method. -type ABCIQueryArgs struct { - Path string `json:"path"` - Data bytes.HexBytes `json:"data"` - Height *StrInt64 `json:"height"` - Prove *bool `json:"prove"` -} - -// ABCIInfoArgs defines args for ABCI Info method. -type ABCIInfoArgs struct { -} - -// evidence API - -type broadcastEvidenceArgs struct { - Evidence types.Evidence `json:"evidence"` -} - -type emptyResult struct{} - -// JSON-deserialization specific types - -// StrInt is an proper int or quoted "int" -type StrInt int - -// StrInt64 is an proper int64 or quoted "int64" -type StrInt64 int64 - -// UnmarshalJSON parses JSON (int or int quoted as string) into StrInt64 -func (s *StrInt64) UnmarshalJSON(b []byte) error { - return unmarshalStrInt64(b, s) -} - -// UnmarshalJSON parses JSON (int or int quoted as string) into StrInt -func (s *StrInt) UnmarshalJSON(b []byte) error { - var val StrInt64 - err := unmarshalStrInt64(b, &val) - *s = StrInt(val) - return err -} - -// BlockNumber is a StrInt64 with helper tags for block heights -type BlockNumber StrInt64 - -// BlockNumber tags: -// - "earliest" = literal 1 -// - "included" = seen on DA -const ( - IncludedBlockNumber = BlockNumber(-1) - EarliestBlockNumber = BlockNumber(1) -) - -// UnmarshalJSON parses JSON (int or block tag quoted as string) into BlockNumber -func (bn *BlockNumber) UnmarshalJSON(b []byte) error { - input := strings.TrimSpace(string(b)) - if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' { - input = input[1 : len(input)-1] - } - - switch input { - case "earliest": - *bn = EarliestBlockNumber - case "included": - *bn = IncludedBlockNumber - default: - // Try to parse as int64 - if i, err := strconv.ParseInt(input, 10, 64); err == nil { - *bn = BlockNumber(i) - } else { - // If parsing as int64 fails, try to unmarshal as JSON number - var f float64 - if err := json.Unmarshal(b, &f); err == nil { - *bn = BlockNumber(f) - } else { - return &json.UnsupportedValueError{ - Value: reflect.ValueOf(input), - Str: string(b), - } - } - } - } - - return nil -} - -func unmarshalStrInt64(b []byte, s *StrInt64) error { - var i interface{} - err := json.Unmarshal(b, &i) - if err != nil { - return err - } - - switch v := i.(type) { - case int: - *s = StrInt64(v) - case int64: - *s = StrInt64(v) - case string: - iv, err := strconv.Atoi(v) - if err != nil { - return err - } - *s = StrInt64(iv) - default: - return &json.UnsupportedValueError{ - Value: reflect.ValueOf(i), - Str: string(b), - } - } - return nil -} - -type response struct { - Version string `json:"jsonrpc"` - Result json.RawMessage `json:"result,omitempty"` - Error *json2.Error `json:"error,omitempty"` - ID json.RawMessage `json:"id"` -} diff --git a/pkg/rpc/json/ws.go b/pkg/rpc/json/ws.go deleted file mode 100644 index d7e5064..0000000 --- a/pkg/rpc/json/ws.go +++ /dev/null @@ -1,113 +0,0 @@ -package json - -import ( - "bytes" - "io" - "net/http" - - "cosmossdk.io/log" - "github.com/gorilla/rpc/v2" - "github.com/gorilla/websocket" -) - -type wsConn struct { - conn *websocket.Conn - codecReq rpc.CodecRequest - queue chan []byte - logger log.Logger -} - -func (wsc *wsConn) sendLoop() { - for msg := range wsc.queue { - writer, err := wsc.conn.NextWriter(websocket.TextMessage) - if err != nil { - wsc.logger.Error("failed to create writer", "error", err) - continue - } - _, err = writer.Write(msg) - if err != nil { - wsc.logger.Error("failed to write message", "error", err) - } - if err = writer.Close(); err != nil { - wsc.logger.Error("failed to close writer", "error", err) - } - } -} - -func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) { - // TODO(tzdybal): configuration options - upgrader := websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { return true }, - } - - wsc, err := upgrader.Upgrade(w, r, nil) - if err != nil { - h.logger.Error("failed to update to WebSocket connection", "error", err) - return - } - remoteAddr := wsc.RemoteAddr().String() - defer func() { - err := wsc.Close() - if err != nil { - h.logger.Error("failed to close WebSocket connection", "err") - } - }() - - ws := &wsConn{ - conn: wsc, - queue: make(chan []byte), - logger: h.logger, - } - go ws.sendLoop() - - for { - mt, r, err := wsc.NextReader() - if err != nil { - h.logger.Error("failed to read next WebSocket message", "error", err) - break - } - - if mt != websocket.TextMessage { - // TODO(tzdybal): https://github.com/rollkit/rollkit/issues/465 - h.logger.Debug("expected text message") - continue - } - req, err := http.NewRequest(http.MethodGet, "", r) - req.RemoteAddr = remoteAddr - if err != nil { - h.logger.Error("failed to create request", "error", err) - continue - } - - writer := new(bytes.Buffer) - h.serveJSONRPCforWS(newResponseWriter(writer), req, ws) - ws.queue <- writer.Bytes() - } - -} - -func newResponseWriter(w io.Writer) http.ResponseWriter { - return &wsResponse{w} -} - -// wsResponse is a simple implementation of http.ResponseWriter -type wsResponse struct { - w io.Writer -} - -var _ http.ResponseWriter = wsResponse{} - -// Write use underlying writer to write response to WebSocket -func (w wsResponse) Write(bytes []byte) (int, error) { - return w.w.Write(bytes) -} - -func (w wsResponse) Header() http.Header { - return http.Header{} - -} - -func (w wsResponse) WriteHeader(statusCode int) { -} diff --git a/pkg/rpc/provider.go b/pkg/rpc/provider.go deleted file mode 100644 index 62218a9..0000000 --- a/pkg/rpc/provider.go +++ /dev/null @@ -1,16 +0,0 @@ -package rpc - -import cometrpc "github.com/cometbft/cometbft/rpc/client" - -// RpcProvider defines the interface needed by various RPC services. -// It aggregates multiple client interfaces from CometBFT. -type RpcProvider interface { - cometrpc.ABCIClient - cometrpc.HistoryClient - cometrpc.NetworkClient - cometrpc.SignClient - cometrpc.StatusClient - cometrpc.EventsClient - cometrpc.EvidenceClient - cometrpc.MempoolClient -} diff --git a/pkg/rpc/provider/abci.go b/pkg/rpc/provider/abci.go deleted file mode 100644 index 6b363b0..0000000 --- a/pkg/rpc/provider/abci.go +++ /dev/null @@ -1,51 +0,0 @@ -package provider - -import ( - "context" - - abci "github.com/cometbft/cometbft/abci/types" - cmtbytes "github.com/cometbft/cometbft/libs/bytes" - rpcclient "github.com/cometbft/cometbft/rpc/client" - coretypes "github.com/cometbft/cometbft/rpc/core/types" -) - -// ABCIInfo implements client.CometRPC. -func (p *RpcProvider) ABCIInfo(context.Context) (*coretypes.ResultABCIInfo, error) { - info, err := p.adapter.App.Info(&abci.RequestInfo{}) - if err != nil { - return nil, err - } - return &coretypes.ResultABCIInfo{ - Response: *info, - }, nil -} - -// ABCIQuery implements client.CometRPC. -func (p *RpcProvider) ABCIQuery(ctx context.Context, path string, data cmtbytes.HexBytes) (*coretypes.ResultABCIQuery, error) { - resp, err := p.adapter.App.Query(ctx, &abci.RequestQuery{ - Data: data, - Path: path, - }) - if err != nil { - return nil, err - } - return &coretypes.ResultABCIQuery{ - Response: *resp, - }, nil -} - -// ABCIQueryWithOptions implements client.CometRPC. -func (p *RpcProvider) ABCIQueryWithOptions(ctx context.Context, path string, data cmtbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) { - resp, err := p.adapter.App.Query(ctx, &abci.RequestQuery{ - Data: data, - Path: path, - Height: opts.Height, - Prove: opts.Prove, - }) - if err != nil { - return nil, err - } - return &coretypes.ResultABCIQuery{ - Response: *resp, - }, nil -} diff --git a/pkg/rpc/provider/info.go b/pkg/rpc/provider/info.go deleted file mode 100644 index 752226c..0000000 --- a/pkg/rpc/provider/info.go +++ /dev/null @@ -1,151 +0,0 @@ -package provider - -import ( - "context" - - abci "github.com/cometbft/cometbft/abci/types" // Needed for Status - cmtbytes "github.com/cometbft/cometbft/libs/bytes" - "github.com/cometbft/cometbft/p2p" // Used by ABCIQueryWithOptions indirectly via Status? No, remove if unused. - coretypes "github.com/cometbft/cometbft/rpc/core/types" - cmttypes "github.com/cometbft/cometbft/types" -) - -// Status implements client.CometRPC. -func (p *RpcProvider) Status(ctx context.Context) (*coretypes.ResultStatus, error) { - info, err := p.adapter.App.Info(&abci.RequestInfo{}) - if err != nil { - return nil, err - } - - s, err := p.adapter.Store.LoadState(ctx) - if err != nil { - return nil, err - } - - // TODO: Populate NodeInfo properly - nodeInfo := p2p.DefaultNodeInfo{ - // ProtocolVersion: p2p.NewProtocolVersion( - // // Assuming these are accessible or definable constants/config values - // version.P2PProtocol, // Assuming 'version' package or similar imported - // version.BlockProtocol, - // version.AppProtocol, - // ), - // DefaultNodeID: // Needs node key -> p.nodeKey.ID() ? Requires passing nodeKey to RpcProvider - // ListenAddr: // Needs listener address -> listener.Addr().String() ? Requires access to listener - // Network: // Needs network/chain ID -> p.config.ChainID ? Requires access to config - // Version: // Needs application version -> version.TMCoreSemVer ? - // Channels: // Needs channel info -> p.channels // Requires access to p2p channels - // Moniker: // Needs moniker -> p.config.Moniker ? - // Other fields like Moniker, Version might be available too - } - // Check if the node key info is readily available in adapter or needs to be passed separately. - // If p.adapter.P2PClient is accessible and has NodeInfo: - // nodeKey := p.adapter.P2PClient.NodeInfo() // Hypothetical method - // if nodeKey != nil { - // nodeInfo = *nodeKey - // } - - return &coretypes.ResultStatus{ - NodeInfo: nodeInfo, // Use the populated or default NodeInfo - SyncInfo: coretypes.SyncInfo{ - // LatestBlockHash: // Need block meta -> latestBlockMeta.BlockID.Hash - LatestAppHash: cmtbytes.HexBytes(info.LastBlockAppHash), - LatestBlockHeight: info.LastBlockHeight, - LatestBlockTime: s.LastBlockTime, - // CatchingUp: // Requires sync status logic - }, - ValidatorInfo: coretypes.ValidatorInfo{ // Assumes single validator/sequencer model - Address: s.Validators.Proposer.Address, - PubKey: s.Validators.Proposer.PubKey, - VotingPower: s.Validators.Proposer.VotingPower, - }, - }, nil -} - -// NetInfo implements client.Client. -func (p *RpcProvider) NetInfo(context.Context) (*coretypes.ResultNetInfo, error) { - res := coretypes.ResultNetInfo{ - Listening: true, // Assuming node is always listening if RPC is up - Listeners: []string{}, // TODO: Populate with actual listener addresses if available - } - - // Access P2P client from adapter to get peer info - if p.adapter.P2PClient != nil { - for _, ma := range p.adapter.P2PClient.Addrs() { - res.Listeners = append(res.Listeners, ma.String()) - } - peers := p.adapter.P2PClient.Peers() - res.NPeers = len(peers) - for _, peer := range peers { - // Convert peer info to coretypes.Peer - // Ensure p2p.DefaultNodeInfo is correctly populated from peer.NodeInfo - res.Peers = append(res.Peers, coretypes.Peer{ - NodeInfo: p2p.DefaultNodeInfo{ // Adapt this based on actual available PeerInfo structure - // Access fields via peer.NodeInfo - DefaultNodeID: p2p.ID(peer.NodeInfo.NodeID), - ListenAddr: peer.NodeInfo.ListenAddr, - Network: peer.NodeInfo.Network, - // Other fields like Moniker, Version might be available too - }, - IsOutbound: peer.IsOutbound, - RemoteIP: peer.RemoteIP, - }) - } - } else { - // Handle case where P2P client is not available or initialized - res.NPeers = 0 - res.Peers = []coretypes.Peer{} - } - - return &res, nil -} - -// Health implements client.Client. -func (p *RpcProvider) Health(context.Context) (*coretypes.ResultHealth, error) { - // Basic health check, always returns OK for now. - // Could be extended to check DB connection, P2P status, etc. - return &coretypes.ResultHealth{}, nil -} - -// ConsensusParams implements client.Client. -func (p *RpcProvider) ConsensusParams(ctx context.Context, height *int64) (*coretypes.ResultConsensusParams, error) { - state, err := p.adapter.Store.LoadState(ctx) - if err != nil { - return nil, err - } - params := state.ConsensusParams - // Use normalizeHeight which should be moved to rpcProvider as well - normalizedHeight := p.normalizeHeight(height) // Changed r.normalizeHeight to p.normalizeHeight - return &coretypes.ResultConsensusParams{ - BlockHeight: int64(normalizedHeight), //nolint:gosec - ConsensusParams: cmttypes.ConsensusParams{ - Block: cmttypes.BlockParams{ - MaxBytes: params.Block.MaxBytes, - MaxGas: params.Block.MaxGas, - }, - Evidence: cmttypes.EvidenceParams{ - MaxAgeNumBlocks: params.Evidence.MaxAgeNumBlocks, - MaxAgeDuration: params.Evidence.MaxAgeDuration, - MaxBytes: params.Evidence.MaxBytes, - }, - Validator: cmttypes.ValidatorParams{ - PubKeyTypes: params.Validator.PubKeyTypes, - }, - Version: cmttypes.VersionParams{ - App: params.Version.App, - }, - }, - }, nil -} - -// ConsensusState implements client.Client. -func (p *RpcProvider) ConsensusState(context.Context) (*coretypes.ResultConsensusState, error) { - // Rollkit doesn't have Tendermint consensus state. - return nil, ErrConsensusStateNotAvailable -} - -// DumpConsensusState implements client.Client. -func (p *RpcProvider) DumpConsensusState(context.Context) (*coretypes.ResultDumpConsensusState, error) { - // Rollkit doesn't have Tendermint consensus state. - return nil, ErrConsensusStateNotAvailable -} diff --git a/pkg/rpc/provider/provider.go b/pkg/rpc/provider/provider.go deleted file mode 100644 index 31a7591..0000000 --- a/pkg/rpc/provider/provider.go +++ /dev/null @@ -1,94 +0,0 @@ -package provider - -import ( - "context" - "errors" - - "cosmossdk.io/log" - "github.com/cometbft/cometbft/state/indexer" - "github.com/cometbft/cometbft/state/txindex" - cmtypes "github.com/cometbft/cometbft/types" // Keep for getBlockMeta - - "github.com/rollkit/go-execution-abci/pkg/adapter" -) - -// ErrConsensusStateNotAvailable is returned because Rollkit doesn't use Tendermint consensus. -// Exported error. -var ErrConsensusStateNotAvailable = errors.New("consensus state not available in Rollkit") // Changed to exported - -// RpcProvider implements the interfaces required by the JSON-RPC service, -// primarily by delegating calls to the underlying adapter and indexers. -type RpcProvider struct { - adapter *adapter.Adapter - txIndexer txindex.TxIndexer - blockIndexer indexer.BlockIndexer - logger log.Logger -} - -// NewRpcProvider creates a new instance of rpcProvider. -func NewRpcProvider( - adapter *adapter.Adapter, - txIndexer txindex.TxIndexer, - blockIndexer indexer.BlockIndexer, - logger log.Logger, -) *RpcProvider { // Corrected return type to local *RpcProvider - return &RpcProvider{ - adapter: adapter, - txIndexer: txIndexer, - blockIndexer: blockIndexer, - logger: logger, - } -} - -// Logger returns the logger used by the RpcProvider. -func (p *RpcProvider) Logger() log.Logger { - return p.logger -} - -func (p *RpcProvider) normalizeHeight(height *int64) uint64 { - var heightValue uint64 - if height == nil { - var err error - // TODO: Decide how to handle context here. Using background for now. - heightValue, err = p.adapter.RollkitStore.Height(context.Background()) - if err != nil { - // TODO: Consider logging or returning error - p.logger.Error("Failed to get current height in normalizeHeight", "err", err) - return 0 - } - } else if *height < 0 { - // Handle negative heights if they have special meaning (e.g., -1 for latest) - // Currently, just treat them as 0 or latest, adjust as needed. - // For now, let's assume negative height means latest valid height. - var err error - heightValue, err = p.adapter.RollkitStore.Height(context.Background()) - if err != nil { - p.logger.Error("Failed to get current height for negative height in normalizeHeight", "err", err) - return 0 - } - } else { - heightValue = uint64(*height) - } - - return heightValue -} - -func (p *RpcProvider) getBlockMeta(ctx context.Context, n uint64) *cmtypes.BlockMeta { - header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, n) - if err != nil { - p.logger.Error("Failed to get block data in getBlockMeta", "height", n, "err", err) - return nil - } - if header == nil || data == nil { - p.logger.Error("Nil header or data returned from GetBlockData", "height", n) - return nil - } - // Assuming ToABCIBlockMeta is now in pkg/rpc/provider/provider_utils.go - bmeta, err := ToABCIBlockMeta(header, data) // Removed rpc. prefix - if err != nil { - p.logger.Error("Failed to convert block to ABCI block meta", "height", n, "err", err) - return nil - } - - return bmeta -} diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index 5e904a2..16b3d0c 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -10,32 +10,36 @@ import ( "cosmossdk.io/log" cmtcfg "github.com/cometbft/cometbft/config" + cmtlog "github.com/cometbft/cometbft/libs/log" + rpcserver "github.com/cometbft/cometbft/rpc/jsonrpc/server" + servercmtlog "github.com/cosmos/cosmos-sdk/server/log" "github.com/rs/cors" "golang.org/x/net/netutil" -) -// RPCServer manages the HTTP server for RPC requests. -// It delegates the actual RPC method implementations to an rpcProvider. -type RPCServer struct { - config *cmtcfg.RPCConfig - httpHandler http.Handler - server http.Server - logger log.Logger -} + "github.com/rollkit/go-execution-abci/pkg/rpc/core" +) // NewRPCServer creates a new RPC server. func NewRPCServer( - httpHandler http.Handler, cfg *cmtcfg.RPCConfig, logger log.Logger, ) *RPCServer { + cmtLogger := servercmtlog.CometLoggerWrapper{Logger: logger} return &RPCServer{ - config: cfg, - httpHandler: httpHandler, - logger: logger, + config: cfg, + logger: cmtLogger, } } +// RPCServer manages the HTTP server for RPC requests. +// It delegates the actual RPC method implementations to an rpcProvider. +type RPCServer struct { + config *cmtcfg.RPCConfig + httpHandler http.Handler + server http.Server + logger cmtlog.Logger +} + // Start starts the RPC server. func (r *RPCServer) Start() error { return r.startRPC() @@ -58,13 +62,15 @@ func (r *RPCServer) startRPC() error { return err } + mux := http.NewServeMux() + rpcserver.RegisterRPCFuncs(mux, core.Routes, r.logger) + r.httpHandler = mux + if r.config.MaxOpenConnections != 0 { r.logger.Debug("limiting number of connections", "limit", r.config.MaxOpenConnections) listener = netutil.LimitListener(listener, r.config.MaxOpenConnections) } - handler := r.httpHandler - if r.config.IsCorsEnabled() { r.logger.Debug("CORS enabled", "origins", r.config.CORSAllowedOrigins, @@ -76,11 +82,11 @@ func (r *RPCServer) startRPC() error { AllowedMethods: r.config.CORSAllowedMethods, AllowedHeaders: r.config.CORSAllowedHeaders, }) - handler = c.Handler(handler) + r.httpHandler = c.Handler(r.httpHandler) } go func() { - err := r.serve(listener, handler) + err := r.serve(listener, r.httpHandler) if !errors.Is(err, http.ErrServerClosed) { r.logger.Error("error while serving HTTP", "error", err) } diff --git a/server/start.go b/server/start.go index 6f69bfb..f70f2bf 100644 --- a/server/start.go +++ b/server/start.go @@ -14,8 +14,6 @@ import ( cmtp2p "github.com/cometbft/cometbft/p2p" pvm "github.com/cometbft/cometbft/privval" "github.com/cometbft/cometbft/proxy" - "github.com/cometbft/cometbft/state/indexer" - "github.com/cometbft/cometbft/state/txindex" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server" @@ -45,8 +43,7 @@ import ( "github.com/rollkit/go-execution-abci/pkg/adapter" "github.com/rollkit/go-execution-abci/pkg/rpc" - rpcjson "github.com/rollkit/go-execution-abci/pkg/rpc/json" - provider "github.com/rollkit/go-execution-abci/pkg/rpc/provider" + "github.com/rollkit/go-execution-abci/pkg/rpc/core" execsigner "github.com/rollkit/go-execution-abci/pkg/signer" ) @@ -114,21 +111,17 @@ func startInProcess(svrCtx *server.Context, svrCfg serverconfig.Config, clientCt gRPCOnly := svrCtx.Viper.GetBool(flagGRPCOnly) g, ctx := getCtx(svrCtx, true) - var rpcProvider rpc.RpcProvider - if gRPCOnly { // TODO: Generalize logic so that gRPC only is really in startStandAlone svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled") svrCfg.GRPC.Enable = true } else { svrCtx.Logger.Info("starting node with ABCI CometBFT in-process") - // Call startNode to initialize components - rollkitNode, localRpcServer, executor, cleanupFn, err := startNode(ctx, svrCtx, cmtCfg, app) + rollkitNode, executor, cleanupFn, err := startNode(ctx, svrCtx, cmtCfg, app) if err != nil { return err } defer cleanupFn() - rpcProvider = localRpcServer g.Go(func() error { svrCtx.Logger.Info("Attempting to start Rollkit node run loop") @@ -163,7 +156,7 @@ func startInProcess(svrCtx *server.Context, svrCfg serverconfig.Config, clientCt // Add the tx service to the gRPC router. if svrCfg.API.Enable || svrCfg.GRPC.Enable { // Use the started rpcServer for the client context - clientCtx = clientCtx.WithClient(rpcProvider) + //clientCtx = clientCtx.WithClient(rpcProvider) app.RegisterTxService(clientCtx) app.RegisterTendermintService(clientCtx) @@ -294,7 +287,7 @@ func startNode( srvCtx *server.Context, cfg *cmtcfg.Config, app sdktypes.Application, -) (rolllkitNode node.Node, rpcProvider rpc.RpcProvider, executor *adapter.Adapter, cleanupFn func(), err error) { +) (rolllkitNode node.Node, executor *adapter.Adapter, cleanupFn func(), err error) { logger := srvCtx.Logger.With("module", "rollkit") logger.Info("starting node with Rollkit in-process") @@ -305,14 +298,14 @@ func startNode( signingKey, err := execsigner.GetNodeKey(&cmtp2p.NodeKey{PrivKey: pval.Key.PrivKey}) if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } nodeKey := &key.NodeKey{PrivKey: signingKey, PubKey: signingKey.GetPublic()} rollkitcfg, err := config.LoadFromViper(srvCtx.Viper) if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } // only load signer if rollkit.node.aggregator == true @@ -320,29 +313,29 @@ func startNode( if rollkitcfg.Node.Aggregator { signer, err = execsigner.NewSignerWrapper(pval.Key.PrivKey) if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } } genDoc, err := getGenDocProvider(cfg)() if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } cmtGenDoc, err := genDoc.ToGenesisDoc() if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } // Get AppGenesis before creating the executor appGenesis, err := genutiltypes.AppGenesisFromFile(cfg.GenesisFile()) if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } database, err := store.NewDefaultKVStore(cfg.RootDir, "data", "rollkit") if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } metrics := node.DefaultMetricsProvider(rollkitcfg.Instrumentation) @@ -350,7 +343,7 @@ func startNode( _, p2pMetrics := metrics(cmtGenDoc.ChainID) p2pClient, err := p2p.NewClient(rollkitcfg, nodeKey, database, logger.With("module", "p2p"), p2pMetrics) if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } adapterMetrics := adapter.NopMetrics() @@ -379,7 +372,7 @@ func startNode( height, err := executor.RollkitStore.Height(context.Background()) if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } mempool := mempool.NewCListMempool(cfg.Mempool, proxyApp.Mempool(), int64(height)) executor.SetMempool(mempool) @@ -399,7 +392,7 @@ func startNode( // create the DA client daClient, err := jsonrpc.NewClient(ctx, logger, rollkitcfg.DA.Address, rollkitcfg.DA.AuthToken) if err != nil { - return nil, nil, nil, cleanupFn, fmt.Errorf("failed to create DA client: %w", err) + return nil, nil, cleanupFn, fmt.Errorf("failed to create DA client: %w", err) } // TODO(@facu): gas price and gas multiplier should be set by the node operator @@ -420,31 +413,37 @@ func startNode( logger, ) if err != nil { - return nil, nil, nil, cleanupFn, err + return nil, nil, cleanupFn, err } - // Create the RPC provider with necessary dependencies - // TODO: Pass actual indexers when implemented/available - txIndexer := txindex.TxIndexer(nil) - blockIndexer := indexer.BlockIndexer(nil) - rpcProvider = provider.NewRpcProvider(executor, txIndexer, blockIndexer, logger) + core.SetEnvironment(&core.Environment{ + Logger: servercmtlog.CometLoggerWrapper{Logger: logger}, + Adapter: executor, + }) - // Create the RPC handler using the provider - rpcHandler, err := rpcjson.GetRPCHandler(rpcProvider, logger) + // Pass the created handler to the RPC server constructor + rpcServer := rpc.NewRPCServer(cfg.RPC, logger) + err = rpcServer.Start() if err != nil { - return nil, nil, nil, cleanupFn, fmt.Errorf("failed to create rpc handler: %w", err) + return nil, nil, cleanupFn, fmt.Errorf("failed to start rpc server: %w", err) } - // Pass the created handler to the RPC server constructor - rpcServer := rpc.NewRPCServer(rpcHandler, cfg.RPC, logger) - err = rpcServer.Start() + logger.Info("starting node") + err = rolllkitNode.Run(ctx) if err != nil { - return nil, nil, nil, cleanupFn, fmt.Errorf("failed to start rpc server: %w", err) + if err == context.Canceled { + return nil, nil, cleanupFn, nil + } + return nil, nil, cleanupFn, fmt.Errorf("failed to start node: %w", err) + } + + // executor must be started after the node is started + logger.Info("starting executor") + if err = executor.Start(ctx); err != nil { + return nil, nil, cleanupFn, fmt.Errorf("failed to start executor: %w", err) } - // Return the initialized node, rpc server, and the executor adapter - // We need to return the executor so startInProcess can call Start on it. - return rolllkitNode, rpcProvider, executor, cleanupFn, nil + return rolllkitNode, executor, cleanupFn, nil } // getGenDocProvider returns a function which returns the genesis doc from the genesis file.