Skip to content

Commit

Permalink
Register miniblocks only on sudden intervals (#1923)
Browse files Browse the repository at this point in the history
Co-authored-by: Serge Khorun <[email protected]>
Co-authored-by: Serge Khorun <[email protected]>
Co-authored-by: Shuhui Luo <[email protected]>
  • Loading branch information
4 people authored Dec 30, 2024
1 parent a86dfe8 commit 7dfb480
Show file tree
Hide file tree
Showing 129 changed files with 470 additions and 302 deletions.
12 changes: 4 additions & 8 deletions contracts/src/river/registry/facets/stream/StreamRegistry.sol
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,8 @@ contract StreamRegistry is IStreamRegistry, RegistryModifiers {
revert(RiverRegistryErrors.STREAM_SEALED);
}

// Check if the lastMiniblockNum is the next expected miniblock and
// the prevMiniblockHash is correct
if (
stream.lastMiniblockNum + 1 != lastMiniblockNum ||
stream.lastMiniblockHash != prevMiniBlockHash
) {
// Ensure that the lastMiniblockNum is newer than the current head.
if (stream.lastMiniblockNum >= lastMiniblockNum) {
revert(RiverRegistryErrors.BAD_ARG);
}

Expand All @@ -127,8 +123,8 @@ contract StreamRegistry is IStreamRegistry, RegistryModifiers {
stream.flags |= StreamFlags.SEALED;
}

// Delete genesis miniblock bytes if the stream is moving beyond genesis
if (lastMiniblockNum == 1) {
// Delete genesis miniblock if `stream` still contains the genesis block after `stream` has advanced since genesis.
if (stream.lastMiniblockNum == 0) {
delete ds.genesisMiniblockByStreamId[streamId];
}

Expand Down
2 changes: 1 addition & 1 deletion core/cmd/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func runServices(ctx context.Context, cfg *config.Config, stream bool, xchain bo
var baseChain *crypto.Blockchain
var riverChain *crypto.Blockchain
if stream {
streamService, err = rpc.StartServer(ctx, cfg, nil)
streamService, err = rpc.StartServer(ctx, cancel, cfg, nil)
if err != nil {
log.Error("Failed to start server", "error", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion core/contracts/river/deploy/mock_river_registry.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions core/node/crypto/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
MediaStreamMembershipLimitsGDMConfigKey = "media.streamMembershipLimits.77"
MediaStreamMembershipLimitsDMConfigKey = "media.streamMembershipLimits.88"
XChainBlockchainsConfigKey = "xchain.blockchains"
StreamMiniblockRegistrationFrequencyKey = "stream.miniblockRegistrationFrequency"
)

var (
Expand Down Expand Up @@ -93,6 +94,10 @@ type OnChainSettings struct {

MinSnapshotEvents MinSnapshotEventsSettings `mapstructure:",squash"`

// StreamMiniblockRegistrationFrequency indicates how often miniblocks are registered.
// E.g. StreamMiniblockRegistrationFrequency=5 means that only 1 out of 5 miniblocks for a stream are registered.
StreamMiniblockRegistrationFrequency uint64 `mapstructure:"stream.miniblockRegistrationFrequency"`

StreamCacheExpiration time.Duration `mapstructure:"stream.cacheExpirationMs"`
StreamCachePollIntterval time.Duration `mapstructure:"stream.cacheExpirationPollIntervalMs"`

Expand Down Expand Up @@ -170,6 +175,8 @@ func DefaultOnChainSettings() *OnChainSettings {
// TODO: Set it to the default value when the client side is updated.
GetMiniblocksMaxPageSize: 0,

StreamMiniblockRegistrationFrequency: 1,

MembershipLimits: MembershipLimitsSettings{
GDM: 48,
DM: 2,
Expand Down
64 changes: 46 additions & 18 deletions core/node/events/miniblock_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ type MiniblockProducerOpts struct {
func NewMiniblockProducer(
ctx context.Context,
streamCache StreamCache,
cfg crypto.OnChainConfiguration,
opts *MiniblockProducerOpts,
) *miniblockProducer {
mb := &miniblockProducer{
streamCache: streamCache,
localNodeAddress: streamCache.Params().Wallet.Address,
cfg: cfg,
}

if opts != nil {
mb.opts = *opts
}
Expand All @@ -93,6 +96,7 @@ func NewMiniblockProducer(

type miniblockProducer struct {
streamCache StreamCache
cfg crypto.OnChainConfiguration
opts MiniblockProducerOpts
localNodeAddress common.Address

Expand All @@ -108,8 +112,9 @@ var _ MiniblockProducer = (*miniblockProducer)(nil)

// mbJos tracks single miniblock production attempt for a single stream.
type mbJob struct {
stream *streamImpl
candidate *MiniblockInfo
stream *streamImpl
candidate *MiniblockInfo
replicated bool
}

// candidateTracker is a helper struct to accumulate proposals and call SetStreamLastMiniblockBatch.
Expand Down Expand Up @@ -428,36 +433,37 @@ func mbProduceCandidate(
params *StreamCacheParams,
stream *streamImpl,
forceSnapshot bool,
) (*MiniblockInfo, error) {
) (*MiniblockInfo, bool, error) {
remoteNodes, isLocal := stream.GetRemotesAndIsLocal()
replicated := len(remoteNodes) > 0
// TODO: this is a sanity check, but in general mb production code needs to be hardened
// to handle scenario when local node is removed from the stream.
if !isLocal {
return nil, RiverError(Err_INTERNAL, "Not a local stream")
return nil, replicated, RiverError(Err_INTERNAL, "Not a local stream")
}

view, err := stream.getViewIfLocal(ctx)
if err != nil {
return nil, err
return nil, replicated, err
}
if view == nil {
return nil, RiverError(Err_INTERNAL, "mbProduceCandidate: stream is not local")
return nil, replicated, RiverError(Err_INTERNAL, "mbProduceCandidate: stream is not local")
}

mbInfo, err := mbProduceCandiate_Make(ctx, params, view, forceSnapshot, remoteNodes)
if err != nil {
return nil, err
return nil, replicated, err
}
if mbInfo == nil {
return nil, nil
return nil, replicated, nil
}

err = mbProduceCandiate_Save(ctx, params, stream.streamId, mbInfo, remoteNodes)
if err != nil {
return nil, err
return nil, replicated, err
}

return mbInfo, nil
return mbInfo, replicated, nil
}

func mbProduceCandiate_Make(
Expand Down Expand Up @@ -568,7 +574,7 @@ func (p *miniblockProducer) jobStart(ctx context.Context, j *mbJob, forceSnapsho
return
}

candidate, err := mbProduceCandidate(ctx, p.streamCache.Params(), j.stream, forceSnapshot)
candidate, replicated, err := mbProduceCandidate(ctx, p.streamCache.Params(), j.stream, forceSnapshot)
if err != nil {
dlog.FromCtx(ctx).
Error("MiniblockProducer: jobStart: Error creating new miniblock proposal", "streamId", j.stream.streamId, "err", err)
Expand All @@ -581,6 +587,7 @@ func (p *miniblockProducer) jobStart(ctx context.Context, j *mbJob, forceSnapsho
}

j.candidate = candidate
j.replicated = replicated
p.candidates.add(ctx, p, j)
}

Expand All @@ -597,9 +604,28 @@ func (p *miniblockProducer) submitProposalBatch(ctx context.Context, proposals [
return
}

// Only register miniblocks when it's time. If it's not time assume registration was successful.
// This is to reduce the number of transactions/calldata size.
var success []StreamId
if len(proposals) == 1 {
job := proposals[0]
var filteredProposals []*mbJob
for _, job := range proposals {
freq := int64(p.cfg.Get().StreamMiniblockRegistrationFrequency)
if freq <= 0 {
freq = 1
}

if job.replicated || job.candidate.Ref.Num%freq == 0 {
filteredProposals = append(filteredProposals, job)
} else {
success = append(success, job.stream.streamId)

log.Debug("submitProposalBatch: skip miniblock registration",
"streamId", job.stream.streamId, "blocknum", job.candidate.Ref.Num)
}
}

if len(filteredProposals) == 1 {
job := filteredProposals[0]

err := p.streamCache.Params().Registry.SetStreamLastMiniblock(
ctx,
Expand All @@ -614,9 +640,10 @@ func (p *miniblockProducer) submitProposalBatch(ctx context.Context, proposals [
} else {
success = append(success, job.stream.streamId)
}

} else {
var mbs []river.SetMiniblock
for _, job := range proposals {
for _, job := range filteredProposals {
mbs = append(
mbs,
river.SetMiniblock{
Expand All @@ -631,13 +658,14 @@ func (p *miniblockProducer) submitProposalBatch(ctx context.Context, proposals [

var failed []StreamId
var err error
success, failed, err = p.streamCache.Params().Registry.SetStreamLastMiniblockBatch(ctx, mbs)
if err != nil {
log.Error("processMiniblockProposalBatch: Error registering miniblock batch", "err", err)
} else {
successRegistered, failed, err := p.streamCache.Params().Registry.SetStreamLastMiniblockBatch(ctx, mbs)
if err == nil {
success = append(success, successRegistered...)
if len(failed) > 0 {
log.Error("processMiniblockProposalBatch: Failed to register some miniblocks", "failed", failed)
}
} else {
log.Error("processMiniblockProposalBatch: Error registering miniblock batch", "err", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func Disabled_TestStreamUnloadWithSubscribers(t *testing.T) {
streamCache = NewStreamCache(ctx, tc.instances[0].params)
err = streamCache.Start(ctx)
require.NoError(err, "instantiating stream cache")
mpProducer := NewMiniblockProducer(ctx, streamCache, &MiniblockProducerOpts{TestDisableMbProdcutionOnBlock: true})
mpProducer := NewMiniblockProducer(ctx, streamCache, tc.btc.OnChainConfig, &MiniblockProducerOpts{TestDisableMbProdcutionOnBlock: true})

for streamID, syncCookie := range syncCookies {
streamSync, err := streamCache.GetStreamWaitForLocal(ctx, streamID)
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func mbTest(
addEventToStream(t, ctx, tt.instances[0].params, stream, "1", view.LastBlock().Ref)
addEventToStream(t, ctx, tt.instances[0].params, stream, "2", view.LastBlock().Ref)

proposal, err := mbProduceCandidate(ctx, tt.instances[0].params, stream.(*streamImpl), false)
proposal, _, err := mbProduceCandidate(ctx, tt.instances[0].params, stream.(*streamImpl), false)
mb := proposal.headerEvent.Event.GetMiniblockHeader()
events := proposal.Events()
require.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (ctc *cacheTestContext) initCache(n int, opts *MiniblockProducerOpts) *stre
err := streamCache.Start(ctc.ctx)
ctc.require.NoError(err)
ctc.instances[n].cache = streamCache
ctc.instances[n].mbProducer = NewMiniblockProducer(ctc.ctx, streamCache, opts)
ctc.instances[n].mbProducer = NewMiniblockProducer(ctc.ctx, streamCache, ctc.btc.OnChainConfig, opts)
return streamCache
}

Expand Down
4 changes: 2 additions & 2 deletions core/node/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ func (s *Service) initCacheAndSync(opts *ServerStartOpts) error {
return err
}

s.mbProducer = events.NewMiniblockProducer(s.serverCtx, s.cache, nil)
s.mbProducer = events.NewMiniblockProducer(s.serverCtx, s.cache, s.chainConfig, nil)

s.syncHandler = sync.NewHandler(
s.wallet.Address,
Expand Down Expand Up @@ -763,11 +763,11 @@ type ServerStartOpts struct {
// and server must exit.
func StartServer(
ctx context.Context,
ctxCancel context.CancelFunc,
cfg *config.Config,
opts *ServerStartOpts,
) (*Service, error) {
ctx = config.CtxWithConfig(ctx, cfg)
ctx, ctxCancel := context.WithCancel(ctx)

streamService := &Service{
serverCtx: ctx,
Expand Down
Loading

0 comments on commit 7dfb480

Please sign in to comment.