From c8abfae8671f909dab1710bd8a11826fe5cbce67 Mon Sep 17 00:00:00 2001 From: rbhnt Date: Thu, 6 Feb 2025 10:07:36 +0000 Subject: [PATCH 1/2] Cache stream when normalized --- core/node/events/stream_ephemeral.go | 31 +++++----------------- core/node/rpc/media_stream_test.go | 39 ++++++++++++++-------------- core/node/rpc/replicated_add.go | 2 +- 3 files changed, 27 insertions(+), 45 deletions(-) diff --git a/core/node/events/stream_ephemeral.go b/core/node/events/stream_ephemeral.go index cae8675850..b4694e1361 100644 --- a/core/node/events/stream_ephemeral.go +++ b/core/node/events/stream_ephemeral.go @@ -5,8 +5,6 @@ import ( "slices" "time" - "google.golang.org/protobuf/proto" - "connectrpc.com/connect" "github.com/river-build/river/core/contracts/river" @@ -40,10 +38,15 @@ func (s *StreamCache) onStreamCreated( if err := s.normalizeEphemeralStream( ctx, stream, - int64(event.Stream.LastMiniblockNum), // FIXME: This is 0 + int64(event.Stream.LastMiniblockNum), event.Stream.Flags&uint64(registries.StreamFlagSealed) != 0, ); err != nil { logging.FromCtx(ctx).Errorw("Failed to normalize ephemeral stream", "err", err, "streamId", event.GetStreamId()) + } else { + // Cache the stream + stream.mu.Lock() + s.cache.Store(stream.streamId, stream) + stream.mu.Unlock() } }() } @@ -62,28 +65,6 @@ func (s *StreamCache) normalizeEphemeralStream( return nil } - // Sealed stream has 0 in the last miniblock parameter so the given value should be fetched from the genesis - // TODO: Either store genesis miniblock data on chain or fetch it from replicas. Discuss. - if lastMiniblockNum == 0 && false { - _, _, blockdata, _, err := s.params.Registry.GetStreamWithGenesis(ctx, stream.StreamId()) - if err != nil { - return err - } - - var mb Miniblock - if err = proto.Unmarshal(blockdata, &mb); err != nil { - return err - } - - var mediaEvent StreamEvent - if err = proto.Unmarshal(mb.GetEvents()[0].Event, &mediaEvent); err != nil { - return err - } - - // Total miniblocks: genesis miniblock + media chunks - lastMiniblockNum = int64(mediaEvent.GetMediaPayload().GetInception().GetChunkCount()) + 1 - } - missingMbs := make([]int64, 0, lastMiniblockNum+1) // Check if the given stream is already sealed, if so, ignore the event. diff --git a/core/node/rpc/media_stream_test.go b/core/node/rpc/media_stream_test.go index 69549c408b..dd6bfa64e7 100644 --- a/core/node/rpc/media_stream_test.go +++ b/core/node/rpc/media_stream_test.go @@ -7,6 +7,7 @@ import ( "connectrpc.com/connect" "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" "github.com/river-build/river/core/node/events" "github.com/river-build/river/core/node/protocol" @@ -58,29 +59,29 @@ func TestCreateMediaStream(t *testing.T) { t.Run("AddEvent failed for ephemeral streams", func(t *testing.T) { mp := events.Make_MediaPayload_Chunk([]byte("chunk 0"), 0) envelope, err := events.MakeEnvelopeWithPayload(alice.wallet, mp, mb) - tt.require.NoError(err) + require.NoError(t, err) aeResp, err := alice.client.AddEvent(alice.ctx, connect.NewRequest(&protocol.AddEventRequest{ StreamId: mediaStreamId[:], Event: envelope, })) - tt.require.Nil(aeResp) - tt.require.Error(err) - tt.require.Equal(connect.CodeNotFound, connect.CodeOf(err)) + require.Nil(t, aeResp) + require.Error(t, err) + require.Equal(t, connect.CodeNotFound, connect.CodeOf(err)) }) t.Run("AddMediaEvent failed to add event with out of range chunk index", func(t *testing.T) { mp := events.Make_MediaPayload_Chunk([]byte("chunk"), chunks+1) envelope, err := events.MakeEnvelopeWithPayload(alice.wallet, mp, mb) - tt.require.NoError(err) + require.NoError(t, err) aeResp, err := alice.client.AddMediaEvent(alice.ctx, connect.NewRequest(&protocol.AddMediaEventRequest{ Event: envelope, CreationCookie: creationCookie, })) - tt.require.Nil(aeResp) - tt.require.Error(err) - tt.require.Equal(connect.CodeInvalidArgument, connect.CodeOf(err)) + require.Nil(t, aeResp) + require.Error(t, err) + require.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) }) t.Run("AddMediaEvent passed for ephemeral media streams", func(t *testing.T) { @@ -117,28 +118,28 @@ func TestCreateMediaStream(t *testing.T) { FromInclusive: 0, ToExclusive: chunks * 2, // adding a threshold to make sure there are no unexpected events })) - tt.require.NoError(err) - tt.require.NotNil(resp) - tt.require.Len(resp.Msg.GetMiniblocks(), chunks+1) // The first miniblock is the stream creation one + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Msg.GetMiniblocks(), chunks+1) // The first miniblock is the stream creation one mbs := resp.Msg.GetMiniblocks() // The first miniblock is the stream creation one - tt.require.Len(mbs[0].GetEvents(), 1) + require.Len(t, mbs[0].GetEvents(), 1) pe, err := events.ParseEvent(mbs[0].GetEvents()[0]) - tt.require.NoError(err) + require.NoError(t, err) mp, ok := pe.Event.GetPayload().(*protocol.StreamEvent_MediaPayload) - tt.require.True(ok) - tt.require.Equal(int32(chunks), mp.MediaPayload.GetInception().GetChunkCount()) + require.True(t, ok) + require.Equal(t, int32(chunks), mp.MediaPayload.GetInception().GetChunkCount()) // The rest of the miniblocks are the media chunks for i, mb := range mbs[1:] { - tt.require.Len(mb.GetEvents(), 1) + require.Len(t, mb.GetEvents(), 1) pe, err = events.ParseEvent(mb.GetEvents()[0]) - tt.require.NoError(err) + require.NoError(t, err) mp, ok = pe.Event.GetPayload().(*protocol.StreamEvent_MediaPayload) - tt.require.True(ok) - tt.require.Equal(mediaChunks[i], mp.MediaPayload.GetChunk().Data) + require.True(t, ok) + require.Equal(t, mediaChunks[i], mp.MediaPayload.GetChunk().Data) } }) } diff --git a/core/node/rpc/replicated_add.go b/core/node/rpc/replicated_add.go index 02a6715dd0..bcac6594e7 100644 --- a/core/node/rpc/replicated_add.go +++ b/core/node/rpc/replicated_add.go @@ -238,7 +238,7 @@ func (s *Service) replicatedAddMediaEventImpl(ctx context.Context, event *Parsed cc.NodeAddresses(), genesisMiniblockHash, common.BytesToHash(ephemeralMb.Header.Hash), - 0, + cc.MiniblockNum, true, ); err != nil { return nil, err From 70a1f5d8e017c1cf7eaff83db2994d190d45c4e5 Mon Sep 17 00:00:00 2001 From: rbhnt Date: Mon, 10 Feb 2025 09:15:30 +0000 Subject: [PATCH 2/2] Addressing comments --- core/node/events/stream_ephemeral.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/node/events/stream_ephemeral.go b/core/node/events/stream_ephemeral.go index b4694e1361..36f6356af0 100644 --- a/core/node/events/stream_ephemeral.go +++ b/core/node/events/stream_ephemeral.go @@ -42,12 +42,12 @@ func (s *StreamCache) onStreamCreated( event.Stream.Flags&uint64(registries.StreamFlagSealed) != 0, ); err != nil { logging.FromCtx(ctx).Errorw("Failed to normalize ephemeral stream", "err", err, "streamId", event.GetStreamId()) - } else { - // Cache the stream - stream.mu.Lock() - s.cache.Store(stream.streamId, stream) - stream.mu.Unlock() } + + // Cache the stream + stream.mu.Lock() + s.cache.Store(stream.streamId, stream) + stream.mu.Unlock() }() }