From 44117d3ede113d4a2492ac83304e48e202b9d194 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Sat, 28 Sep 2024 16:28:53 -0400 Subject: [PATCH] rhp4: address review comments --- go.mod | 2 +- go.sum | 4 +- rhp/v4/options.go | 9 ---- rhp/v4/rpc.go | 13 +++-- rhp/v4/rpc_test.go | 4 +- rhp/v4/server.go | 118 ++++++++++++++------------------------------- rhp/v4/siamux.go | 15 ++++-- testutil/host.go | 4 +- 8 files changed, 61 insertions(+), 108 deletions(-) diff --git a/go.mod b/go.mod index 5f07226..ec1a622 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( go.etcd.io/bbolt v1.3.11 - go.sia.tech/core v0.4.8-0.20240928172751-311e17d5e7c0 + go.sia.tech/core v0.4.8-0.20240928202806-0e77790bd8bf go.sia.tech/mux v1.3.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.27.0 diff --git a/go.sum b/go.sum index b0c5cee..6928938 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= -go.sia.tech/core v0.4.8-0.20240928172751-311e17d5e7c0 h1:np9jgbC11k8EYUa1MYtw6TBZteu0UlgRpv+G7zoC150= -go.sia.tech/core v0.4.8-0.20240928172751-311e17d5e7c0/go.mod h1:j2Ke8ihV8or7d2VDrFZWcCkwSVHO0DNMQJAGs9Qop2M= +go.sia.tech/core v0.4.8-0.20240928202806-0e77790bd8bf h1:x/lM7Y8Rlo12rcpPXapLvSVNyrHZEKO6j4eLNccPMKw= +go.sia.tech/core v0.4.8-0.20240928202806-0e77790bd8bf/go.mod h1:j2Ke8ihV8or7d2VDrFZWcCkwSVHO0DNMQJAGs9Qop2M= go.sia.tech/mux v1.3.0 h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c= go.sia.tech/mux v1.3.0/go.mod h1:I46++RD4beqA3cW9Xm9SwXbezwPqLvHhVs9HLpDtt58= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/rhp/v4/options.go b/rhp/v4/options.go index a615c95..d668025 100644 --- a/rhp/v4/options.go +++ b/rhp/v4/options.go @@ -2,20 +2,11 @@ package rhp import ( "time" - - "go.uber.org/zap" ) // An ServerOption sets an option on a Server. type ServerOption func(*Server) -// WithLog sets the logger for the server. -func WithLog(log *zap.Logger) ServerOption { - return func(s *Server) { - s.log = log - } -} - // WithPriceTableValidity sets the duration for which a price table is valid. func WithPriceTableValidity(validity time.Duration) ServerOption { return func(s *Server) { diff --git a/rhp/v4/rpc.go b/rhp/v4/rpc.go index df33d6b..2944acc 100644 --- a/rhp/v4/rpc.go +++ b/rhp/v4/rpc.go @@ -33,7 +33,10 @@ type ( // A TransportClient is a generic multiplexer for outgoing streams. TransportClient interface { DialStream(context.Context) net.Conn + FrameSize() int + PeerKey() types.PublicKey + Close() error } @@ -180,7 +183,7 @@ func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrice Offset: offset, Length: length, } - if err := req.Validate(); err != nil { + if err := req.Validate(t.PeerKey()); err != nil { return RPCReadSectorResult{}, fmt.Errorf("invalid request: %w", err) } @@ -222,7 +225,7 @@ func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPric DataLength: uint64(length), } - if err := req.Validate(); err != nil { + if err := req.Validate(t.PeerKey(), req.Duration); err != nil { return RPCWriteSectorResult{}, fmt.Errorf("invalid request: %w", err) } @@ -408,7 +411,7 @@ func RPCSectorRoots(ctx context.Context, t TransportClient, cs consensus.State, RenterSignature: revision.RenterSignature, } - if err := req.Validate(revision); err != nil { + if err := req.Validate(contract.Revision.HostPublicKey, revision); err != nil { return RPCSectorRootsResult{}, fmt.Errorf("invalid request: %w", err) } @@ -443,7 +446,7 @@ func RPCAccountBalance(ctx context.Context, t TransportClient, account rhp4.Acco func RPCFormContract(ctx context.Context, t TransportClient, tp TxPool, signer FormContractSigner, cs consensus.State, p rhp4.HostPrices, hostKey types.PublicKey, hostAddress types.Address, params rhp4.RPCFormContractParams) (RPCFormContractResult, error) { fc := rhp4.NewContract(p, params, hostKey, hostAddress) formationTxn := types.V2Transaction{ - MinerFee: types.Siacoins(1), + MinerFee: types.Siacoins(1), // TODO: be better FileContracts: []types.V2FileContract{fc}, } @@ -562,7 +565,7 @@ func RPCFormContract(ctx context.Context, t TransportClient, tp TxPool, signer F func RPCRenewContract(ctx context.Context, t TransportClient, tp TxPool, signer FormContractSigner, cs consensus.State, p rhp4.HostPrices, existing types.V2FileContract, params rhp4.RPCRenewContractParams) (RPCRenewContractResult, error) { renewal := rhp4.NewRenewal(existing, p, params) renewalTxn := types.V2Transaction{ - MinerFee: types.Siacoins(1), + MinerFee: types.Siacoins(1), // TODO: something about this FileContractResolutions: []types.V2FileContractResolution{ { Parent: types.V2FileContractElement{ diff --git a/rhp/v4/rpc_test.go b/rhp/v4/rpc_test.go index 27bd783..db5787f 100644 --- a/rhp/v4/rpc_test.go +++ b/rhp/v4/rpc_test.go @@ -66,8 +66,8 @@ func (fs *fundAndSign) Address() types.Address { return fs.w.Address() } -func testRenterHostPair(tb testing.TB, hostKey types.PrivateKey, cm rhp4.ChainManager, s rhp4.Syncer, w rhp4.Wallet, c rhp4.Contractor, sr rhp4.SettingsReporter, ss rhp4.SectorStore, log *zap.Logger) rhp4.TransportClient { - rs := rhp4.NewServer(hostKey, cm, s, c, w, sr, ss, rhp4.WithContractProofWindowBuffer(10), rhp4.WithPriceTableValidity(2*time.Minute), rhp4.WithLog(log.Named("rhp4"))) +func testRenterHostPair(tb testing.TB, hostKey types.PrivateKey, cm rhp4.ChainManager, s rhp4.Syncer, w rhp4.Wallet, c rhp4.Contractor, sr rhp4.Settings, ss rhp4.Sectors, log *zap.Logger) rhp4.TransportClient { + rs := rhp4.NewServer(hostKey, cm, s, c, w, sr, ss, rhp4.WithContractProofWindowBuffer(10), rhp4.WithPriceTableValidity(2*time.Minute)) hostAddr := testutil.ServeSiaMux(tb, rs, log.Named("siamux")) transport, err := rhp4.DialSiaMux(context.Background(), hostAddr, hostKey.PublicKey()) diff --git a/rhp/v4/server.go b/rhp/v4/server.go index ce7789b..aa911df 100644 --- a/rhp/v4/server.go +++ b/rhp/v4/server.go @@ -75,7 +75,7 @@ type ( // Address returns the host's address Address() types.Address - // FundTransaction funds a transaction with the specified amount of + // FundV2Transaction funds a transaction with the specified amount of // Siacoins. If useUnconfirmed is true, the transaction may spend // unconfirmed outputs. The outputs spent by the transaction are locked // until they are released by ReleaseInputs. @@ -87,10 +87,11 @@ type ( ReleaseInputs(txns []types.Transaction, v2txns []types.V2Transaction) } - // A SectorStore is an interface for reading and writing sectors. - SectorStore interface { - ReadSector(types.Hash256) ([rhp4.SectorSize]byte, error) - // WriteSector stores a sector and returns its root hash. + // A Sectors is an interface for reading and writing sectors. + Sectors interface { + // ReadSector retrieves a sector by its root + ReadSector(root types.Hash256) ([rhp4.SectorSize]byte, error) + // WriteSector stores a sector WriteSector(root types.Hash256, data *[rhp4.SectorSize]byte, expiration uint64) error } @@ -105,24 +106,27 @@ type ( // LockV2Contract locks a contract and returns its current state. // The returned function must be called to release the lock. LockV2Contract(types.FileContractID) (RevisionState, func(), error) - // AddV2Contract adds a new contract to the host + // AddV2Contract adds a new contract to the host. AddV2Contract(TransactionSet, Usage) error - // RenewV2Contract finalizes an existing contract and adds its renewal + // RenewV2Contract finalizes an existing contract and adds its renewal. RenewV2Contract(TransactionSet, Usage) error // ReviseV2Contract atomically revises a contract and updates its sector - // roots and usage + // roots and usage. ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage Usage) error // ContractElement returns the contract state element for the given - // contract ID + // contract ID. ContractElement(types.FileContractID) (types.ChainIndex, types.V2FileContractElement, error) + // AccountBalance returns the balance of an account. AccountBalance(rhp4.Account) (types.Currency, error) + // CreditAccountsWithContract atomically revises a contract and credits the account. CreditAccountsWithContract([]rhp4.AccountDeposit, types.FileContractID, types.V2FileContract) ([]types.Currency, error) + // DebitAccount debits an account. DebitAccount(rhp4.Account, types.Currency) error } - // SettingsReporter reports the host's current settings. - SettingsReporter interface { + // Settings reports the host's current settings. + Settings interface { RHP4Settings() rhp4.HostSettings } @@ -132,18 +136,16 @@ type ( priceTableValidity time.Duration contractProofWindowBuffer uint64 - log *zap.Logger - chain ChainManager syncer Syncer wallet Wallet - sectors SectorStore + sectors Sectors contractor Contractor - settings SettingsReporter + settings Settings } ) -func (s *Server) lockContractForRevision(contractID types.FileContractID) (RevisionState, func(), error) { +func (s *Server) lockContractForRevision(contractID types.FileContractID) (rev RevisionState, unlock func(), _ error) { rev, unlock, err := s.contractor.LockV2Contract(contractID) switch { case err != nil: @@ -176,16 +178,12 @@ func (s *Server) handleRPCReadSector(stream net.Conn) error { return errorDecodingError("failed to read request: %v", err) } - prices, token := req.Prices, req.Token - if err := prices.Validate(s.hostKey.PublicKey()); err != nil { - return errorBadRequest("price table invalid: %v", err) - } else if err := token.Validate(); err != nil { - return errorBadRequest("account token invalid: %v", err) - } else if err := req.Validate(); err != nil { + if err := req.Validate(s.hostKey.PublicKey()); err != nil { return errorBadRequest("request invalid: %v", err) } + prices, token := req.Prices, req.Token - if err := s.contractor.DebitAccount(req.Token.Account, prices.RPCReadSectorCost(req.Length)); err != nil { + if err := s.contractor.DebitAccount(token.Account, prices.RPCReadSectorCost(req.Length)); err != nil { return fmt.Errorf("failed to debit account: %w", err) } @@ -207,25 +205,11 @@ func (s *Server) handleRPCWriteSector(stream net.Conn) error { if err := rhp4.ReadRequest(stream, &req); err != nil { return errorDecodingError("failed to read request: %v", err) } - prices, token := req.Prices, req.Token - if err := prices.Validate(s.hostKey.PublicKey()); err != nil { - return errorBadRequest("price table invalid: %v", err) - } else if err := token.Validate(); err != nil { - return errorBadRequest("account token invalid: %v", err) - } - settings := s.settings.RHP4Settings() - - switch { - case req.DataLength > rhp4.SectorSize: - return errorBadRequest("sector size %v exceeds maximum %v", req.DataLength, rhp4.SectorSize) - case req.DataLength%rhp4.LeafSize != 0: - return errorBadRequest("sector length %v must be a multiple of segment size %v", req.DataLength, rhp4.LeafSize) - case req.Duration > settings.MaxSectorDuration: - return errorBadRequest("sector duration %v exceeds maximum %v", req.Duration, settings.MaxSectorDuration) - case settings.RemainingStorage < rhp4.SectorSize: - return rhp4.ErrNotEnoughStorage + if err := req.Validate(s.hostKey.PublicKey(), settings.MaxSectorDuration); err != nil { + return errorBadRequest("request invalid: %v", err) } + prices := req.Prices var sector [rhp4.SectorSize]byte sr := io.LimitReader(stream, int64(req.DataLength)) @@ -260,15 +244,11 @@ func (s *Server) handleRPCModifySectors(stream net.Conn) error { return errorDecodingError("failed to read request: %v", err) } - prices := req.Prices - if err := prices.Validate(s.hostKey.PublicKey()); err != nil { - return errorBadRequest("price table invalid: %v", err) - } settings := s.settings.RHP4Settings() - - if err := rhp4.ValidateModifyActions(req.Actions, settings.MaxModifyActions); err != nil { - return errorBadRequest("modify actions invalid: %v", err) + if err := req.Validate(s.hostKey.PublicKey(), settings.MaxModifyActions); err != nil { + return errorBadRequest("request invalid: %v", err) } + prices := req.Prices cs := s.chain.TipState() @@ -419,11 +399,6 @@ func (s *Server) handleRPCSectorRoots(stream net.Conn) error { return errorDecodingError("failed to read request: %v", err) } - prices := req.Prices - if err := prices.Validate(s.hostKey.PublicKey()); err != nil { - return fmt.Errorf("price table invalid: %w", err) - } - state, unlock, err := s.lockContractForRevision(req.ContractID) if err != nil { return fmt.Errorf("failed to lock contract: %w", err) @@ -431,9 +406,10 @@ func (s *Server) handleRPCSectorRoots(stream net.Conn) error { defer unlock() // validate the request fields - if err := req.Validate(state.Revision); err != nil { + if err := req.Validate(s.hostKey.PublicKey(), state.Revision); err != nil { return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error()) } + prices := req.Prices // update the revision revision, err := rhp4.ReviseForSectorRoots(state.Revision, prices, req.Length) @@ -490,21 +466,12 @@ func (s *Server) handleRPCFormContract(stream net.Conn) error { } ourKey := s.hostKey.PublicKey() - prices := req.Prices - if err := prices.Validate(ourKey); err != nil { - return err - } - - // get current settings and tip settings := s.settings.RHP4Settings() - // set the prices to match the signed prices - settings.Prices = req.Prices tip := s.chain.Tip() - - // validate the request - if err := req.Validate(settings, tip); err != nil { - return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error()) + if err := req.Validate(ourKey, tip, settings.MaxCollateral, settings.MaxContractDuration); err != nil { + return err } + prices := req.Prices formationTxn := types.V2Transaction{ MinerFee: req.MinerFee, @@ -629,14 +596,11 @@ func (s *Server) handleRPCRenewContract(stream net.Conn) error { return fmt.Errorf("price table invalid: %w", err) } - // get the settings and update the prices to match the signed prices settings := s.settings.RHP4Settings() - settings.Prices = prices - // get the current tip tip := s.chain.Tip() // validate the request - if err := req.Validate(settings, tip); err != nil { + if err := req.Validate(s.hostKey.PublicKey(), tip, settings.MaxCollateral, settings.MaxContractDuration); err != nil { return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error()) } @@ -812,18 +776,10 @@ func (s *Server) handleRPCVerifySector(stream net.Conn) error { var req rhp4.RPCVerifySectorRequest if err := rhp4.ReadRequest(stream, &req); err != nil { return errorDecodingError("failed to read request: %v", err) - } else if err := req.Validate(); err != nil { + } else if err := req.Validate(s.hostKey.PublicKey()); err != nil { return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, err.Error()) } - - prices := req.Prices - if err := prices.Validate(s.hostKey.PublicKey()); err != nil { - return fmt.Errorf("failed to validate price table: %w", err) - } - token := req.Token - if err := token.Validate(); err != nil { - return fmt.Errorf("failed to validate token: %w", err) - } + prices, token := req.Prices, req.Token if err := s.contractor.DebitAccount(token.Account, prices.RPCVerifySectorCost()); err != nil { return fmt.Errorf("failed to debit account: %w", err) @@ -1061,14 +1017,12 @@ func errorDecodingError(f string, p ...any) error { } // NewServer creates a new RHP4 server -func NewServer(pk types.PrivateKey, cm ChainManager, syncer Syncer, contracts Contractor, wallet Wallet, settings SettingsReporter, sectors SectorStore, opts ...ServerOption) *Server { +func NewServer(pk types.PrivateKey, cm ChainManager, syncer Syncer, contracts Contractor, wallet Wallet, settings Settings, sectors Sectors, opts ...ServerOption) *Server { s := &Server{ hostKey: pk, priceTableValidity: 30 * time.Minute, contractProofWindowBuffer: 10, - log: zap.NewNop(), - chain: cm, syncer: syncer, wallet: wallet, diff --git a/rhp/v4/siamux.go b/rhp/v4/siamux.go index e2e38da..2c7caca 100644 --- a/rhp/v4/siamux.go +++ b/rhp/v4/siamux.go @@ -18,9 +18,9 @@ const ( // siaMuxClientTransport is a TransportClient that uses the SiaMux multiplexer. type siaMuxClientTransport struct { - m *mux.Mux - - close chan struct{} + m *mux.Mux + theirKey types.PublicKey + close chan struct{} } // Close implements the [TransportClient] interface. @@ -37,6 +37,10 @@ func (t *siaMuxClientTransport) FrameSize() int { return 1440 * 3 // from SiaMux handshake.go } +func (t *siaMuxClientTransport) PeerKey() types.PublicKey { + return t.theirKey +} + // DialStream implements the [TransportClient] interface. The stream lifetime is // scoped to the context; if the context is canceled, the stream is closed. func (t *siaMuxClientTransport) DialStream(ctx context.Context) net.Conn { @@ -63,7 +67,8 @@ func DialSiaMux(ctx context.Context, addr string, theirKey types.PublicKey) (Tra return nil, fmt.Errorf("failed to establish siamux connection: %w", err) } return &siaMuxClientTransport{ - m: m, - close: make(chan struct{}), + m: m, + theirKey: theirKey, + close: make(chan struct{}), }, nil } diff --git a/testutil/host.go b/testutil/host.go index 9c47e4a..2628f27 100644 --- a/testutil/host.go +++ b/testutil/host.go @@ -21,7 +21,7 @@ type EphemeralSectorStore struct { sectors map[types.Hash256][proto4.SectorSize]byte } -var _ rhp4.SectorStore = (*EphemeralSectorStore)(nil) +var _ rhp4.Sectors = (*EphemeralSectorStore)(nil) // ReadSector reads a sector from the EphemeralSectorStore. func (es *EphemeralSectorStore) ReadSector(root types.Hash256) ([proto4.SectorSize]byte, error) { @@ -285,7 +285,7 @@ type EphemeralSettingsReporter struct { settings proto4.HostSettings } -var _ rhp4.SettingsReporter = (*EphemeralSettingsReporter)(nil) +var _ rhp4.Settings = (*EphemeralSettingsReporter)(nil) // RHP4Settings implements the rhp4.SettingsReporter interface. func (esr *EphemeralSettingsReporter) RHP4Settings() proto4.HostSettings {