Skip to content

Commit

Permalink
Merge branch 'rbhnt/media-stream-optimization-cache-stream-when-creat…
Browse files Browse the repository at this point in the history
…ed' of github-hnt:towns-protocol/towns into rbhnt/media-stream-optimization-cache-stream-when-created
  • Loading branch information
rbhnt committed Feb 10, 2025
2 parents 70a1f5d + 499801b commit f2d500a
Show file tree
Hide file tree
Showing 9 changed files with 494 additions and 29 deletions.
7 changes: 3 additions & 4 deletions core/node/rpc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import (
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
eth_crypto "github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/crypto"
"github.com/river-build/river/core/node/events"
Expand All @@ -31,6 +28,8 @@ import (
. "github.com/river-build/river/core/node/shared"
"github.com/river-build/river/core/node/testutils"
"github.com/river-build/river/core/node/testutils/testfmt"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -1655,7 +1654,7 @@ func TestSyncSubscriptionWithTooSlowClient(t *testing.T) {
testfmt.Logf(t, "subscribe on node %s", node1.address)
syncPos := append(users, channels...)
syncOp, err := river_sync.NewStreamsSyncOperation(
ctx, syncID, node1.address, node1.service.cache, node1.service.nodeRegistry)
ctx, syncID, node1.address, node1.service.cache, node1.service.nodeRegistry, nil)
req.NoError(err, "NewStreamsSyncOperation")

syncOpResult := make(chan error)
Expand Down
24 changes: 22 additions & 2 deletions core/node/rpc/sync/client/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"

. "github.com/river-build/river/core/node/base"
. "github.com/river-build/river/core/node/events"
"github.com/river-build/river/core/node/logging"
. "github.com/river-build/river/core/node/protocol"
. "github.com/river-build/river/core/node/shared"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type localSyncer struct {
Expand All @@ -26,6 +27,9 @@ type localSyncer struct {

activeStreamsMu sync.Mutex
activeStreams map[StreamId]*Stream

// otelTracer is used to trace individual sync Send operations, tracing is disabled if nil
otelTracer trace.Tracer
}

func newLocalSyncer(
Expand All @@ -36,6 +40,7 @@ func newLocalSyncer(
streamCache *StreamCache,
cookies []*SyncCookie,
messages chan<- *SyncStreamsResponse,
otelTracer trace.Tracer,
) (*localSyncer, error) {
return &localSyncer{
globalSyncOpID: globalSyncOpID,
Expand All @@ -46,6 +51,7 @@ func newLocalSyncer(
cookies: cookies,
messages: messages,
activeStreams: make(map[StreamId]*Stream),
otelTracer: otelTracer,
}, nil
}

Expand Down Expand Up @@ -74,14 +80,28 @@ func (s *localSyncer) Address() common.Address {
}

func (s *localSyncer) AddStream(ctx context.Context, cookie *SyncCookie) error {
if s.otelTracer != nil {
var span trace.Span
streamID, _ := StreamIdFromBytes(cookie.GetStreamId())
ctx, span = s.otelTracer.Start(ctx, "localSyncer::AddStream",
trace.WithAttributes(attribute.String("stream", streamID.String())))
defer span.End()
}

streamID, err := StreamIdFromBytes(cookie.GetStreamId())
if err != nil {
return err
}
return s.addStream(ctx, streamID, cookie)
}

func (s *localSyncer) RemoveStream(_ context.Context, streamID StreamId) (bool, error) {
func (s *localSyncer) RemoveStream(ctx context.Context, streamID StreamId) (bool, error) {
if s.otelTracer != nil {
_, span := s.otelTracer.Start(ctx, "localSyncer::removeStream",
trace.WithAttributes(attribute.String("stream", streamID.String())))
defer span.End()
}

s.activeStreamsMu.Lock()
defer s.activeStreamsMu.Unlock()

Expand Down
22 changes: 21 additions & 1 deletion core/node/rpc/sync/client/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (

"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/common"

. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/logging"
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/protocol/protocolconnect"
. "github.com/river-build/river/core/node/shared"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type remoteSyncer struct {
Expand All @@ -31,6 +32,8 @@ type remoteSyncer struct {
streams sync.Map
responseStream *connect.ServerStreamForClient[SyncStreamsResponse]
unsubStream func(streamID StreamId)
// otelTracer is used to trace individual sync Send operations, tracing is disabled if nil
otelTracer trace.Tracer
}

func newRemoteSyncer(
Expand All @@ -42,6 +45,7 @@ func newRemoteSyncer(
cookies []*SyncCookie,
unsubStream func(streamID StreamId),
messages chan<- *SyncStreamsResponse,
otelTracer trace.Tracer,
) (*remoteSyncer, error) {
syncStreamCtx, syncStreamCancel := context.WithCancel(ctx)
responseStream, err := client.SyncStreams(syncStreamCtx, connect.NewRequest(&SyncStreamsRequest{SyncPos: cookies}))
Expand Down Expand Up @@ -82,6 +86,7 @@ func newRemoteSyncer(
responseStream: responseStream,
remoteAddr: remoteAddr,
unsubStream: unsubStream,
otelTracer: otelTracer,
}

s.syncID = responseStream.Msg().GetSyncId()
Expand Down Expand Up @@ -228,6 +233,14 @@ func (s *remoteSyncer) Address() common.Address {
}

func (s *remoteSyncer) AddStream(ctx context.Context, cookie *SyncCookie) error {
if s.otelTracer != nil {
var span trace.Span
streamID, _ := StreamIdFromBytes(cookie.GetStreamId())
ctx, span = s.otelTracer.Start(ctx, "remoteSyncer::AddStream",
trace.WithAttributes(attribute.String("stream", streamID.String())))
defer span.End()
}

streamID, err := StreamIdFromBytes(cookie.GetStreamId())
if err != nil {
return err
Expand All @@ -246,6 +259,13 @@ func (s *remoteSyncer) AddStream(ctx context.Context, cookie *SyncCookie) error
}

func (s *remoteSyncer) RemoveStream(ctx context.Context, streamID StreamId) (bool, error) {
if s.otelTracer != nil {
var span trace.Span
ctx, span = s.otelTracer.Start(ctx, "remoteSyncer::removeStream",
trace.WithAttributes(attribute.String("stream", streamID.String())))
defer span.End()
}

_, err := s.client.RemoveStreamFromSync(ctx, connect.NewRequest(&RemoveStreamFromSyncRequest{
SyncId: s.syncID,
StreamId: streamID[:],
Expand Down
23 changes: 19 additions & 4 deletions core/node/rpc/sync/client/syncer_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package client

import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"sync"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -52,6 +54,8 @@ type (
syncers map[common.Address]StreamsSyncer
// streamID2Syncer maps from a stream to its syncer
streamID2Syncer map[StreamId]StreamsSyncer
// otelTracer is used to trace individual sync Send operations, tracing is disabled if nil
otelTracer trace.Tracer
}

// SyncCookieSet maps from a stream id to a sync cookie
Expand Down Expand Up @@ -87,6 +91,7 @@ func NewSyncers(
nodeRegistry nodes.NodeRegistry,
localNodeAddress common.Address,
cookies StreamCookieSetGroupedByNodeAddress,
otelTracer trace.Tracer,
) (*SyncerSet, <-chan *SyncStreamsResponse, error) {
var (
log = logging.FromCtx(ctx)
Expand All @@ -103,6 +108,7 @@ func NewSyncers(
syncers: syncers,
streamID2Syncer: streamID2Syncer,
messages: messages,
otelTracer: otelTracer,
}

// report these streams as down
Expand All @@ -125,7 +131,8 @@ func NewSyncers(
for nodeAddress, cookieSet := range cookies {
if nodeAddress == localNodeAddress { // stream managed by this node
syncer, err := newLocalSyncer(
ctx, syncID, globalSyncOpCtxCancel, localNodeAddress, streamCache, cookieSet.AsSlice(), messages)
ctx, syncID, globalSyncOpCtxCancel, localNodeAddress,
streamCache, cookieSet.AsSlice(), messages, ss.otelTracer)
if err != nil {
return nil, nil, err
}
Expand All @@ -140,7 +147,8 @@ func NewSyncers(
}

syncer, err := newRemoteSyncer(
ctx, globalSyncOpCtxCancel, syncID, nodeAddress, client, cookieSet.AsSlice(), ss.rmStream, messages)
ctx, globalSyncOpCtxCancel, syncID, nodeAddress, client, cookieSet.AsSlice(),
ss.rmStream, messages, ss.otelTracer)
if err != nil {
log.Warnw("Unable to connect to remote stream when starting stream sync",
"err", err, "remoteNode", nodeAddress)
Expand Down Expand Up @@ -186,6 +194,13 @@ func (ss *SyncerSet) AddStream(
streamID StreamId,
cookie *SyncCookie,
) error {
if ss.otelTracer != nil {
var span trace.Span
ctx, span = ss.otelTracer.Start(ctx, "AddStream",
trace.WithAttributes(attribute.String("stream", streamID.String())))
defer span.End()
}

ss.muSyncers.Lock()
defer ss.muSyncers.Unlock()

Expand Down Expand Up @@ -214,7 +229,7 @@ func (ss *SyncerSet) AddStream(
if nodeAddress == ss.localNodeAddress {
if syncer, err = newLocalSyncer(
ss.ctx, ss.syncID, ss.globalSyncOpCtxCancel, ss.localNodeAddress,
ss.streamCache, []*SyncCookie{cookie}, ss.messages); err != nil {
ss.streamCache, []*SyncCookie{cookie}, ss.messages, ss.otelTracer); err != nil {
return err
}
} else {
Expand All @@ -224,7 +239,7 @@ func (ss *SyncerSet) AddStream(
}
if syncer, err = newRemoteSyncer(
ss.ctx, ss.globalSyncOpCtxCancel, ss.syncID, nodeAddress, client,
[]*SyncCookie{cookie}, ss.rmStream, ss.messages); err != nil {
[]*SyncCookie{cookie}, ss.rmStream, ss.messages, ss.otelTracer); err != nil {
return err
}
}
Expand Down
9 changes: 4 additions & 5 deletions core/node/rpc/sync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (

"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/common"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

. "github.com/river-build/river/core/node/base"
. "github.com/river-build/river/core/node/events"
"github.com/river-build/river/core/node/nodes"
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/shared"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

type (
Expand Down Expand Up @@ -102,7 +101,7 @@ func (h *handlerImpl) SyncStreams(
req *connect.Request[SyncStreamsRequest],
res *connect.ServerStream[SyncStreamsResponse],
) error {
op, err := NewStreamsSyncOperation(ctx, syncId, h.nodeAddr, h.streamCache, h.nodeRegistry)
op, err := NewStreamsSyncOperation(ctx, syncId, h.nodeAddr, h.streamCache, h.nodeRegistry, h.otelTracer)
if err != nil {
return err
}
Expand Down
24 changes: 23 additions & 1 deletion core/node/rpc/sync/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sync

import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"time"

"github.com/river-build/river/core/node/logging"
Expand Down Expand Up @@ -37,6 +39,8 @@ type (
streamCache *StreamCache
// nodeRegistry is used to get the remote remoteNode endpoint from a thisNodeAddress address
nodeRegistry nodes.NodeRegistry
// otelTracer is used to trace individual sync Send operations, tracing is disabled if nil
otelTracer trace.Tracer
}

// subCommand represents a request to add or remove a stream and ping sync operation
Expand Down Expand Up @@ -68,6 +72,7 @@ func NewStreamsSyncOperation(
node common.Address,
streamCache *StreamCache,
nodeRegistry nodes.NodeRegistry,
otelTracer trace.Tracer,
) (*StreamSyncOperation, error) {
// make the sync operation cancellable for CancelSync
syncOpCtx, cancel := context.WithCancelCause(ctx)
Expand All @@ -81,6 +86,7 @@ func NewStreamsSyncOperation(
commands: make(chan *subCommand, 64),
streamCache: streamCache,
nodeRegistry: nodeRegistry,
otelTracer: otelTracer,
}, nil
}

Expand All @@ -98,7 +104,7 @@ func (syncOp *StreamSyncOperation) Run(

syncers, messages, err := client.NewSyncers(
syncOp.ctx, syncOp.cancel, syncOp.SyncID, syncOp.streamCache,
syncOp.nodeRegistry, syncOp.thisNodeAddress, cookies)
syncOp.nodeRegistry, syncOp.thisNodeAddress, cookies, syncOp.otelTracer)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,6 +184,14 @@ func (syncOp *StreamSyncOperation) AddStreamToSync(
return nil, err
}

if syncOp.otelTracer != nil {
var span trace.Span
streamID, _ := shared.StreamIdFromBytes(req.Msg.GetSyncPos().GetStreamId())
ctx, span = syncOp.otelTracer.Start(ctx, "addStreamToSync",
trace.WithAttributes(attribute.String("stream", streamID.String())))
defer span.End()
}

cmd := &subCommand{
Ctx: ctx,
AddStreamReq: req,
Expand All @@ -199,6 +213,14 @@ func (syncOp *StreamSyncOperation) RemoveStreamFromSync(
return nil, RiverError(Err_INVALID_ARGUMENT, "invalid syncId").Tag("syncId", req.Msg.GetSyncId())
}

if syncOp.otelTracer != nil {
var span trace.Span
streamID, _ := shared.StreamIdFromBytes(req.Msg.GetStreamId())
ctx, span = syncOp.otelTracer.Start(ctx, "removeStreamFromSync",
trace.WithAttributes(attribute.String("stream", streamID.String())))
defer span.End()
}

cmd := &subCommand{
Ctx: ctx,
RmStreamReq: req,
Expand Down
Loading

0 comments on commit f2d500a

Please sign in to comment.