Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 83 additions & 60 deletions utils/protoproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package utils

import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -27,83 +27,103 @@ import (

func TestProtoProxy(t *testing.T) {
t.Run("basics", func(t *testing.T) {
numGoRoutines := runtime.NumGoroutine()
proxy, numParticipants, freeze := createTestProxy()
var (
refreshInterval = 10 * time.Millisecond
mu = &sync.Mutex{}
)
proxy, numParticipants, freeze := createTestProxy(refreshInterval, mu)
defer proxy.Stop()

mu.Lock()
{
select {
case <-proxy.Updated():
t.Fatal("should not have received an update")
default:
}

select {
case <-proxy.Updated():
t.Fatal("should not have received an update")
default:
// should not have changed, initial value should persist
require.EqualValues(t, 0, proxy.Get().NumParticipants)
}

// should not have changed, initial value should persist
require.EqualValues(t, 0, proxy.Get().NumParticipants)
mu.Unlock()

// immediate change
proxy.MarkDirty(true)
time.Sleep(100 * time.Millisecond)

require.EqualValues(t, 2, numParticipants.Load())
require.EqualValues(t, 1, proxy.Get().NumParticipants)
time.Sleep(refreshInterval)
mu.Lock()
{
require.EqualValues(t, 2, numParticipants.Load())
require.EqualValues(t, 1, proxy.Get().NumParticipants)

// queued updates
proxy.MarkDirty(false)
select {
case <-proxy.Updated():
// consume previous notification
default:
}
require.EqualValues(t, 1, proxy.Get().NumParticipants)

// queued updates
proxy.MarkDirty(false)
select {
case <-proxy.Updated():
// consume previous notification
default:
// freeze and ensure that updates are not triggered
freeze.Store(true)
}
require.EqualValues(t, 1, proxy.Get().NumParticipants)
mu.Unlock()

// freeze and ensure that updates are not triggered
freeze.Store(true)
// freezing and consuming the previous notification to ensure counter does not increase in updateFn
select {
case <-proxy.Updated():
case <-time.After(100 * time.Millisecond):

case <-time.After(refreshInterval):
t.Fatal("should have received an update")
}

// possible that ticker was updated while markDirty queued another update
require.GreaterOrEqual(t, int(proxy.Get().NumParticipants), 2)
np := proxy.Get().NumParticipants
require.GreaterOrEqual(t, int(np), 2)

// trigger another update, but should not get notification as freeze is in place and the model should not have changed
proxy.MarkDirty(false)
time.Sleep(500 * time.Millisecond)
select {
case <-proxy.Updated():
t.Fatal("should not have received an update")
default:
}
require.EqualValues(t, 2, proxy.Get().NumParticipants)

// ensure we didn't leak
proxy.Stop()

for i := 0; i < 10; i++ {
if runtime.NumGoroutine() <= numGoRoutines {
break
mu.Lock()
{
proxy.MarkDirty(false)
select {
case <-proxy.Updated():
t.Fatal("should not have received an update")
default:
require.EqualValues(t, np, proxy.Get().NumParticipants)
}
time.Sleep(100 * time.Millisecond)
}
require.LessOrEqual(t, runtime.NumGoroutine(), numGoRoutines)
mu.Unlock()

})

t.Run("await next update after marking dirty", func(t *testing.T) {
proxy, _, _ := createTestProxy()
var (
refreshInterval = 10 * time.Millisecond
mu = &sync.Mutex{}
)
proxy, _, _ := createTestProxy(refreshInterval, mu)
require.EqualValues(t, 0, proxy.Get().NumParticipants)
<-proxy.MarkDirty(true)
require.EqualValues(t, 1, proxy.Get().NumParticipants)
})

t.Run("await resolves when proxy is stopped", func(t *testing.T) {
proxy, _, _ := createTestProxy()
var (
refreshInterval = 10 * time.Millisecond
mu = &sync.Mutex{}
)
proxy, _, _ := createTestProxy(refreshInterval, mu)
done := proxy.MarkDirty(true)
proxy.Stop()
<-done
})

t.Run("multiple awaits resolve for one update", func(t *testing.T) {
proxy, _, _ := createTestProxy()
var (
refreshInterval = 10 * time.Millisecond
mu = &sync.Mutex{}
)
proxy, _, _ := createTestProxy(refreshInterval, mu)
done0 := proxy.MarkDirty(false)
done1 := proxy.MarkDirty(true)
<-done0
Expand All @@ -112,7 +132,7 @@ func TestProtoProxy(t *testing.T) {
})

t.Run("await resolve when there is no change", func(t *testing.T) {
proxy := NewProtoProxy[*livekit.Room](10*time.Millisecond, func() *livekit.Room { return nil })
proxy := NewProtoProxy(10*time.Millisecond, func() *livekit.Room { return nil })
done := proxy.MarkDirty(true)
time.Sleep(100 * time.Millisecond)
select {
Expand All @@ -123,19 +143,22 @@ func TestProtoProxy(t *testing.T) {
})
}

func createTestProxy() (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) {
func createTestProxy(refreshInterval time.Duration, mu *sync.Mutex) (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) {
// uses an update func that increments numParticipants each time
var numParticipants atomic.Uint32
var freeze atomic.Bool
return NewProtoProxy[*livekit.Room](
10*time.Millisecond,
func() *livekit.Room {
if !freeze.Load() {
defer numParticipants.Add(1)
}
return &livekit.Room{
NumParticipants: numParticipants.Load(),
}
},
), &numParticipants, &freeze
var (
numParticipants atomic.Uint32
freeze atomic.Bool
)
updateFn := func() *livekit.Room {
mu.Lock()
defer mu.Unlock()

if !freeze.Load() {
defer numParticipants.Add(1)
}
return &livekit.Room{
NumParticipants: numParticipants.Load(),
}
}
return NewProtoProxy(refreshInterval, updateFn), &numParticipants, &freeze
}
Loading