20 changes: 20 additions & 0 deletions engine/notifier.go
@@ -4,6 +4,11 @@ package engine
// arrival of new work unit(s). Notifiers essentially behave like
// channels in that they can be passed by value and still allow concurrent
// updates of the same internal state.
//
// CAUTION: While individual notification send/receive operations are atomic, the handoff from the
// sender to the receiver is not. This means that when Notify() is called concurrently or in fast
// succession, some notifications may be dropped even if there are consumers waiting on the Channel().
// The Notifier should only be used for use cases that can tolerate dropped notifications.
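//
// A minimal illustration of a dropped notification (this sketch assumes the package's
// NewNotifier constructor; the names are purely illustrative):
//
//	n := NewNotifier()
//	n.Notify()    // fills the single buffered slot
//	n.Notify()    // slot still full: the non-blocking send drops this notification
//	<-n.Channel() // consumers observe only one notification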
type Notifier struct {
// Illustrative description of the Notifier:
// * When the gate is activated, it will let a _single_ person step through the gate.
@@ -20,6 +25,21 @@ type Notifier struct {
// Passing through the gate corresponds to receiving from the `Channel()`.
// As we don't want the routine sending the notification to wait until a worker
// routine reads from the channel, we need a buffered channel with capacity 1.
//
// CAUTION: the handoff between non-blocking writes and reads on a channel is NOT atomic.
// The order of concurrent reads and non-blocking writes depends entirely on the Go scheduler.
// Consider the following scenario:
// - One or more routines are blocked on the channel.
// - One or more routines are producing notifications in fast succession (or concurrently).
// Intuitively, you might expect that if there is a waiting consumer, then the consumer will take
// the notification as soon as it is available. This is true when performing a blocking write.
// However, when performing a non-blocking write (i.e. with a select/default statement), the read
// only happens when the Go scheduler executes the consumer's routine. This means that the producer
// could fill the buffered channel and exercise the default bypass while one or more consumers are
// waiting to receive notifications.
// This can be problematic when using the Notifier to communicate work to a pool of workers:
// in some situations, only a subset of the workers is unblocked, even if there is enough work
// for all of them.
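//
// A minimal sketch of the problematic interleaving (assuming a Notifier n with a waiting
// consumer; a notification is dropped even though the consumer is blocked the whole time):
//
//	consumer: blocks on <-n.Channel()  // waiting, but not yet scheduled
//	producer: n.Notify()               // non-blocking send fills the single buffer slot
//	producer: n.Notify()               // slot still full: select takes the default case, dropped
//	consumer: <-n.Channel() returns    // only one of the two notifications is ever delivered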

notifier chan struct{} // buffered channel with capacity 1
}
108 changes: 108 additions & 0 deletions engine/trapdoor.go
@@ -0,0 +1,108 @@
package engine

import (
"sync"
)

// Trapdoor is a concurrency primitive to sleep worker routines if no work is available. A routine
// providing work activates the trapdoor. Repeated activations of an already activated trapdoor are
// no-ops. Once activated, a single routine can pass the trapdoor, and the trapdoor deactivates again.
// If multiple routines are blocked on the trapdoor, each call to `Activate()` will wake up a single
// routine.
//
// Activation and passing of the trapdoor are atomic operations.
// Trapdoor establishes a happens-before relationship: the completion of the `Activate()` call happens
// before the channel returned by `Channel()` is closed.
//
// Trapdoor is distinct from Notifier in that the send/receive handoff is atomic. This means that if
// there are many blocked consumers, every call to `Activate()` will wake up a single consumer,
// regardless of the order in which goroutines are executed by the Go scheduler.
//
// Trapdoor is concurrency safe.
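//
// Typical worker-loop usage (an illustrative sketch, not part of this file; it only uses
// NewTrapdoor, Channel, Activate, and Close as defined below):
//
//	td := NewTrapdoor()
//	defer td.Close()
//	go func() {
//		for {
//			<-td.Channel() // block until a producer calls Activate()
//			// ... drain and process all pending work ...
//		}
//	}()
//	td.Activate() // wakes exactly one blocked worker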
//
// CAUTION: Trapdoor must not be copied or passed by value.
type Trapdoor struct {
_ sync.Mutex // prevent copying of the struct

activated bool
shutdown bool
cond *sync.Cond
}

// NewTrapdoor creates a new instance of a Trapdoor.
// The returned trapdoor must be closed with Close() to avoid leaking goroutines.
// CAUTION: Trapdoor must not be copied or passed by value.
func NewTrapdoor() *Trapdoor {
return &Trapdoor{
cond: sync.NewCond(&sync.Mutex{}),
}
}

// Close shuts down the goroutines used by the trapdoor.
// After `Close()` is called, any new or existing channel returned by `Channel()` will never be closed,
// and subsequent calls to `Activate()` will cause a panic.
// This method is idempotent: multiple calls to `Close()` are no-ops.
func (td *Trapdoor) Close() {
td.cond.L.Lock()
defer td.cond.L.Unlock()
if !td.shutdown {
td.shutdown = true
td.cond.Broadcast() // wake up all blocked routines
}
}

// Activate activates the trapdoor and wakes a single worker.
//
// Calls to `Activate()` after `Close()` will panic.
func (td *Trapdoor) Activate() {
td.cond.L.Lock()
defer td.cond.L.Unlock()

if td.shutdown {
panic("call to trapdoor.Activate() after close")
}

// always send the signal, even if the trapdoor was already activated. This guarantees that if
// there are blocked workers, each call to `Activate()` will wake up a single worker.
// `cond.Signal()` is a no-op if no routines are waiting.
td.activated = true
td.cond.Signal()
}

// pass blocks the calling routine until the trapdoor is activated or shut down; if the trapdoor is
// already activated (or shut down), it returns immediately. The trapdoor is deactivated after
// passing. Returns false if the trapdoor is shut down.
func (td *Trapdoor) pass() bool {
td.cond.L.Lock()
defer td.cond.L.Unlock()

if td.shutdown {
return false
}

if !td.activated {
td.cond.Wait() // only return when awoken by Broadcast or Signal
}
td.activated = false

// since cond.Wait() internally releases the lock, `shutdown` may have changed since we last checked.
return !td.shutdown
}

// Channel returns a channel that is closed when the trapdoor is activated.
// Each call to `Channel()` returns a new channel.
//
// IMPORTANT: `Channel()` must be called for each wait. Since the returned channel is closed to signal
// that the trapdoor is activated, it cannot be reused for subsequent notifications.
func (td *Trapdoor) Channel() <-chan struct{} {
ch := make(chan struct{})
started := make(chan struct{})
go func() {
close(started)

if td.pass() {
close(ch)
}
}()
<-started
return ch
}
238 changes: 238 additions & 0 deletions engine/trapdoor_test.go
@@ -0,0 +1,238 @@
package engine

import (
"context"
"fmt"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

// TestTrapdoor_NoNotificationsInitialization verifies that Trapdoor is initialized without notifications.
func TestTrapdoor_NoNotificationsInitialization(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
trapdoor := NewTrapdoor()
defer trapdoor.Close()

ch := trapdoor.Channel()

synctest.Wait() // wait for the channel routine to be started and blocked

assertNotClosed(t, ch, "expected no notification to be available")
})
}

// TestTrapdoor_Close verifies that Close() behaves as expected by performing the following test:
// - Channels created before and after Close() are never closed.
// - Calling Activate() after Close() panics.
// - Calling Channel() after Close() does not panic.
func TestTrapdoor_Close(t *testing.T) {
t.Parallel()

synctest.Test(t, func(t *testing.T) {
trapdoor := NewTrapdoor()

// get channels before and after Close() is called. neither should ever be closed.
// each call to Channel() returns a new channel
beforeClose := trapdoor.Channel()
trapdoor.Close()
afterClose := trapdoor.Channel() // this should not panic

synctest.Wait() // wait for channel routines to be started and blocked

assertNotClosed(t, beforeClose, "beforeClose channel should not be closed")
assertNotClosed(t, afterClose, "afterClose channel should not be closed")

// now call Activate(). this should cause a panic, and neither channel should be closed.
defer func() {
assert.NotNil(t, recover(), "expected panic")

synctest.Wait() // make sure any processing has completed (there shouldn't be any)
assertNotClosed(t, beforeClose, "beforeClose channel should not be closed")
assertNotClosed(t, afterClose, "afterClose channel should not be closed")
}()

trapdoor.Activate()
})
}

// TestTrapdoor_ActivateMany calls Activate() many times and verifies that
// - the trapdoor accepts them all without a notification being consumed
// - only one notification is internally stored and subsequent attempts to read a notification block
func TestTrapdoor_ActivateMany(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
trapdoor := NewTrapdoor()
defer trapdoor.Close()

var counter sync.WaitGroup
for range 10 {
counter.Go(func() {
trapdoor.Activate()
})
}
counter.Wait()

// Note: because the channel is closed within a goroutine that is started after Channel() is called,
// there is a race between checking the returned channel and the goroutine internally calling
// trapdoor.pass() and then closing the channel. Therefore, we must use synctest to ensure that the
// goroutine is started and blocked.

// attempt to consume first notification:
// expect that one notification should be available
ch := trapdoor.Channel()
synctest.Wait()
assertClosed(t, ch, "expected one notification to be available")

// attempt to consume second notification
// expect that no notification is available
ch = trapdoor.Channel()
synctest.Wait()
assertNotClosed(t, ch, "expected no notification to be available")
})
}

// TestTrapdoor_ManyBlockedConsumers spawns many blocked worker routines, then sends Activate() calls
// for each worker. All workers should wake up without any special synchronization.
//
// This test specifically does not use synctest to avoid any potential special behavior introduced
// by the framework. Instead, a small sleep is used to wait for workers to all block before sending
// Activate() calls.
func TestTrapdoor_ManyBlockedConsumers(t *testing.T) {
t.Parallel()
singleTestRun := func(t *testing.T) {
t.Parallel()
trapdoor := NewTrapdoor()
defer trapdoor.Close()

// spawn 100 worker routines to each wait for a notification
workerCount := int32(100)
pendingWorkers := atomic.NewInt32(workerCount)

for range workerCount {
ready := make(chan struct{})
go func() {
close(ready)
<-trapdoor.Channel()
pendingWorkers.Dec()
}()
<-ready
}

// we are specifically not using synctest to avoid any special behavior that may be introduced.
// Using a sleep here to ensure that all workers have progressed from launching their goroutines
// to blocking on the channel.
time.Sleep(10 * time.Millisecond)

// send Activate() calls as quickly as possible. trapdoor does not require synchronization
// in this case. each call should wake up one worker.
for range workerCount {
trapdoor.Activate()
}

// wait for all workers to complete
require.Eventually(t, func() bool {
return pendingWorkers.Load() == 0
}, 2*time.Second, time.Millisecond)
}

for r := range 100 {
t.Run(fmt.Sprintf("run %d", r), singleTestRun)
}
}

// TestTrapdoor_AllWorkProcessed spawns many routines pushing work and fewer routines consuming work.
// We require that all work is eventually processed.
func TestTrapdoor_AllWorkProcessed(t *testing.T) {
t.Parallel()
singleTestRun := func(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

trapdoor := NewTrapdoor()
defer trapdoor.Close()

producerCount := int32(10)
producerJobs := int32(10)
pendingWorkQueue := make(chan struct{}, producerCount*producerJobs)
consumedWork := atomic.NewInt32(0)

// start the consumers first; otherwise, we might finish pushing all jobs before any of our
// consumers has started listening to the queue.

processAllPending := func() {
for {
select {
case <-pendingWorkQueue:
consumedWork.Inc()
default:
return
}
}
}

for range producerCount / 2 {
go func() {
for {
select {
case <-ctx.Done():
return
case <-trapdoor.Channel():
processAllPending()
}
}
}()
}

// wait for all consumers to block on the channel
synctest.Wait()

for range producerCount {
go func() {
for range producerJobs {
pendingWorkQueue <- struct{}{}
trapdoor.Activate()
}
}()
}

// wait for all producers and consumers to block. at this point, all jobs should be consumed.
synctest.Wait()

// verify all scheduled jobs were consumed.
assert.Equal(t, producerCount*producerJobs, consumedWork.Load())

// shutdown blocked consumers and wait for them to complete
cancel()
synctest.Wait()
})
}

for r := range 100 {
t.Run(fmt.Sprintf("run %d", r), singleTestRun)
}
}

func assertClosed(t *testing.T, ch <-chan struct{}, errMsg string) {
select {
case <-ch:
default:
t.Error(errMsg)
}
}

func assertNotClosed(t *testing.T, ch <-chan struct{}, errMsg string) {
select {
case <-ch:
t.Error(errMsg)
default:
}
}