Skip to content

Commit

Permalink
Switched to a generic function (#4)
Browse files Browse the repository at this point in the history
* Made lib generic

* Switched to 1.18 build

* Removed deps step

* Switched off modules

* Added mod init

* Adding GOPATH

* Try path again

* Used newer action

* Set unstable

* Fixed version number

* Try numbering again

* More version format nonsense

* Added paths

* Added go.mod

* Refactoring

* Generified README

* Clarified and commented

* Shifted to typed sync maps

Co-authored-by: Sudhir Jonathan <[email protected]>
  • Loading branch information
sudhirj and Sudhir Jonathan authored Mar 17, 2022
1 parent 7ca2c9e commit 469bc0e
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 71 deletions.
20 changes: 6 additions & 14 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,18 @@ jobs:
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.13
uses: actions/setup-go@v1
- name: Set up Go 1.18
uses: actions/setup-go@v2
with:
go-version: 1.13
go-version: '1.18.0-beta2'
stable: 'false'
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v1

- name: Get dependencies
run: |
go get -v -t -d ./...
if [ -f Gopkg.toml ]; then
curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
dep ensure
fi
- name: Build
run: go build -v .

run: go build

- name: Run Tests
run: go test . --race
run: go test --race
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ A circular queue that processes jobs in parallel but returns results in FIFO.
```go
inputs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

inputChannel, outputChannel := NewCirque(3, func(i interface{}) interface{} {
inputChannel, outputChannel := NewCirque(3, func(i int) int {
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return i.(int) * 2
return i * 2
})

go func() {
Expand All @@ -18,7 +18,7 @@ go func() {

var output []int
for i := range outputChannel {
output = append(output, i.(int))
output = append(output, i)
}
fmt.Println(output)

Expand Down
101 changes: 53 additions & 48 deletions cirque.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,82 @@ import (
"sync"
)

type indexedValue struct {
value interface{}
index int64
}

// NewCirque creates a FIFO parallel queue that runs a given processor function on each job, similar to a parallel Map.
// NewCirque creates a FIFO parallel queue that runs a given
// processor function on each job, similar to a parallel Map.
//
// The method accepts a parallelism number, which the maximum number of jobs that are processed simultaneously,
// and a processor function that takes a job as input and returns a indexedValue as output. The processor function must be safe
// The method accepts a parallelism number, which is the maximum
// number of jobs that are processed simultaneously,
// and a processor function that takes an input and returns
// an output. The processor function must be safe
// to call from multiple goroutines.
//
// It returns two channels, one into which inputs can be passed, and one from which outputs can be read.
// Closing the input channel will close the output channel after processing is complete. Do not close the output channel yourself.
func NewCirque(parallelism int64, processor func(interface{}) interface{}) (chan<- interface{}, <-chan interface{}) {
input := make(chan interface{})
output := make(chan interface{})
// It returns two channels, one into which inputs can be passed,
// and one from which outputs can be read. Closing the input channel
// will close the output channel after processing is complete. Do not
// close the output channel yourself.
func NewCirque[I any, O any](parallelism int64, processor func(I) O) (chan<- I, <-chan O) {
inputChannel := make(chan I)
outputChannel := make(chan O)

inputHolder := NewSyncMap[int64, I]()
outputHolder := NewSyncMap[int64, O]()

// let the output goroutine know every time an input is processed, so it
// can wake up and try to send outputs
processCompletionSignal := make(chan struct{})

// apply backpressure to make sure we're processing inputs only when outputs are
// actually being collected - otherwise we're going to fill up memory with processed
// jobs that aren't being taken out.
outputBackpressureSignal := make(chan struct{}, parallelism)

processedJobs := make(chan indexedValue)
semaphore := make(chan struct{}, parallelism)
go func() { // process inputs
poolWaiter := sync.WaitGroup{}
pool := make(chan indexedValue)
inflightInputs := sync.WaitGroup{}
inputPool := make(chan int64)

// Start worker pool of specified size
for workerID := int64(0); workerID < parallelism; workerID++ {
poolWaiter.Add(1)
for n := int64(0); n < parallelism; n++ {
inflightInputs.Add(1)
go func() {
for job := range pool {
processedJobs <- indexedValue{
value: processor(job.value),
index: job.index,
}
for index := range inputPool {
input, _ := inputHolder.Get(index)
outputHolder.Set(index, processor(input))
inputHolder.Delete(index)
processCompletionSignal <- struct{}{}
}
poolWaiter.Done()
inflightInputs.Done()
}()
}

index := int64(0)
for job := range input {
pool <- indexedValue{
value: job,
index: index,
}
index = index + 1
semaphore <- struct{}{}
for input := range inputChannel {
inputHolder.Set(index, input)
inputPool <- index
index++
outputBackpressureSignal <- struct{}{}
}
close(pool)
close(inputPool)

poolWaiter.Wait()
close(processedJobs)
inflightInputs.Wait()
close(processCompletionSignal)
}()

go func() { // send outputs in order
nextIndex := int64(0)
storedResults := map[int64]indexedValue{}
for res := range processedJobs {
storedResults[res.index] = res
canSend := true
for canSend {
if storedResult, ok := storedResults[nextIndex]; ok {
output <- storedResult.value
delete(storedResults, storedResult.index)
nextIndex = nextIndex + 1
<-semaphore
for range processCompletionSignal {
for true {
if output, ok := outputHolder.Get(nextIndex); ok {
outputChannel <- output
outputHolder.Delete(nextIndex)
nextIndex++
<-outputBackpressureSignal
} else {
canSend = false
break
}
}
}
close(output)
close(outputChannel)
}()

return input, output
return inputChannel, outputChannel
}
12 changes: 6 additions & 6 deletions cirque_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func TestCirque(t *testing.T) {
var wipCount int64 = 0

var maxParallelism int64 = 3
inputChannel, outputChannel := NewCirque(maxParallelism, func(i interface{}) interface{} {
inputChannel, outputChannel := NewCirque(maxParallelism, func(i int) int {
atomic.AddInt64(&measuredParallelism, 1)
time.Sleep(time.Duration(rand.Int63n(10)) * time.Millisecond)
atomic.AddInt64(&measuredParallelism, -1)
return i.(int) * 2
return i * 2
})

go func() {
Expand All @@ -67,7 +67,7 @@ func TestCirque(t *testing.T) {
var actualOutput []int
for i := range outputChannel {
atomic.AddInt64(&wipCount, -1)
actualOutput = append(actualOutput, i.(int))
actualOutput = append(actualOutput, i)
}
if len(cs.expectedOutput) > 0 && !reflect.DeepEqual(cs.expectedOutput, actualOutput) {
t.Errorf("WRONG WRONG WRONG. Case: %s \n Expected: %v, Actual: %v,",
Expand All @@ -81,9 +81,9 @@ func TestCirque(t *testing.T) {
func ExampleNewCirque() {
inputs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

inputChannel, outputChannel := NewCirque(3, func(i interface{}) interface{} {
inputChannel, outputChannel := NewCirque(3, func(i int) int {
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return i.(int) * 2
return i * 2
})

go func() {
Expand All @@ -95,7 +95,7 @@ func ExampleNewCirque() {

var output []int
for i := range outputChannel {
output = append(output, i.(int))
output = append(output, i)
}
fmt.Println(output)

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/sudhirj/cirque

go 1.18
34 changes: 34 additions & 0 deletions map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cirque

import "sync"

// SyncMap is a generic map whose operations are guarded by a read/write
// mutex, so a single instance can be shared between goroutines.
//
// The zero value is an empty map ready to use; storage is allocated lazily
// on the first Set. A SyncMap contains a sync.RWMutex and therefore must not
// be copied after first use; pass it around as *SyncMap.
type SyncMap[K comparable, V any] struct {
	holder map[K]V
	lock   sync.RWMutex
}

// NewSyncMap returns a pointer to an empty, ready-to-use SyncMap.
func NewSyncMap[K comparable, V any]() *SyncMap[K, V] {
	// The mutex's zero value is an unlocked mutex, so only the map needs
	// explicit initialization.
	return &SyncMap[K, V]{holder: make(map[K]V)}
}

// Set stores value under key, replacing any entry already present.
func (tm *SyncMap[K, V]) Set(key K, value V) {
	tm.lock.Lock()
	defer tm.lock.Unlock()
	if tm.holder == nil {
		// A zero-value SyncMap has a nil map; assigning into it would panic.
		tm.holder = make(map[K]V)
	}
	tm.holder[key] = value
}

// Get returns the value stored under key. ok is false, and value is the zero
// value of V, when the key is not present.
func (tm *SyncMap[K, V]) Get(key K) (value V, ok bool) {
	tm.lock.RLock()
	defer tm.lock.RUnlock()
	// Reading from a nil map is safe and simply reports "not found".
	value, ok = tm.holder[key]
	return value, ok
}

// Delete removes key from the map. Deleting a key that is not present is a
// no-op.
func (tm *SyncMap[K, V]) Delete(key K) {
	tm.lock.Lock()
	defer tm.lock.Unlock()
	delete(tm.holder, key)
}

0 comments on commit 469bc0e

Please sign in to comment.