diff --git a/core/node/events/stream_ephemeral.go b/core/node/events/stream_ephemeral.go index cae867585..36f6356af 100644 --- a/core/node/events/stream_ephemeral.go +++ b/core/node/events/stream_ephemeral.go @@ -5,8 +5,6 @@ import ( "slices" "time" - "google.golang.org/protobuf/proto" - "connectrpc.com/connect" "github.com/river-build/river/core/contracts/river" @@ -40,11 +38,16 @@ func (s *StreamCache) onStreamCreated( if err := s.normalizeEphemeralStream( ctx, stream, - int64(event.Stream.LastMiniblockNum), // FIXME: This is 0 + int64(event.Stream.LastMiniblockNum), event.Stream.Flags&uint64(registries.StreamFlagSealed) != 0, ); err != nil { logging.FromCtx(ctx).Errorw("Failed to normalize ephemeral stream", "err", err, "streamId", event.GetStreamId()) } + + // Cache the stream + stream.mu.Lock() + s.cache.Store(stream.streamId, stream) + stream.mu.Unlock() }() } @@ -62,28 +65,6 @@ func (s *StreamCache) normalizeEphemeralStream( return nil } - // Sealed stream has 0 in the last miniblock parameter so the given value should be fetched from the genesis - // TODO: Either store genesis miniblock data on chain or fetch it from replicas. Discuss. - if lastMiniblockNum == 0 && false { - _, _, blockdata, _, err := s.params.Registry.GetStreamWithGenesis(ctx, stream.StreamId()) - if err != nil { - return err - } - - var mb Miniblock - if err = proto.Unmarshal(blockdata, &mb); err != nil { - return err - } - - var mediaEvent StreamEvent - if err = proto.Unmarshal(mb.GetEvents()[0].Event, &mediaEvent); err != nil { - return err - } - - // Total miniblocks: genesis miniblock + media chunks - lastMiniblockNum = int64(mediaEvent.GetMediaPayload().GetInception().GetChunkCount()) + 1 - } - missingMbs := make([]int64, 0, lastMiniblockNum+1) // Check if the given stream is already sealed, if so, ignore the event. diff --git a/core/node/rpc/media_stream_test.go b/core/node/rpc/media_stream_test.go index 69549c408..dd6bfa64e 100644 --- a/core/node/rpc/media_stream_test.go +++ b/core/node/rpc/media_stream_test.go @@ -7,6 +7,7 @@ import ( "connectrpc.com/connect" "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" "github.com/river-build/river/core/node/events" "github.com/river-build/river/core/node/protocol" @@ -58,29 +59,29 @@ func TestCreateMediaStream(t *testing.T) { t.Run("AddEvent failed for ephemeral streams", func(t *testing.T) { mp := events.Make_MediaPayload_Chunk([]byte("chunk 0"), 0) envelope, err := events.MakeEnvelopeWithPayload(alice.wallet, mp, mb) - tt.require.NoError(err) + require.NoError(t, err) aeResp, err := alice.client.AddEvent(alice.ctx, connect.NewRequest(&protocol.AddEventRequest{ StreamId: mediaStreamId[:], Event: envelope, })) - tt.require.Nil(aeResp) - tt.require.Error(err) - tt.require.Equal(connect.CodeNotFound, connect.CodeOf(err)) + require.Nil(t, aeResp) + require.Error(t, err) + require.Equal(t, connect.CodeNotFound, connect.CodeOf(err)) }) t.Run("AddMediaEvent failed to add event with out of range chunk index", func(t *testing.T) { mp := events.Make_MediaPayload_Chunk([]byte("chunk"), chunks+1) envelope, err := events.MakeEnvelopeWithPayload(alice.wallet, mp, mb) - tt.require.NoError(err) + require.NoError(t, err) aeResp, err := alice.client.AddMediaEvent(alice.ctx, connect.NewRequest(&protocol.AddMediaEventRequest{ Event: envelope, CreationCookie: creationCookie, })) - tt.require.Nil(aeResp) - tt.require.Error(err) - tt.require.Equal(connect.CodeInvalidArgument, connect.CodeOf(err)) + require.Nil(t, aeResp) + require.Error(t, err) + require.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) }) t.Run("AddMediaEvent passed for ephemeral media streams", func(t *testing.T) { @@ -117,28 +118,28 @@ func TestCreateMediaStream(t *testing.T) { FromInclusive: 0, ToExclusive: chunks * 2, // adding a threshold to make sure there are no unexpected events })) - tt.require.NoError(err) - tt.require.NotNil(resp) - tt.require.Len(resp.Msg.GetMiniblocks(), chunks+1) // The first miniblock is the stream creation one + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Msg.GetMiniblocks(), chunks+1) // The first miniblock is the stream creation one mbs := resp.Msg.GetMiniblocks() // The first miniblock is the stream creation one - tt.require.Len(mbs[0].GetEvents(), 1) + require.Len(t, mbs[0].GetEvents(), 1) pe, err := events.ParseEvent(mbs[0].GetEvents()[0]) - tt.require.NoError(err) + require.NoError(t, err) mp, ok := pe.Event.GetPayload().(*protocol.StreamEvent_MediaPayload) - tt.require.True(ok) - tt.require.Equal(int32(chunks), mp.MediaPayload.GetInception().GetChunkCount()) + require.True(t, ok) + require.Equal(t, int32(chunks), mp.MediaPayload.GetInception().GetChunkCount()) // The rest of the miniblocks are the media chunks for i, mb := range mbs[1:] { - tt.require.Len(mb.GetEvents(), 1) + require.Len(t, mb.GetEvents(), 1) pe, err = events.ParseEvent(mb.GetEvents()[0]) - tt.require.NoError(err) + require.NoError(t, err) mp, ok = pe.Event.GetPayload().(*protocol.StreamEvent_MediaPayload) - tt.require.True(ok) - tt.require.Equal(mediaChunks[i], mp.MediaPayload.GetChunk().Data) + require.True(t, ok) + require.Equal(t, mediaChunks[i], mp.MediaPayload.GetChunk().Data) } }) } diff --git a/core/node/rpc/replicated_add.go b/core/node/rpc/replicated_add.go index 02a6715dd..bcac6594e 100644 --- a/core/node/rpc/replicated_add.go +++ b/core/node/rpc/replicated_add.go @@ -238,7 +238,7 @@ func (s *Service) replicatedAddMediaEventImpl(ctx context.Context, event *Parsed cc.NodeAddresses(), genesisMiniblockHash, common.BytesToHash(ephemeralMb.Header.Hash), - 0, + cc.MiniblockNum, true, ); err != nil { return nil, err