diff --git a/utils/protoproxy_test.go b/utils/protoproxy_test.go index 2363f289e..cda8b8ea2 100644 --- a/utils/protoproxy_test.go +++ b/utils/protoproxy_test.go @@ -15,7 +15,7 @@ package utils import ( - "runtime" + "sync" "sync/atomic" "testing" "time" @@ -27,83 +27,103 @@ import ( func TestProtoProxy(t *testing.T) { t.Run("basics", func(t *testing.T) { - numGoRoutines := runtime.NumGoroutine() - proxy, numParticipants, freeze := createTestProxy() + var ( + refreshInterval = 10 * time.Millisecond + mu = &sync.Mutex{} + ) + proxy, numParticipants, freeze := createTestProxy(refreshInterval, mu) + defer proxy.Stop() + + mu.Lock() + { + select { + case <-proxy.Updated(): + t.Fatal("should not have received an update") + default: + } - select { - case <-proxy.Updated(): - t.Fatal("should not have received an update") - default: + // should not have changed, initial value should persist + require.EqualValues(t, 0, proxy.Get().NumParticipants) } - - // should not have changed, initial value should persist - require.EqualValues(t, 0, proxy.Get().NumParticipants) + mu.Unlock() // immediate change proxy.MarkDirty(true) - time.Sleep(100 * time.Millisecond) - - require.EqualValues(t, 2, numParticipants.Load()) - require.EqualValues(t, 1, proxy.Get().NumParticipants) + time.Sleep(refreshInterval) + mu.Lock() + { + require.EqualValues(t, 2, numParticipants.Load()) + require.EqualValues(t, 1, proxy.Get().NumParticipants) + + // queued updates + proxy.MarkDirty(false) + select { + case <-proxy.Updated(): + // consume previous notification + default: + } + require.EqualValues(t, 1, proxy.Get().NumParticipants) - // queued updates - proxy.MarkDirty(false) - select { - case <-proxy.Updated(): - // consume previous notification - default: + // freeze and ensure that updates are not triggered + freeze.Store(true) } - require.EqualValues(t, 1, proxy.Get().NumParticipants) + mu.Unlock() - // freeze and ensure that updates are not triggered - freeze.Store(true) // freezing and consuming the previous notification to ensure counter does not increase in updateFn select { case <-proxy.Updated(): - case <-time.After(100 * time.Millisecond): + + case <-time.After(refreshInterval): t.Fatal("should have received an update") } + // possible that ticker was updated while markDirty queued another update - require.GreaterOrEqual(t, int(proxy.Get().NumParticipants), 2) + np := proxy.Get().NumParticipants + require.GreaterOrEqual(t, int(np), 2) // trigger another update, but should not get notification as freeze is in place and the model should not have changed - proxy.MarkDirty(false) - time.Sleep(500 * time.Millisecond) - select { - case <-proxy.Updated(): - t.Fatal("should not have received an update") - default: - } - require.EqualValues(t, 2, proxy.Get().NumParticipants) - - // ensure we didn't leak - proxy.Stop() - - for i := 0; i < 10; i++ { - if runtime.NumGoroutine() <= numGoRoutines { - break + mu.Lock() + { + proxy.MarkDirty(false) + select { + case <-proxy.Updated(): + t.Fatal("should not have received an update") + default: + require.EqualValues(t, np, proxy.Get().NumParticipants) } - time.Sleep(100 * time.Millisecond) } - require.LessOrEqual(t, runtime.NumGoroutine(), numGoRoutines) + mu.Unlock() + }) t.Run("await next update after marking dirty", func(t *testing.T) { - proxy, _, _ := createTestProxy() + var ( + refreshInterval = 10 * time.Millisecond + mu = &sync.Mutex{} + ) + proxy, _, _ := createTestProxy(refreshInterval, mu) require.EqualValues(t, 0, proxy.Get().NumParticipants) <-proxy.MarkDirty(true) require.EqualValues(t, 1, proxy.Get().NumParticipants) }) t.Run("await resolves when proxy is stopped", func(t *testing.T) { - proxy, _, _ := createTestProxy() + var ( + refreshInterval = 10 * time.Millisecond + mu = &sync.Mutex{} + ) + proxy, _, _ := createTestProxy(refreshInterval, mu) done := proxy.MarkDirty(true) proxy.Stop() <-done }) t.Run("multiple awaits resolve for one update", func(t *testing.T) { - proxy, _, _ := createTestProxy() + var ( + refreshInterval = 10 * time.Millisecond + mu = &sync.Mutex{} + ) + proxy, _, _ := createTestProxy(refreshInterval, mu) done0 := proxy.MarkDirty(false) done1 := proxy.MarkDirty(true) <-done0 @@ -112,7 +132,7 @@ func TestProtoProxy(t *testing.T) { }) t.Run("await resolve when there is no change", func(t *testing.T) { - proxy := NewProtoProxy[*livekit.Room](10*time.Millisecond, func() *livekit.Room { return nil }) + proxy := NewProtoProxy(10*time.Millisecond, func() *livekit.Room { return nil }) done := proxy.MarkDirty(true) time.Sleep(100 * time.Millisecond) select { @@ -123,19 +143,22 @@ func TestProtoProxy(t *testing.T) { }) } -func createTestProxy() (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) { +func createTestProxy(refreshInterval time.Duration, mu *sync.Mutex) (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) { // uses an update func that increments numParticipants each time - var numParticipants atomic.Uint32 - var freeze atomic.Bool - return NewProtoProxy[*livekit.Room]( - 10*time.Millisecond, - func() *livekit.Room { - if !freeze.Load() { - defer numParticipants.Add(1) - } - return &livekit.Room{ - NumParticipants: numParticipants.Load(), - } - }, - ), &numParticipants, &freeze + var ( + numParticipants atomic.Uint32 + freeze atomic.Bool + ) + updateFn := func() *livekit.Room { + mu.Lock() + defer mu.Unlock() + + if !freeze.Load() { + defer numParticipants.Add(1) + } + return &livekit.Room{ + NumParticipants: numParticipants.Load(), + } + } + return NewProtoProxy(refreshInterval, updateFn), &numParticipants, &freeze }