From 7d989e4bff99f8c8484e8c478742e5b0d1ced4a7 Mon Sep 17 00:00:00 2001 From: bas-vk Date: Thu, 6 Feb 2025 10:45:47 +0100 Subject: [PATCH 1/2] node: add opentel tracing to ModifySync (#2256) --- core/node/rpc/service_test.go | 7 +++---- core/node/rpc/sync/client/local.go | 24 ++++++++++++++++++++++-- core/node/rpc/sync/client/remote.go | 22 +++++++++++++++++++++- core/node/rpc/sync/client/syncer_set.go | 23 +++++++++++++++++++---- core/node/rpc/sync/handler.go | 9 ++++----- core/node/rpc/sync/operation.go | 24 +++++++++++++++++++++++- 6 files changed, 92 insertions(+), 17 deletions(-) diff --git a/core/node/rpc/service_test.go b/core/node/rpc/service_test.go index 0a2ec6571a..5346193099 100644 --- a/core/node/rpc/service_test.go +++ b/core/node/rpc/service_test.go @@ -19,9 +19,6 @@ import ( "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" eth_crypto "github.com/ethereum/go-ethereum/crypto" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - . "github.com/river-build/river/core/node/base" "github.com/river-build/river/core/node/crypto" "github.com/river-build/river/core/node/events" @@ -32,6 +29,8 @@ import ( . "github.com/river-build/river/core/node/shared" "github.com/river-build/river/core/node/testutils" "github.com/river-build/river/core/node/testutils/testfmt" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" ) func TestMain(m *testing.M) { @@ -1656,7 +1655,7 @@ func TestSyncSubscriptionWithTooSlowClient(t *testing.T) { testfmt.Logf(t, "subscribe on node %s", node1.address) syncPos := append(users, channels...) syncOp, err := river_sync.NewStreamsSyncOperation( - ctx, syncID, node1.address, node1.service.cache, node1.service.nodeRegistry) + ctx, syncID, node1.address, node1.service.cache, node1.service.nodeRegistry, nil) req.NoError(err, "NewStreamsSyncOperation") syncOpResult := make(chan error) diff --git a/core/node/rpc/sync/client/local.go b/core/node/rpc/sync/client/local.go index c421cdf994..365763152d 100644 --- a/core/node/rpc/sync/client/local.go +++ b/core/node/rpc/sync/client/local.go @@ -5,12 +5,13 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" - . "github.com/river-build/river/core/node/base" . "github.com/river-build/river/core/node/events" "github.com/river-build/river/core/node/logging" . "github.com/river-build/river/core/node/protocol" . 
"github.com/river-build/river/core/node/shared" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type localSyncer struct { @@ -26,6 +27,9 @@ type localSyncer struct { activeStreamsMu sync.Mutex activeStreams map[StreamId]*Stream + + // otelTracer is used to trace individual sync Send operations, tracing is disabled if nil + otelTracer trace.Tracer } func newLocalSyncer( @@ -36,6 +40,7 @@ func newLocalSyncer( streamCache *StreamCache, cookies []*SyncCookie, messages chan<- *SyncStreamsResponse, + otelTracer trace.Tracer, ) (*localSyncer, error) { return &localSyncer{ globalSyncOpID: globalSyncOpID, @@ -46,6 +51,7 @@ func newLocalSyncer( cookies: cookies, messages: messages, activeStreams: make(map[StreamId]*Stream), + otelTracer: otelTracer, }, nil } @@ -74,6 +80,14 @@ func (s *localSyncer) Address() common.Address { } func (s *localSyncer) AddStream(ctx context.Context, cookie *SyncCookie) error { + if s.otelTracer != nil { + var span trace.Span + streamID, _ := StreamIdFromBytes(cookie.GetStreamId()) + ctx, span = s.otelTracer.Start(ctx, "localSyncer::AddStream", + trace.WithAttributes(attribute.String("stream", streamID.String()))) + defer span.End() + } + streamID, err := StreamIdFromBytes(cookie.GetStreamId()) if err != nil { return err @@ -81,7 +95,13 @@ func (s *localSyncer) AddStream(ctx context.Context, cookie *SyncCookie) error { return s.addStream(ctx, streamID, cookie) } -func (s *localSyncer) RemoveStream(_ context.Context, streamID StreamId) (bool, error) { +func (s *localSyncer) RemoveStream(ctx context.Context, streamID StreamId) (bool, error) { + if s.otelTracer != nil { + _, span := s.otelTracer.Start(ctx, "localSyncer::removeStream", + trace.WithAttributes(attribute.String("stream", streamID.String()))) + defer span.End() + } + s.activeStreamsMu.Lock() defer s.activeStreamsMu.Unlock() diff --git a/core/node/rpc/sync/client/remote.go b/core/node/rpc/sync/client/remote.go index 60a519d055..5d0c55e032 100644 --- a/core/node/rpc/sync/client/remote.go +++ b/core/node/rpc/sync/client/remote.go @@ -10,12 +10,13 @@ import ( "connectrpc.com/connect" "github.com/ethereum/go-ethereum/common" - . "github.com/river-build/river/core/node/base" "github.com/river-build/river/core/node/logging" . "github.com/river-build/river/core/node/protocol" "github.com/river-build/river/core/node/protocol/protocolconnect" . 
"github.com/river-build/river/core/node/shared" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type remoteSyncer struct { @@ -31,6 +32,8 @@ type remoteSyncer struct { streams sync.Map responseStream *connect.ServerStreamForClient[SyncStreamsResponse] unsubStream func(streamID StreamId) + // otelTracer is used to trace individual sync Send operations, tracing is disabled if nil + otelTracer trace.Tracer } func newRemoteSyncer( @@ -42,6 +45,7 @@ func newRemoteSyncer( cookies []*SyncCookie, unsubStream func(streamID StreamId), messages chan<- *SyncStreamsResponse, + otelTracer trace.Tracer, ) (*remoteSyncer, error) { syncStreamCtx, syncStreamCancel := context.WithCancel(ctx) responseStream, err := client.SyncStreams(syncStreamCtx, connect.NewRequest(&SyncStreamsRequest{SyncPos: cookies})) @@ -82,6 +86,7 @@ func newRemoteSyncer( responseStream: responseStream, remoteAddr: remoteAddr, unsubStream: unsubStream, + otelTracer: otelTracer, } s.syncID = responseStream.Msg().GetSyncId() @@ -228,6 +233,14 @@ func (s *remoteSyncer) Address() common.Address { } func (s *remoteSyncer) AddStream(ctx context.Context, cookie *SyncCookie) error { + if s.otelTracer != nil { + var span trace.Span + streamID, _ := StreamIdFromBytes(cookie.GetStreamId()) + ctx, span = s.otelTracer.Start(ctx, "remoteSyncer::AddStream", + trace.WithAttributes(attribute.String("stream", streamID.String()))) + defer span.End() + } + streamID, err := StreamIdFromBytes(cookie.GetStreamId()) if err != nil { return err @@ -246,6 +259,13 @@ func (s *remoteSyncer) AddStream(ctx context.Context, cookie *SyncCookie) error } func (s *remoteSyncer) RemoveStream(ctx context.Context, streamID StreamId) (bool, error) { + if s.otelTracer != nil { + var span trace.Span + ctx, span = s.otelTracer.Start(ctx, "remoteSyncer::removeStream", + trace.WithAttributes(attribute.String("stream", streamID.String()))) + defer span.End() + } + _, err := s.client.RemoveStreamFromSync(ctx, connect.NewRequest(&RemoveStreamFromSyncRequest{ SyncId: s.syncID, StreamId: streamID[:], diff --git a/core/node/rpc/sync/client/syncer_set.go b/core/node/rpc/sync/client/syncer_set.go index a2dbbdaf3c..8f8055d4f5 100644 --- a/core/node/rpc/sync/client/syncer_set.go +++ b/core/node/rpc/sync/client/syncer_set.go @@ -2,6 +2,8 @@ package client import ( "context" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "sync" "github.com/ethereum/go-ethereum/common" @@ -52,6 +54,8 @@ type ( syncers map[common.Address]StreamsSyncer // streamID2Syncer maps from a stream to its syncer streamID2Syncer map[StreamId]StreamsSyncer + // otelTracer is used to trace individual sync Send operations, tracing is disabled if nil + otelTracer trace.Tracer } // SyncCookieSet maps from a stream id to a sync cookie @@ -87,6 +91,7 @@ func NewSyncers( nodeRegistry nodes.NodeRegistry, localNodeAddress common.Address, cookies StreamCookieSetGroupedByNodeAddress, + otelTracer trace.Tracer, ) (*SyncerSet, <-chan *SyncStreamsResponse, error) { var ( log = logging.FromCtx(ctx) @@ -103,6 +108,7 @@ func NewSyncers( syncers: syncers, streamID2Syncer: streamID2Syncer, messages: messages, + otelTracer: otelTracer, } // report these streams as down @@ -125,7 +131,8 @@ func NewSyncers( for nodeAddress, cookieSet := range cookies { if nodeAddress == localNodeAddress { // stream managed by this node syncer, err := newLocalSyncer( - ctx, syncID, globalSyncOpCtxCancel, localNodeAddress, streamCache, cookieSet.AsSlice(), messages) + ctx, syncID, globalSyncOpCtxCancel, 
localNodeAddress, + streamCache, cookieSet.AsSlice(), messages, ss.otelTracer) if err != nil { return nil, nil, err } @@ -140,7 +147,8 @@ func NewSyncers( } syncer, err := newRemoteSyncer( - ctx, globalSyncOpCtxCancel, syncID, nodeAddress, client, cookieSet.AsSlice(), ss.rmStream, messages) + ctx, globalSyncOpCtxCancel, syncID, nodeAddress, client, cookieSet.AsSlice(), + ss.rmStream, messages, ss.otelTracer) if err != nil { log.Warnw("Unable to connect to remote stream when starting stream sync", "err", err, "remoteNode", nodeAddress) @@ -186,6 +194,13 @@ func (ss *SyncerSet) AddStream( streamID StreamId, cookie *SyncCookie, ) error { + if ss.otelTracer != nil { + var span trace.Span + ctx, span = ss.otelTracer.Start(ctx, "AddStream", + trace.WithAttributes(attribute.String("stream", streamID.String()))) + defer span.End() + } + ss.muSyncers.Lock() defer ss.muSyncers.Unlock() @@ -214,7 +229,7 @@ func (ss *SyncerSet) AddStream( if nodeAddress == ss.localNodeAddress { if syncer, err = newLocalSyncer( ss.ctx, ss.syncID, ss.globalSyncOpCtxCancel, ss.localNodeAddress, - ss.streamCache, []*SyncCookie{cookie}, ss.messages); err != nil { + ss.streamCache, []*SyncCookie{cookie}, ss.messages, ss.otelTracer); err != nil { return err } } else { @@ -224,7 +239,7 @@ func (ss *SyncerSet) AddStream( } if syncer, err = newRemoteSyncer( ss.ctx, ss.globalSyncOpCtxCancel, ss.syncID, nodeAddress, client, - []*SyncCookie{cookie}, ss.rmStream, ss.messages); err != nil { + []*SyncCookie{cookie}, ss.rmStream, ss.messages, ss.otelTracer); err != nil { return err } } diff --git a/core/node/rpc/sync/handler.go b/core/node/rpc/sync/handler.go index 229d47ed53..093c020d94 100644 --- a/core/node/rpc/sync/handler.go +++ b/core/node/rpc/sync/handler.go @@ -6,15 +6,14 @@ import ( "connectrpc.com/connect" "github.com/ethereum/go-ethereum/common" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" - . "github.com/river-build/river/core/node/base" . "github.com/river-build/river/core/node/events" "github.com/river-build/river/core/node/nodes" . 
"github.com/river-build/river/core/node/protocol" "github.com/river-build/river/core/node/shared" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type ( @@ -102,7 +101,7 @@ func (h *handlerImpl) SyncStreams( req *connect.Request[SyncStreamsRequest], res *connect.ServerStream[SyncStreamsResponse], ) error { - op, err := NewStreamsSyncOperation(ctx, syncId, h.nodeAddr, h.streamCache, h.nodeRegistry) + op, err := NewStreamsSyncOperation(ctx, syncId, h.nodeAddr, h.streamCache, h.nodeRegistry, h.otelTracer) if err != nil { return err } diff --git a/core/node/rpc/sync/operation.go b/core/node/rpc/sync/operation.go index 70830fc05e..35141ccaed 100644 --- a/core/node/rpc/sync/operation.go +++ b/core/node/rpc/sync/operation.go @@ -2,6 +2,8 @@ package sync import ( "context" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "time" "github.com/river-build/river/core/node/logging" @@ -37,6 +39,8 @@ type ( streamCache *StreamCache // nodeRegistry is used to get the remote remoteNode endpoint from a thisNodeAddress address nodeRegistry nodes.NodeRegistry + // otelTracer is used to trace individual sync Send operations, tracing is disabled if nil + otelTracer trace.Tracer } // subCommand represents a request to add or remove a stream and ping sync operation @@ -68,6 +72,7 @@ func NewStreamsSyncOperation( node common.Address, streamCache *StreamCache, nodeRegistry nodes.NodeRegistry, + otelTracer trace.Tracer, ) (*StreamSyncOperation, error) { // make the sync operation cancellable for CancelSync syncOpCtx, cancel := context.WithCancelCause(ctx) @@ -81,6 +86,7 @@ func NewStreamsSyncOperation( commands: make(chan *subCommand, 64), streamCache: streamCache, nodeRegistry: nodeRegistry, + otelTracer: otelTracer, }, nil } @@ -98,7 +104,7 @@ func (syncOp *StreamSyncOperation) Run( syncers, messages, err := client.NewSyncers( syncOp.ctx, syncOp.cancel, syncOp.SyncID, syncOp.streamCache, - syncOp.nodeRegistry, syncOp.thisNodeAddress, cookies) + syncOp.nodeRegistry, syncOp.thisNodeAddress, cookies, syncOp.otelTracer) if err != nil { return err } @@ -178,6 +184,14 @@ func (syncOp *StreamSyncOperation) AddStreamToSync( return nil, err } + if syncOp.otelTracer != nil { + var span trace.Span + streamID, _ := shared.StreamIdFromBytes(req.Msg.GetSyncPos().GetStreamId()) + ctx, span = syncOp.otelTracer.Start(ctx, "addStreamToSync", + trace.WithAttributes(attribute.String("stream", streamID.String()))) + defer span.End() + } + cmd := &subCommand{ Ctx: ctx, AddStreamReq: req, @@ -199,6 +213,14 @@ func (syncOp *StreamSyncOperation) RemoveStreamFromSync( return nil, RiverError(Err_INVALID_ARGUMENT, "invalid syncId").Tag("syncId", req.Msg.GetSyncId()) } + if syncOp.otelTracer != nil { + var span trace.Span + streamID, _ := shared.StreamIdFromBytes(req.Msg.GetStreamId()) + ctx, span = syncOp.otelTracer.Start(ctx, "removeStreamFromSync", + trace.WithAttributes(attribute.String("stream", streamID.String()))) + defer span.End() + } + cmd := &subCommand{ Ctx: ctx, RmStreamReq: req, From 96796c1f6764b60faad858faa59080a3bb93c20d Mon Sep 17 00:00:00 2001 From: rbhnt Date: Fri, 7 Feb 2025 09:31:31 +0000 Subject: [PATCH 2/2] Media stream optimization: Introduce feature flag to sdk (#2274) --- packages/sdk/src/client.ts | 78 ++++ packages/sdk/src/tests/multi_ne/media.test.ts | 332 +++++++++++++++++- packages/stream-metadata/tests/testUtils.ts | 4 +- 3 files changed, 402 insertions(+), 12 deletions(-) diff --git a/packages/sdk/src/client.ts 
b/packages/sdk/src/client.ts index 79ec9b7623..a17f3e13bf 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -834,6 +834,71 @@ export class Client userId: string | undefined, chunkCount: number, streamSettings?: PlainMessage, + ): Promise<{ streamId: string; prevMiniblockHash: Uint8Array }> { + assert(this.userStreamId !== undefined, 'userStreamId must be set') + if (!channelId && !spaceId && !userId) { + throw Error('channelId, spaceId or userId must be set') + } + if (spaceId) { + assert(isSpaceStreamId(spaceId), 'spaceId must be a valid streamId') + } + if (channelId) { + assert( + isChannelStreamId(channelId) || + isDMChannelStreamId(channelId) || + isGDMChannelStreamId(channelId), + 'channelId must be a valid streamId', + ) + } + if (userId) { + assert(isUserId(userId), 'userId must be a valid userId') + } + + const streamId = makeUniqueMediaStreamId() + + this.logCall('createMedia', channelId ?? spaceId, userId, streamId) + const inceptionEvent = await makeEvent( + this.signerContext, + make_MediaPayload_Inception({ + streamId: streamIdAsBytes(streamId), + channelId: channelId ? streamIdAsBytes(channelId) : undefined, + spaceId: spaceId ? streamIdAsBytes(spaceId) : undefined, + userId: userId ? addressFromUserId(userId) : undefined, + chunkCount, + settings: streamSettings, + }), + ) + + const response = await this.rpcClient.createStream({ + events: [inceptionEvent], + streamId: streamIdAsBytes(streamId), + }) + + const unpackedResponse = await unpackStream(response.stream, this.unpackEnvelopeOpts) + const streamView = new StreamStateView(this.userId, streamId) + streamView.initialize( + unpackedResponse.streamAndCookie.nextSyncCookie, + unpackedResponse.streamAndCookie.events, + unpackedResponse.snapshot, + unpackedResponse.streamAndCookie.miniblocks, + [], + unpackedResponse.prevSnapshotMiniblockNum, + undefined, + [], + undefined, + ) + + check(isDefined(streamView.prevMiniblockHash), 'prevMiniblockHash must be defined') + + return { streamId: streamId, prevMiniblockHash: streamView.prevMiniblockHash } + } + + async createMediaStreamNew( + channelId: string | Uint8Array | undefined, + spaceId: string | Uint8Array | undefined, + userId: string | undefined, + chunkCount: number, + streamSettings?: PlainMessage, ): Promise<{ creationCookie: CreationCookie }> { assert(this.userStreamId !== undefined, 'userStreamId must be set') if (!channelId && !spaceId && !userId) { @@ -1751,6 +1816,19 @@ export class Client } async sendMediaPayload( + streamId: string, + data: Uint8Array, + chunkIndex: number, + prevMiniblockHash: Uint8Array, + ): Promise<{ prevMiniblockHash: Uint8Array; eventId: string }> { + const payload = make_MediaPayload_Chunk({ + data: data, + chunkIndex: chunkIndex, + }) + return this.makeEventWithHashAndAddToStream(streamId, payload, prevMiniblockHash) + } + + async sendMediaPayloadNew( creationCookie: CreationCookie, last: boolean, data: Uint8Array, diff --git a/packages/sdk/src/tests/multi_ne/media.test.ts b/packages/sdk/src/tests/multi_ne/media.test.ts index 563a54f5fb..a5b6f081fa 100644 --- a/packages/sdk/src/tests/multi_ne/media.test.ts +++ b/packages/sdk/src/tests/multi_ne/media.test.ts @@ -23,7 +23,7 @@ describe('mediaTests', () => { async function bobCreateMediaStream( chunkCount: number, - ): Promise<{ creationCookie: CreationCookie }> { + ): Promise<{ streamId: string; prevMiniblockHash: Uint8Array }> { const spaceId = makeUniqueSpaceStreamId() await expect(bobsClient.createSpace(spaceId)).resolves.not.toThrow() @@ -35,6 +35,318 @@ 
describe('mediaTests', () => { return bobsClient.createMediaStream(channelId, spaceId, undefined, chunkCount) } + async function bobSendMediaPayloads( + streamId: string, + chunks: number, + prevMiniblockHash: Uint8Array, + ): Promise { + let prevHash = prevMiniblockHash + for (let i = 0; i < chunks; i++) { + const chunk = new Uint8Array(100) + // Create novel chunk content for testing purposes + chunk.fill(i, 0, 100) + const result = await bobsClient.sendMediaPayload(streamId, chunk, i, prevHash) + prevHash = result.prevMiniblockHash + } + return prevHash + } + + async function bobSendEncryptedMediaPayload( + streamId: string, + data: Uint8Array, + key: Uint8Array, + iv: Uint8Array, + prevMiniblockHash: Uint8Array, + ): Promise { + let prevHash = prevMiniblockHash + const { ciphertext } = await encryptAESGCM(data, key, iv) + const result = await bobsClient.sendMediaPayload(streamId, ciphertext, 0, prevHash) + prevHash = result.prevMiniblockHash + return prevHash + } + + function createTestMediaChunks(chunks: number): Uint8Array { + const data: Uint8Array = new Uint8Array(10 * chunks) + for (let i = 0; i < chunks; i++) { + const start = i * 10 + const end = start + 10 + data.fill(i, start, end) + } + return data + } + + async function bobCreateSpaceMediaStream( + spaceId: string, + chunkCount: number, + ): Promise<{ streamId: string; prevMiniblockHash: Uint8Array }> { + await expect(bobsClient.createSpace(spaceId)).resolves.not.toThrow() + const mediaInfo = await bobsClient.createMediaStream( + undefined, + spaceId, + undefined, + chunkCount, + ) + return mediaInfo + } + + test('clientCanCreateMediaStream', async () => { + await expect(bobCreateMediaStream(10)).resolves.not.toThrow() + }) + + test('clientCanCreateSpaceMediaStream', async () => { + const spaceId = makeUniqueSpaceStreamId() + await expect(bobCreateSpaceMediaStream(spaceId, 10)).resolves.not.toThrow() + }) + + test('clientCanSendMediaPayload', async () => { + const mediaStreamInfo = await bobCreateMediaStream(10) + await bobSendMediaPayloads(mediaStreamInfo.streamId, 10, mediaStreamInfo.prevMiniblockHash) + }) + + test('clientCanSendSpaceMediaPayload', async () => { + const spaceId = makeUniqueSpaceStreamId() + const mediaStreamInfo = await bobCreateSpaceMediaStream(spaceId, 10) + await expect( + bobSendMediaPayloads(mediaStreamInfo.streamId, 10, mediaStreamInfo.prevMiniblockHash), + ).resolves.not.toThrow() + }) + + test('clientCanSendEncryptedDerivedAesGmPayload', async () => { + const spaceId = makeUniqueSpaceStreamId() + const mediaStreamInfo = await bobCreateSpaceMediaStream(spaceId, 3) + const { iv, key } = await deriveKeyAndIV(spaceId) + const data = createTestMediaChunks(2) + await expect( + bobSendEncryptedMediaPayload( + mediaStreamInfo.streamId, + data, + key, + iv, + mediaStreamInfo.prevMiniblockHash, + ), + ).resolves.not.toThrow() + }) + + test('clientCanDownloadEncryptedDerivedAesGmPayload', async () => { + const spaceId = makeUniqueSpaceStreamId() + const mediaStreamInfo = await bobCreateSpaceMediaStream(spaceId, 2) + const { iv, key } = await deriveKeyAndIV(spaceId) + const data = createTestMediaChunks(2) + await bobSendEncryptedMediaPayload( + mediaStreamInfo.streamId, + data, + key, + iv, + mediaStreamInfo.prevMiniblockHash, + ) + const decryptedChunks = await bobsClient.getMediaPayload(mediaStreamInfo.streamId, key, iv) + expect(decryptedChunks).toEqual(data) + }) + + test('chunkIndexNeedsToBeWithinBounds', async () => { + const result = await bobCreateMediaStream(10) + const chunk = new Uint8Array(100) + 
await expect( + bobsClient.sendMediaPayload(result.streamId, chunk, -1, result.prevMiniblockHash), + ).rejects.toThrow() + await expect( + bobsClient.sendMediaPayload(result.streamId, chunk, 11, result.prevMiniblockHash), + ).rejects.toThrow() + }) + + test('chunkSizeCanBeAtLimit', async () => { + const result = await bobCreateMediaStream(10) + const chunk = new Uint8Array(500000) + await expect( + bobsClient.sendMediaPayload(result.streamId, chunk, 0, result.prevMiniblockHash), + ).resolves.not.toThrow() + }) + + test('chunkSizeNeedsToBeWithinLimit', async () => { + const result = await bobCreateMediaStream(10) + const chunk = new Uint8Array(500001) + await expect( + bobsClient.sendMediaPayload(result.streamId, chunk, 0, result.prevMiniblockHash), + ).rejects.toThrow() + }) + + test('chunkCountNeedsToBeWithinLimit', async () => { + await expect(bobCreateMediaStream(11)).rejects.toThrow() + }) + + test('clientCanOnlyPostToTheirOwnMediaStream', async () => { + const result = await bobCreateMediaStream(10) + const chunk = new Uint8Array(100) + + const alicesClient = await makeTestClient() + await alicesClient.initializeUser() + alicesClient.startSync() + + await expect( + alicesClient.sendMediaPayload(result.streamId, chunk, 5, result.prevMiniblockHash), + ).rejects.toThrow() + await alicesClient.stop() + }) + + test('clientCanOnlyPostToTheirOwnPublicMediaStream', async () => { + const spaceId = makeUniqueSpaceStreamId() + const result = await bobCreateSpaceMediaStream(spaceId, 10) + const chunk = new Uint8Array(100) + + const alicesClient = await makeTestClient() + await alicesClient.initializeUser() + alicesClient.startSync() + + await expect( + alicesClient.sendMediaPayload(result.streamId, chunk, 5, result.prevMiniblockHash), + ).rejects.toThrow() + await alicesClient.stop() + }) + + test('channelNeedsToExistBeforeCreatingMediaStream', async () => { + const nonExistentSpaceId = makeUniqueSpaceStreamId() + const nonExistentChannelId = makeUniqueChannelStreamId(nonExistentSpaceId) + await expect( + bobsClient.createMediaStream(nonExistentChannelId, nonExistentSpaceId, undefined, 10), + ).rejects.toThrow() + }) + + test('dmChannelNeedsToExistBeforeCreatingMediaStream', async () => { + const alicesClient = await makeTestClient() + await alicesClient.initializeUser() + alicesClient.startSync() + + const nonExistentChannelId = makeDMStreamId(bobsClient.userId, alicesClient.userId) + await expect( + bobsClient.createMediaStream(nonExistentChannelId, undefined, undefined, 10), + ).rejects.toThrow() + await alicesClient.stop() + }) + + test('userCanUploadMediaToDmIfMember', async () => { + const alicesClient = await makeTestClient() + await alicesClient.initializeUser() + alicesClient.startSync() + + const { streamId } = await bobsClient.createDMChannel(alicesClient.userId) + await expect( + bobsClient.createMediaStream(streamId, undefined, undefined, 10), + ).resolves.not.toThrow() + await expect( + alicesClient.createMediaStream(streamId, undefined, undefined, 10), + ).resolves.not.toThrow() + await alicesClient.stop() + }) + + test('userCanUploadMediaToGdmIfMember', async () => { + const alicesClient = await makeTestClient() + await alicesClient.initializeUser() + alicesClient.startSync() + + const charliesClient = await makeTestClient() + await charliesClient.initializeUser() + charliesClient.startSync() + + const { streamId } = await bobsClient.createGDMChannel([ + alicesClient.userId, + charliesClient.userId, + ]) + await expect( + bobsClient.createMediaStream(streamId, undefined, 
undefined, 10), + ).resolves.not.toThrow() + await alicesClient.stop() + await charliesClient.stop() + }) + + test('userCannotUploadMediaToDmUnlessMember', async () => { + const alicesClient = await makeTestClient() + await alicesClient.initializeUser() + alicesClient.startSync() + + const charliesClient = await makeTestClient() + await charliesClient.initializeUser() + charliesClient.startSync() + + const { streamId } = await bobsClient.createDMChannel(alicesClient.userId) + + await expect( + charliesClient.createMediaStream(streamId, undefined, undefined, 10), + ).rejects.toThrow() + await alicesClient.stop() + await charliesClient.stop() + }) + + // This test is flaky because there is a bug in GetStreamEx where sometimes the miniblock is not + // finalized before the client tries to fetch it. This is a known issue, see HNT-5291. + test.skip('mediaStreamGetStreamEx', async () => { + const { streamId, prevMiniblockHash } = await bobCreateMediaStream(10) + // Send a series of media chunks + await bobSendMediaPayloads(streamId, 10, prevMiniblockHash) + // Force server to flush minipool events into a block + await bobsClient.rpcClient.info( + new InfoRequest({ + debug: ['make_miniblock', streamId], + }), + { timeoutMs: 10000 }, + ) + + // Grab stream from both endpoints + const stream = await bobsClient.getStream(streamId) + const streamEx = await bobsClient.getStreamEx(streamId) + + // Assert exact content equality with bobSendMediaPayloads + expect(streamEx.mediaContent.info).toBeDefined() + expect(streamEx.mediaContent.info?.chunks.length).toEqual(10) + for (let i = 0; i < 10; i++) { + const chunk = new Uint8Array(100) + chunk.fill(i, 0, 100) + expect(streamEx.mediaContent.info?.chunks[i]).toBeDefined() + expect(streamEx.mediaContent.info?.chunks[i]).toEqual(chunk) + } + + // Assert equality of mediaContent between getStream and getStreamEx + // use-chunked-media.ts utilizes the tream.mediaContent.info property, so equality here + // will result in the same behavior in the client app. 
+ expect(stream.mediaContent).toEqual(streamEx.mediaContent) + }) + + test('userMediaStream', async () => { + const alicesClient = await makeTestClient() + await alicesClient.initializeUser() + alicesClient.startSync() + await expect( + alicesClient.createMediaStream(undefined, undefined, alicesClient.userId, 10), + ).resolves.not.toThrow() + }) +}) + +describe('mediaTestsNew', () => { + let bobsClient: Client + + beforeEach(async () => { + bobsClient = await makeTestClient() + await bobsClient.initializeUser() + bobsClient.startSync() + }) + + afterEach(async () => { + await bobsClient.stop() + }) + + async function bobCreateMediaStream( + chunkCount: number, + ): Promise<{ creationCookie: CreationCookie }> { + const spaceId = makeUniqueSpaceStreamId() + await expect(bobsClient.createSpace(spaceId)).resolves.not.toThrow() + + const channelId = makeUniqueChannelStreamId(spaceId) + await expect( + bobsClient.createChannel(spaceId, 'Channel', 'Topic', channelId), + ).resolves.not.toThrow() + + return bobsClient.createMediaStreamNew(channelId, spaceId, undefined, chunkCount) + } + async function bobSendMediaPayloads( creationCookie: CreationCookie, chunks: number, @@ -45,7 +357,7 @@ describe('mediaTests', () => { // Create novel chunk content for testing purposes chunk.fill(i, 0, 100) const last = i == chunks - 1 - const result = await bobsClient.sendMediaPayload(cc, last, chunk, i) + const result = await bobsClient.sendMediaPayloadNew(cc, last, chunk, i) cc = new CreationCookie({ ...cc, prevMiniblockHash: new Uint8Array(result.creationCookie.prevMiniblockHash), @@ -63,7 +375,7 @@ describe('mediaTests', () => { iv: Uint8Array, ): Promise { const { ciphertext } = await encryptAESGCM(data, key, iv) - const result = await bobsClient.sendMediaPayload(creationCookie, last, ciphertext, 0) + const result = await bobsClient.sendMediaPayloadNew(creationCookie, last, ciphertext, 0) return result.creationCookie } @@ -82,7 +394,7 @@ describe('mediaTests', () => { chunkCount: number, ): Promise<{ creationCookie: CreationCookie }> { await expect(bobsClient.createSpace(spaceId)).resolves.not.toThrow() - const mediaInfo = await bobsClient.createMediaStream( + const mediaInfo = await bobsClient.createMediaStreamNew( undefined, spaceId, undefined, @@ -143,10 +455,10 @@ describe('mediaTests', () => { const result = await bobCreateMediaStream(10) const chunk = new Uint8Array(100) await expect( - bobsClient.sendMediaPayload(result.creationCookie, false, chunk, -1), + bobsClient.sendMediaPayloadNew(result.creationCookie, false, chunk, -1), ).rejects.toThrow() await expect( - bobsClient.sendMediaPayload(result.creationCookie, false, chunk, 11), + bobsClient.sendMediaPayloadNew(result.creationCookie, false, chunk, 11), ).rejects.toThrow() }) @@ -154,7 +466,7 @@ describe('mediaTests', () => { const result = await bobCreateMediaStream(10) const chunk = new Uint8Array(500000) await expect( - bobsClient.sendMediaPayload(result.creationCookie, false, chunk, 0), + bobsClient.sendMediaPayloadNew(result.creationCookie, false, chunk, 0), ).resolves.not.toThrow() }) @@ -162,7 +474,7 @@ describe('mediaTests', () => { const result = await bobCreateMediaStream(10) const chunk = new Uint8Array(500001) await expect( - bobsClient.sendMediaPayload(result.creationCookie, false, chunk, 0), + bobsClient.sendMediaPayloadNew(result.creationCookie, false, chunk, 0), ).rejects.toThrow() }) @@ -179,7 +491,7 @@ describe('mediaTests', () => { alicesClient.startSync() await expect( - alicesClient.sendMediaPayload(result.creationCookie, false, 
chunk, 5), + alicesClient.sendMediaPayloadNew(result.creationCookie, false, chunk, 5), ).rejects.toThrow() await alicesClient.stop() }) @@ -194,7 +506,7 @@ describe('mediaTests', () => { alicesClient.startSync() await expect( - alicesClient.sendMediaPayload(result.creationCookie, false, chunk, 5), + alicesClient.sendMediaPayloadNew(result.creationCookie, false, chunk, 5), ).rejects.toThrow() await alicesClient.stop() }) diff --git a/packages/stream-metadata/tests/testUtils.ts b/packages/stream-metadata/tests/testUtils.ts index abf2d295ef..c1cf3d5829 100644 --- a/packages/stream-metadata/tests/testUtils.ts +++ b/packages/stream-metadata/tests/testUtils.ts @@ -131,7 +131,7 @@ export async function encryptAndSendMediaPayload( const { ciphertext, secretKey, iv } = await encryptAESGCM(data) const chunkCount = Math.ceil(ciphertext.length / chunkSize) - const mediaStreamInfo = await client.createMediaStream( + const mediaStreamInfo = await client.createMediaStreamNew( undefined, spaceId, undefined, @@ -146,7 +146,7 @@ export async function encryptAndSendMediaPayload( for (let i = 0, index = 0; i < ciphertext.length; i += chunkSize, index++) { const chunk = ciphertext.slice(i, i + chunkSize) const last = ciphertext.length - i <= chunkSize - const { creationCookie } = await client.sendMediaPayload(cc, last, chunk, index) + const { creationCookie } = await client.sendMediaPayloadNew(cc, last, chunk, index) cc = new CreationCookie({ ...cc,
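
Taken together, the second patch keeps the existing createMediaStream/sendMediaPayload path intact and adds createMediaStreamNew/sendMediaPayloadNew, which thread a CreationCookie through every chunk upload instead of a bare prevMiniblockHash. Below is a minimal caller-side sketch of that flow, modelled on the updated test helpers and encryptAndSendMediaPayload; the helper name, file paths, and the '@river-build/proto' import location are assumptions, and the 500 000-byte chunk size is simply the limit the tests exercise.

import { CreationCookie } from '@river-build/proto' // assumed export location for this protobuf message
import type { Client } from './client' // illustrative path, as in packages/sdk/src

// Upload `data` to a new space media stream chunk by chunk, threading the
// CreationCookie returned by each sendMediaPayloadNew call into the next one.
export async function uploadSpaceMedia(
    client: Client,
    spaceId: string,
    data: Uint8Array,
    chunkSize = 500_000, // tests accept 500000-byte chunks and reject 500001
): Promise<CreationCookie> {
    const chunkCount = Math.ceil(data.length / chunkSize)

    // New API: returns a CreationCookie rather than { streamId, prevMiniblockHash }.
    const { creationCookie } = await client.createMediaStreamNew(
        undefined, // channelId
        spaceId,
        undefined, // userId
        chunkCount,
    )

    let cc = creationCookie
    for (let i = 0, index = 0; i < data.length; i += chunkSize, index++) {
        const chunk = data.slice(i, i + chunkSize)
        const last = data.length - i <= chunkSize
        const result = await client.sendMediaPayloadNew(cc, last, chunk, index)
        // Carry the refreshed prevMiniblockHash forward, as the test helpers do.
        cc = new CreationCookie({
            ...cc,
            prevMiniblockHash: new Uint8Array(result.creationCookie.prevMiniblockHash),
        })
    }
    return cc
}

Per the updated helpers, the `last` flag on the final chunk and the cookie's refreshed prevMiniblockHash are the only state the caller carries between uploads; any other cookie fields ride along via the `...cc` spread.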