Skip to content

Commit

Permalink
rhp4: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Sep 28, 2024
1 parent 3543c96 commit 44117d3
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 108 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
go.etcd.io/bbolt v1.3.11
go.sia.tech/core v0.4.8-0.20240928172751-311e17d5e7c0
go.sia.tech/core v0.4.8-0.20240928202806-0e77790bd8bf
go.sia.tech/mux v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.27.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.sia.tech/core v0.4.8-0.20240928172751-311e17d5e7c0 h1:np9jgbC11k8EYUa1MYtw6TBZteu0UlgRpv+G7zoC150=
go.sia.tech/core v0.4.8-0.20240928172751-311e17d5e7c0/go.mod h1:j2Ke8ihV8or7d2VDrFZWcCkwSVHO0DNMQJAGs9Qop2M=
go.sia.tech/core v0.4.8-0.20240928202806-0e77790bd8bf h1:x/lM7Y8Rlo12rcpPXapLvSVNyrHZEKO6j4eLNccPMKw=
go.sia.tech/core v0.4.8-0.20240928202806-0e77790bd8bf/go.mod h1:j2Ke8ihV8or7d2VDrFZWcCkwSVHO0DNMQJAGs9Qop2M=
go.sia.tech/mux v1.3.0 h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c=
go.sia.tech/mux v1.3.0/go.mod h1:I46++RD4beqA3cW9Xm9SwXbezwPqLvHhVs9HLpDtt58=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
9 changes: 0 additions & 9 deletions rhp/v4/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,11 @@ package rhp

import (
"time"

"go.uber.org/zap"
)

// An ServerOption sets an option on a Server.
type ServerOption func(*Server)

// WithLog sets the logger for the server.
func WithLog(log *zap.Logger) ServerOption {
return func(s *Server) {
s.log = log
}
}

// WithPriceTableValidity sets the duration for which a price table is valid.
func WithPriceTableValidity(validity time.Duration) ServerOption {
return func(s *Server) {
Expand Down
13 changes: 8 additions & 5 deletions rhp/v4/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type (
// A TransportClient is a generic multiplexer for outgoing streams.
TransportClient interface {
DialStream(context.Context) net.Conn

FrameSize() int
PeerKey() types.PublicKey

Close() error
}

Expand Down Expand Up @@ -180,7 +183,7 @@ func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrice
Offset: offset,
Length: length,
}
if err := req.Validate(); err != nil {
if err := req.Validate(t.PeerKey()); err != nil {
return RPCReadSectorResult{}, fmt.Errorf("invalid request: %w", err)
}

Expand Down Expand Up @@ -222,7 +225,7 @@ func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPric
DataLength: uint64(length),
}

if err := req.Validate(); err != nil {
if err := req.Validate(t.PeerKey(), req.Duration); err != nil {
return RPCWriteSectorResult{}, fmt.Errorf("invalid request: %w", err)
}

Expand Down Expand Up @@ -408,7 +411,7 @@ func RPCSectorRoots(ctx context.Context, t TransportClient, cs consensus.State,
RenterSignature: revision.RenterSignature,
}

if err := req.Validate(revision); err != nil {
if err := req.Validate(contract.Revision.HostPublicKey, revision); err != nil {
return RPCSectorRootsResult{}, fmt.Errorf("invalid request: %w", err)
}

Expand Down Expand Up @@ -443,7 +446,7 @@ func RPCAccountBalance(ctx context.Context, t TransportClient, account rhp4.Acco
func RPCFormContract(ctx context.Context, t TransportClient, tp TxPool, signer FormContractSigner, cs consensus.State, p rhp4.HostPrices, hostKey types.PublicKey, hostAddress types.Address, params rhp4.RPCFormContractParams) (RPCFormContractResult, error) {
fc := rhp4.NewContract(p, params, hostKey, hostAddress)
formationTxn := types.V2Transaction{
MinerFee: types.Siacoins(1),
MinerFee: types.Siacoins(1), // TODO: be better
FileContracts: []types.V2FileContract{fc},
}

Expand Down Expand Up @@ -562,7 +565,7 @@ func RPCFormContract(ctx context.Context, t TransportClient, tp TxPool, signer F
func RPCRenewContract(ctx context.Context, t TransportClient, tp TxPool, signer FormContractSigner, cs consensus.State, p rhp4.HostPrices, existing types.V2FileContract, params rhp4.RPCRenewContractParams) (RPCRenewContractResult, error) {
renewal := rhp4.NewRenewal(existing, p, params)
renewalTxn := types.V2Transaction{
MinerFee: types.Siacoins(1),
MinerFee: types.Siacoins(1), // TODO: something about this
FileContractResolutions: []types.V2FileContractResolution{
{
Parent: types.V2FileContractElement{
Expand Down
4 changes: 2 additions & 2 deletions rhp/v4/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (fs *fundAndSign) Address() types.Address {
return fs.w.Address()
}

func testRenterHostPair(tb testing.TB, hostKey types.PrivateKey, cm rhp4.ChainManager, s rhp4.Syncer, w rhp4.Wallet, c rhp4.Contractor, sr rhp4.SettingsReporter, ss rhp4.SectorStore, log *zap.Logger) rhp4.TransportClient {
rs := rhp4.NewServer(hostKey, cm, s, c, w, sr, ss, rhp4.WithContractProofWindowBuffer(10), rhp4.WithPriceTableValidity(2*time.Minute), rhp4.WithLog(log.Named("rhp4")))
func testRenterHostPair(tb testing.TB, hostKey types.PrivateKey, cm rhp4.ChainManager, s rhp4.Syncer, w rhp4.Wallet, c rhp4.Contractor, sr rhp4.Settings, ss rhp4.Sectors, log *zap.Logger) rhp4.TransportClient {
rs := rhp4.NewServer(hostKey, cm, s, c, w, sr, ss, rhp4.WithContractProofWindowBuffer(10), rhp4.WithPriceTableValidity(2*time.Minute))
hostAddr := testutil.ServeSiaMux(tb, rs, log.Named("siamux"))

transport, err := rhp4.DialSiaMux(context.Background(), hostAddr, hostKey.PublicKey())
Expand Down
118 changes: 36 additions & 82 deletions rhp/v4/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type (
// Address returns the host's address
Address() types.Address

// FundTransaction funds a transaction with the specified amount of
// FundV2Transaction funds a transaction with the specified amount of
// Siacoins. If useUnconfirmed is true, the transaction may spend
// unconfirmed outputs. The outputs spent by the transaction are locked
// until they are released by ReleaseInputs.
Expand All @@ -87,10 +87,11 @@ type (
ReleaseInputs(txns []types.Transaction, v2txns []types.V2Transaction)
}

// A SectorStore is an interface for reading and writing sectors.
SectorStore interface {
ReadSector(types.Hash256) ([rhp4.SectorSize]byte, error)
// WriteSector stores a sector and returns its root hash.
// A Sectors is an interface for reading and writing sectors.
Sectors interface {
// ReadSector retrieves a sector by its root
ReadSector(root types.Hash256) ([rhp4.SectorSize]byte, error)
// WriteSector stores a sector
WriteSector(root types.Hash256, data *[rhp4.SectorSize]byte, expiration uint64) error
}

Expand All @@ -105,24 +106,27 @@ type (
// LockV2Contract locks a contract and returns its current state.
// The returned function must be called to release the lock.
LockV2Contract(types.FileContractID) (RevisionState, func(), error)
// AddV2Contract adds a new contract to the host
// AddV2Contract adds a new contract to the host.
AddV2Contract(TransactionSet, Usage) error
// RenewV2Contract finalizes an existing contract and adds its renewal
// RenewV2Contract finalizes an existing contract and adds its renewal.
RenewV2Contract(TransactionSet, Usage) error
// ReviseV2Contract atomically revises a contract and updates its sector
// roots and usage
// roots and usage.
ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage Usage) error
// ContractElement returns the contract state element for the given
// contract ID
// contract ID.
ContractElement(types.FileContractID) (types.ChainIndex, types.V2FileContractElement, error)

// AccountBalance returns the balance of an account.
AccountBalance(rhp4.Account) (types.Currency, error)
// CreditAccountsWithContract atomically revises a contract and credits the account.
CreditAccountsWithContract([]rhp4.AccountDeposit, types.FileContractID, types.V2FileContract) ([]types.Currency, error)
// DebitAccount debits an account.
DebitAccount(rhp4.Account, types.Currency) error
}

// SettingsReporter reports the host's current settings.
SettingsReporter interface {
// Settings reports the host's current settings.
Settings interface {
RHP4Settings() rhp4.HostSettings
}

Expand All @@ -132,18 +136,16 @@ type (
priceTableValidity time.Duration
contractProofWindowBuffer uint64

log *zap.Logger

chain ChainManager
syncer Syncer
wallet Wallet
sectors SectorStore
sectors Sectors
contractor Contractor
settings SettingsReporter
settings Settings
}
)

func (s *Server) lockContractForRevision(contractID types.FileContractID) (RevisionState, func(), error) {
func (s *Server) lockContractForRevision(contractID types.FileContractID) (rev RevisionState, unlock func(), _ error) {
rev, unlock, err := s.contractor.LockV2Contract(contractID)
switch {
case err != nil:
Expand Down Expand Up @@ -176,16 +178,12 @@ func (s *Server) handleRPCReadSector(stream net.Conn) error {
return errorDecodingError("failed to read request: %v", err)
}

prices, token := req.Prices, req.Token
if err := prices.Validate(s.hostKey.PublicKey()); err != nil {
return errorBadRequest("price table invalid: %v", err)
} else if err := token.Validate(); err != nil {
return errorBadRequest("account token invalid: %v", err)
} else if err := req.Validate(); err != nil {
if err := req.Validate(s.hostKey.PublicKey()); err != nil {
return errorBadRequest("request invalid: %v", err)
}
prices, token := req.Prices, req.Token

if err := s.contractor.DebitAccount(req.Token.Account, prices.RPCReadSectorCost(req.Length)); err != nil {
if err := s.contractor.DebitAccount(token.Account, prices.RPCReadSectorCost(req.Length)); err != nil {
return fmt.Errorf("failed to debit account: %w", err)
}

Expand All @@ -207,25 +205,11 @@ func (s *Server) handleRPCWriteSector(stream net.Conn) error {
if err := rhp4.ReadRequest(stream, &req); err != nil {
return errorDecodingError("failed to read request: %v", err)
}
prices, token := req.Prices, req.Token
if err := prices.Validate(s.hostKey.PublicKey()); err != nil {
return errorBadRequest("price table invalid: %v", err)
} else if err := token.Validate(); err != nil {
return errorBadRequest("account token invalid: %v", err)
}

settings := s.settings.RHP4Settings()

switch {
case req.DataLength > rhp4.SectorSize:
return errorBadRequest("sector size %v exceeds maximum %v", req.DataLength, rhp4.SectorSize)
case req.DataLength%rhp4.LeafSize != 0:
return errorBadRequest("sector length %v must be a multiple of segment size %v", req.DataLength, rhp4.LeafSize)
case req.Duration > settings.MaxSectorDuration:
return errorBadRequest("sector duration %v exceeds maximum %v", req.Duration, settings.MaxSectorDuration)
case settings.RemainingStorage < rhp4.SectorSize:
return rhp4.ErrNotEnoughStorage
if err := req.Validate(s.hostKey.PublicKey(), settings.MaxSectorDuration); err != nil {
return errorBadRequest("request invalid: %v", err)
}
prices := req.Prices

var sector [rhp4.SectorSize]byte
sr := io.LimitReader(stream, int64(req.DataLength))
Expand Down Expand Up @@ -260,15 +244,11 @@ func (s *Server) handleRPCModifySectors(stream net.Conn) error {
return errorDecodingError("failed to read request: %v", err)
}

prices := req.Prices
if err := prices.Validate(s.hostKey.PublicKey()); err != nil {
return errorBadRequest("price table invalid: %v", err)
}
settings := s.settings.RHP4Settings()

if err := rhp4.ValidateModifyActions(req.Actions, settings.MaxModifyActions); err != nil {
return errorBadRequest("modify actions invalid: %v", err)
if err := req.Validate(s.hostKey.PublicKey(), settings.MaxModifyActions); err != nil {
return errorBadRequest("request invalid: %v", err)
}
prices := req.Prices

cs := s.chain.TipState()

Expand Down Expand Up @@ -419,21 +399,17 @@ func (s *Server) handleRPCSectorRoots(stream net.Conn) error {
return errorDecodingError("failed to read request: %v", err)
}

prices := req.Prices
if err := prices.Validate(s.hostKey.PublicKey()); err != nil {
return fmt.Errorf("price table invalid: %w", err)
}

state, unlock, err := s.lockContractForRevision(req.ContractID)
if err != nil {
return fmt.Errorf("failed to lock contract: %w", err)
}
defer unlock()

// validate the request fields
if err := req.Validate(state.Revision); err != nil {
if err := req.Validate(s.hostKey.PublicKey(), state.Revision); err != nil {
return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error())
}
prices := req.Prices

// update the revision
revision, err := rhp4.ReviseForSectorRoots(state.Revision, prices, req.Length)
Expand Down Expand Up @@ -490,21 +466,12 @@ func (s *Server) handleRPCFormContract(stream net.Conn) error {
}

ourKey := s.hostKey.PublicKey()
prices := req.Prices
if err := prices.Validate(ourKey); err != nil {
return err
}

// get current settings and tip
settings := s.settings.RHP4Settings()
// set the prices to match the signed prices
settings.Prices = req.Prices
tip := s.chain.Tip()

// validate the request
if err := req.Validate(settings, tip); err != nil {
return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error())
if err := req.Validate(ourKey, tip, settings.MaxCollateral, settings.MaxContractDuration); err != nil {
return err
}
prices := req.Prices

formationTxn := types.V2Transaction{
MinerFee: req.MinerFee,
Expand Down Expand Up @@ -629,14 +596,11 @@ func (s *Server) handleRPCRenewContract(stream net.Conn) error {
return fmt.Errorf("price table invalid: %w", err)
}

// get the settings and update the prices to match the signed prices
settings := s.settings.RHP4Settings()
settings.Prices = prices
// get the current tip
tip := s.chain.Tip()

// validate the request
if err := req.Validate(settings, tip); err != nil {
if err := req.Validate(s.hostKey.PublicKey(), tip, settings.MaxCollateral, settings.MaxContractDuration); err != nil {
return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error())
}

Expand Down Expand Up @@ -812,18 +776,10 @@ func (s *Server) handleRPCVerifySector(stream net.Conn) error {
var req rhp4.RPCVerifySectorRequest
if err := rhp4.ReadRequest(stream, &req); err != nil {
return errorDecodingError("failed to read request: %v", err)
} else if err := req.Validate(); err != nil {
} else if err := req.Validate(s.hostKey.PublicKey()); err != nil {
return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error())
}

prices := req.Prices
if err := prices.Validate(s.hostKey.PublicKey()); err != nil {
return fmt.Errorf("failed to validate price table: %w", err)
}
token := req.Token
if err := token.Validate(); err != nil {
return fmt.Errorf("failed to validate token: %w", err)
}
prices, token := req.Prices, req.Token

if err := s.contractor.DebitAccount(token.Account, prices.RPCVerifySectorCost()); err != nil {
return fmt.Errorf("failed to debit account: %w", err)
Expand Down Expand Up @@ -1061,14 +1017,12 @@ func errorDecodingError(f string, p ...any) error {
}

// NewServer creates a new RHP4 server
func NewServer(pk types.PrivateKey, cm ChainManager, syncer Syncer, contracts Contractor, wallet Wallet, settings SettingsReporter, sectors SectorStore, opts ...ServerOption) *Server {
func NewServer(pk types.PrivateKey, cm ChainManager, syncer Syncer, contracts Contractor, wallet Wallet, settings Settings, sectors Sectors, opts ...ServerOption) *Server {
s := &Server{
hostKey: pk,
priceTableValidity: 30 * time.Minute,
contractProofWindowBuffer: 10,

log: zap.NewNop(),

chain: cm,
syncer: syncer,
wallet: wallet,
Expand Down
15 changes: 10 additions & 5 deletions rhp/v4/siamux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ const (

// siaMuxClientTransport is a TransportClient that uses the SiaMux multiplexer.
type siaMuxClientTransport struct {
m *mux.Mux

close chan struct{}
m *mux.Mux
theirKey types.PublicKey
close chan struct{}
}

// Close implements the [TransportClient] interface.
Expand All @@ -37,6 +37,10 @@ func (t *siaMuxClientTransport) FrameSize() int {
return 1440 * 3 // from SiaMux handshake.go
}

func (t *siaMuxClientTransport) PeerKey() types.PublicKey {
return t.theirKey
}

// DialStream implements the [TransportClient] interface. The stream lifetime is
// scoped to the context; if the context is canceled, the stream is closed.
func (t *siaMuxClientTransport) DialStream(ctx context.Context) net.Conn {
Expand All @@ -63,7 +67,8 @@ func DialSiaMux(ctx context.Context, addr string, theirKey types.PublicKey) (Tra
return nil, fmt.Errorf("failed to establish siamux connection: %w", err)
}
return &siaMuxClientTransport{
m: m,
close: make(chan struct{}),
m: m,
theirKey: theirKey,
close: make(chan struct{}),
}, nil
}
Loading

0 comments on commit 44117d3

Please sign in to comment.