Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion admin/commands/common/set_golog_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *SetGologLevelCommand) Validator(req *admin.CommandRequest) error {
if !ok {
return errors.New("the input must be a string")
}
logLevel, err := golog.LevelFromString(level)
logLevel, err := golog.Parse(level)
if err != nil {
return fmt.Errorf("failed to parse level: %w", err)
}
Expand Down
4 changes: 0 additions & 4 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
),
}

if !builder.BitswapReprovideEnabled {
opts = append(opts, blob.WithReprovideInterval(-1))
}

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDatastoreManager.Datastore(), opts...)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,6 @@ func (exeNode *ExecutionNode) LoadBlobService(
),
}

if !node.BitswapReprovideEnabled {
opts = append(opts, blob.WithReprovideInterval(-1))
}

if exeNode.exeConf.blobstoreRateLimit > 0 && exeNode.exeConf.blobstoreBurstLimit > 0 {
opts = append(opts, blob.WithRateLimit(float64(exeNode.exeConf.blobstoreRateLimit), exeNode.exeConf.blobstoreBurstLimit))
}
Expand Down
15 changes: 5 additions & 10 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,6 @@ type BaseConfig struct {
// DhtSystemEnabled configures whether the DHT system is enabled on Access and Execution nodes.
DhtSystemEnabled bool

// BitswapReprovideEnabled configures whether the Bitswap reprovide mechanism is enabled.
// This is only meaningful to Access and Execution nodes.
BitswapReprovideEnabled bool

TransactionFeesDisabled bool
}

Expand Down Expand Up @@ -297,12 +293,11 @@ func DefaultBaseConfig() *BaseConfig {
Duration: 10 * time.Second,
},

HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
}
}

Expand Down
4 changes: 0 additions & 4 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
"dht-enabled",
defaultConfig.DhtSystemEnabled,
"[experimental] whether to enable dht system. This is an experimental feature. Use with caution.")
fnb.flags.BoolVar(&fnb.BaseConfig.BitswapReprovideEnabled,
"bitswap-reprovide-enabled",
defaultConfig.BitswapReprovideEnabled,
"[experimental] whether to enable bitswap reproviding. This is an experimental feature. Use with caution.")

// dynamic node startup flags
fnb.flags.StringVar(&fnb.BaseConfig.DynamicStartupANPubkey,
Expand Down
164 changes: 77 additions & 87 deletions go.mod

Large diffs are not rendered by default.

410 changes: 175 additions & 235 deletions go.sum

Large diffs are not rendered by default.

161 changes: 75 additions & 86 deletions insecure/go.mod

Large diffs are not rendered by default.

416 changes: 173 additions & 243 deletions insecure/go.sum

Large diffs are not rendered by default.

162 changes: 75 additions & 87 deletions integration/go.mod

Large diffs are not rendered by default.

431 changes: 163 additions & 268 deletions integration/go.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion integration/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
dockerclient "github.com/docker/docker/client"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -395,7 +396,7 @@ func (net *FlowNetwork) GetMetricFromContainer(t *testing.T, containerName, metr
require.NoError(t, err, fmt.Sprintf("failed to get metrics for container %s at url %s: %s", containerName, metricsURL, err))
defer res.Body.Close()

var parser expfmt.TextParser
parser := expfmt.NewTextParser(model.UTF8Validation)
mf, err := parser.TextToMetricFamilies(res.Body)
require.NoError(t, err, fmt.Sprintf("failed to parse metrics for container %s at url %s: %s", containerName, metricsURL, err))
m, ok := mf[metricName]
Expand Down
8 changes: 5 additions & 3 deletions integration/utils/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ type testBlobService struct {
var _ network.BlobService = (*testBlobService)(nil)
var _ component.Component = (*testBlobService)(nil)

// WithHashOnRead sets whether or not the blobstore will rehash the blob data on read
// WithHashOnRead configures the blobstore toi rehash the blob data on read
// When set, calls to GetBlob will fail with an error if the hash of the data in storage does not
// match its CID
func WithHashOnRead(enabled bool) network.BlobServiceOption {
func WithHashOnRead() network.BlobServiceOption {
return func(bs network.BlobService) {
bs.(*testBlobService).blockStore.HashOnRead(enabled)
bs.(*testBlobService).blockStore = &blockstore.ValidatingBlockstore{
Blockstore: bs.(*testBlobService).blockStore,
}
}
}

Expand Down
35 changes: 23 additions & 12 deletions module/blobs/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ type Blobstore interface {
// the CIDs in the Blobstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)

// HashOnRead specifies if every read blob should be
// rehashed to make sure it matches its CID.
HashOnRead(enabled bool)
}

var ErrNotFound = errors.New("blobstore: blob not found")
Expand All @@ -41,8 +37,29 @@ type blobstoreImpl struct {
bs blockstore.Blockstore
}

func NewBlobstore(ds datastore.Batching) *blobstoreImpl {
return &blobstoreImpl{bs: blockstore.NewBlockstore(ds)}
var _ Blobstore = (*blobstoreImpl)(nil)

type BlobstoreOption func(*blobstoreImpl)

// WithHashOnRead configures the blobstore to rehash the blob data on read
// When set, calls to Get will fail with an error if the hash of the data in storage does not
// match its CID
func WithHashOnRead() BlobstoreOption {
return func(bs *blobstoreImpl) {
bs.bs = &blockstore.ValidatingBlockstore{
Blockstore: bs.bs,
}
}
}

func NewBlobstore(ds datastore.Batching, opts ...BlobstoreOption) *blobstoreImpl {
bs := &blobstoreImpl{bs: blockstore.NewBlockstore(ds)}

for _, opt := range opts {
opt(bs)
}

return bs
}

func (bs *blobstoreImpl) DeleteBlob(ctx context.Context, c cid.Cid) error {
Expand Down Expand Up @@ -78,10 +95,6 @@ func (bs *blobstoreImpl) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error
return bs.bs.AllKeysChan(ctx)
}

func (bs *blobstoreImpl) HashOnRead(enabled bool) {
bs.bs.HashOnRead(enabled)
}

// NoopBlobstore is a Blobstore that does nothing, which is useful for calculating
// BlockExecutionData IDs without storing the data.
type NoopBlobstore struct{}
Expand Down Expand Up @@ -117,5 +130,3 @@ func (n *NoopBlobstore) PutMany(context.Context, []Blob) error {
func (n *NoopBlobstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, nil
}

func (n *NoopBlobstore) HashOnRead(enabled bool) {}
4 changes: 2 additions & 2 deletions module/executiondatasync/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ func TestCalculateExecutionDataLifecycle(t *testing.T) {
require.NoError(t, err)
defer dsManager.Close()

bs := blobs.NewBlobstore(dsManager.Datastore())
bs.HashOnRead(true) // ensure data read from db matches expected hash
// ensure data read from db matches expected hash
bs := blobs.NewBlobstore(dsManager.Datastore(), blobs.WithHashOnRead())
executionDataStore := execution_data.NewExecutionDataStore(bs, execution_data.DefaultSerializer)

executionData, err := executionDataStore.Get(ctx, canonicalExecutionDataID)
Expand Down
3 changes: 0 additions & 3 deletions network/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ type BlobService interface {

// GetSession creates a new session that allows for controlled exchange of wantlists to decrease the bandwidth overhead.
GetSession(ctx context.Context) BlobGetter

// TriggerReprovide updates the BlobService's provider entries in the DHT
TriggerReprovide(ctx context.Context) error
}

type BlobServiceOption func(BlobService)
Expand Down
3 changes: 1 addition & 2 deletions network/internal/p2pfixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"testing"
"time"

addrutil "github.com/libp2p/go-addr-util"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -58,7 +57,7 @@ func SilentNodeFixture(t *testing.T) (net.Listener, flow.Identity) {
require.NoError(t, err)

addrs := []multiaddr.Multiaddr{addr}
addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
addrs, err = manet.ResolveUnspecifiedAddresses(addrs, nil)
require.NoError(t, err)

go acceptAndHang(t, lst)
Expand Down
18 changes: 0 additions & 18 deletions network/mock/blob_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 11 additions & 45 deletions network/p2p/blob/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/ipfs/boxo/bitswap"
bsmsg "github.com/ipfs/boxo/bitswap/message"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/bsnet"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/provider"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand All @@ -35,33 +34,19 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

const (
// DefaultReprovideInterval is the default interval at which DHT provider entries are refreshed
DefaultReprovideInterval = 12 * time.Hour
)

type blobService struct {
prefix string
component.Component
blockService blockservice.BlockService
blockStore blockstore.Blockstore
reprovider provider.System
config *BlobServiceConfig
}

var _ network.BlobService = (*blobService)(nil)
var _ component.Component = (*blobService)(nil)

type BlobServiceConfig struct {
ReprovideInterval time.Duration // the interval at which the DHT provider entries are refreshed
BitswapOptions []bitswap.Option // options to pass to the Bitswap service
}

// WithReprovideInterval sets the interval at which DHT provider entries are refreshed
func WithReprovideInterval(d time.Duration) network.BlobServiceOption {
return func(bs network.BlobService) {
bs.(*blobService).config.ReprovideInterval = d
}
BitswapOptions []bitswap.Option // options to pass to the Bitswap service
}

// WithBitswapOptions sets additional options for Bitswap exchange
Expand All @@ -78,12 +63,14 @@ func WithParentBlobService(parent network.BlobService) network.BlobServiceOption
}
}

// WithHashOnRead sets whether the blobstore will rehash the blob data on read
// WithHashOnRead configures the blobstore to rehash the blob data on read
// When set, calls to GetBlob will fail with an error if the hash of the data in storage does not
// match its CID
func WithHashOnRead(enabled bool) network.BlobServiceOption {
func WithHashOnRead() network.BlobServiceOption {
return func(bs network.BlobService) {
bs.(*blobService).blockStore.HashOnRead(enabled)
bs.(*blobService).blockStore = &blockstore.ValidatingBlockstore{
Blockstore: bs.(*blobService).blockStore,
}
}
}

Expand All @@ -108,7 +95,7 @@ func NewBlobService(
logger zerolog.Logger,
opts ...network.BlobServiceOption,
) (*blobService, error) {
bsNetwork := bsnet.NewFromIpfsHost(host, r, bsnet.Prefix(protocol.ID(prefix)))
bsNetwork := bsnet.NewFromIpfsHost(host, bsnet.Prefix(protocol.ID(prefix)))
blockStore, err := blockstore.CachedBlockstore(
context.Background(),
blockstore.NewBlockstore(ds),
Expand All @@ -118,10 +105,8 @@ func NewBlobService(
return nil, fmt.Errorf("failed to create cached blockstore: %w", err)
}
bs := &blobService{
prefix: prefix,
config: &BlobServiceConfig{
ReprovideInterval: DefaultReprovideInterval,
},
prefix: prefix,
config: &BlobServiceConfig{},
blockStore: blockStore,
}

Expand All @@ -131,7 +116,7 @@ func NewBlobService(

cm := component.NewComponentManagerBuilder().
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
btswp := bitswap.New(ctx, bsNetwork, bs.blockStore, bs.config.BitswapOptions...)
btswp := bitswap.New(ctx, bsNetwork, r, bs.blockStore, bs.config.BitswapOptions...)
bs.blockService = blockservice.New(bs.blockStore, btswp)

ready()
Expand Down Expand Up @@ -160,20 +145,6 @@ func NewBlobService(
}
}
}).
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
// New creates and starts the reprovider (non-blocking)
reprovider, err := provider.New(ds,
provider.Online(r),
provider.KeyProvider(provider.NewBlockstoreProvider(bs.blockStore)),
provider.ReproviderInterval(bs.config.ReprovideInterval),
)
if err != nil {
ctx.Throw(fmt.Errorf("failed to start reprovider: %w", err))
}

bs.reprovider = reprovider
ready()
}).
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()

Expand All @@ -182,7 +153,6 @@ func NewBlobService(

var err *multierror.Error

err = multierror.Append(err, bs.reprovider.Close())
err = multierror.Append(err, bs.blockService.Close())

if err.ErrorOrNil() != nil {
Expand All @@ -196,10 +166,6 @@ func NewBlobService(
return bs, nil
}

func (bs *blobService) TriggerReprovide(ctx context.Context) error {
return bs.reprovider.Reprovide(ctx)
}

func (bs *blobService) GetBlob(ctx context.Context, c cid.Cid) (blobs.Blob, error) {
blob, err := bs.blockService.GetBlock(ctx, c)
if ipld.IsNotFound(err) {
Expand Down
2 changes: 1 addition & 1 deletion network/p2p/connection/peerManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"testing"
"time"

"github.com/ipfs/go-log"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
Expand Down
6 changes: 6 additions & 0 deletions network/p2p/test/mockStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type MockStream struct {
pr *io.PipeReader
}

var _ network.Stream = (*MockStream)(nil)

func NewMockStream(pw *io.PipeWriter, pr *io.PipeReader) *MockStream {
return &MockStream{
pw: pw,
Expand Down Expand Up @@ -49,6 +51,10 @@ func (m *MockStream) Reset() error {
return nil
}

func (m *MockStream) ResetWithError(errCode network.StreamErrorCode) error {
return nil
}

func (m *MockStream) SetDeadline(_ time.Time) error {
return nil
}
Expand Down
Loading
Loading