-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_async_worker_test.go
178 lines (156 loc) · 4.23 KB
/
example_async_worker_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package goiteratorhelper_test
import (
"context"
"fmt"
"maps"
"slices"
"sync"
"time"
"github.com/ngicks/go-iterator-helper/hiter"
"github.com/ngicks/go-iterator-helper/hiter/async"
"github.com/ngicks/go-iterator-helper/hiter/iterable"
"github.com/ngicks/go-iterator-helper/hiter/mapper"
"github.com/ngicks/go-iterator-helper/x/exp/xiter"
)
// Example_async_worker_channel demonstrates usage of [hiter.Chan] and [hiter.ChanSend]
// as a fan-out/fan-in pipeline.
// Values from works are sent into a channel that is drained by workers running
// on separate goroutines. Each worker decorates the values it receives and sends
// the results back to the main goroutine through a second channel.
func Example_async_worker_channel() {
	const workerCount = 3

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	works := []string{"foo", "bar", "baz"}

	in := make(chan string, 5)
	out := make(chan hiter.KeyValue[string, error])

	// decorate is the per-item work performed by every worker.
	decorate := func(s string) hiter.KeyValue[string, error] {
		return hiter.KeyValue[string, error]{
			K: "✨" + s + "✨" + s + "✨",
			V: nil,
		}
	}

	var workers sync.WaitGroup
	for range workerCount {
		workers.Add(1)
		go func() {
			defer workers.Done()
			// Each worker builds its own receive-map-send chain over the shared channels.
			received := hiter.Chan(ctx, in)
			_, _ = hiter.ChanSend(ctx, out, xiter.Map(decorate, received))
		}()
	}

	// out is closed only after every worker has returned,
	// so no worker can send on a closed channel.
	closed := make(chan struct{})
	go func() {
		defer close(closed)
		workers.Wait()
		close(out)
	}()

	// Feed the workers, then close in since nothing else will be sent.
	_, _ = hiter.ChanSend(ctx, in, slices.Values(works))
	close(in)

	// Gather everything the workers produced, then print it via MapSorted;
	// the expected output below is ordered by key.
	pairs := hiter.FromKeyValue(hiter.Chan(ctx, out))
	results := maps.Collect(pairs)
	sorted := iterable.MapSorted[string, error](results)
	for result, err := range sorted.Iter2() {
		fmt.Printf("result = %s, err = %v\n", result, err)
	}

	// Wait for the closer goroutine to return before leaving the example.
	<-closed
	// Output:
	// result = ✨bar✨bar✨, err = <nil>
	// result = ✨baz✨baz✨, err = <nil>
	// result = ✨foo✨foo✨, err = <nil>
}
// Example_async_worker_map demonstrates usage of [async.Map].
// On the surface it is similar to [xiter.Map2], but it calls the mapper in
// separate goroutines while still yielding results in input order.
// If you don't care about the order of elements, it is enough to send values
// to workers through a channel and receive results through another channel.
func Example_async_worker_map() {
	const (
		queueLimit  = 10
		workerLimit = 5
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	works := []string{"foo", "bar", "baz"}

	// decorate is the mapper handed to async.Map; it never fails.
	decorate := func(ctx context.Context, s string) (string, error) {
		return "✨" + s + "✨" + s + "✨", nil
	}

	// The order is kept.
	results := async.Map(ctx, queueLimit, workerLimit, decorate, slices.Values(works))
	for result, err := range results {
		fmt.Printf("result = %s, err = %v\n", result, err)
	}
	// Output:
	// result = ✨foo✨foo✨, err = <nil>
	// result = ✨bar✨bar✨, err = <nil>
	// result = ✨baz✨baz✨, err = <nil>
}
func Example_async_worker_map_graceful_cancellation() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
works := []string{"foo", "bar", "baz"}
workerCtx, cancelWorker := context.WithCancel(context.Background())
defer cancelWorker()
for result, err := range async.Map(
ctx,
/*queueLimit*/ 1,
/*workerLimit*/ 1,
/*mapper*/ func(ctx context.Context, s string) (string, error) {
combined, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-ctx.Done():
case <-combined.Done():
case <-workerCtx.Done():
}
cancel()
}()
if combined.Err() != nil {
return "", combined.Err()
}
return "✨" + s + "✨" + s + "✨", nil
},
mapper.Cancellable(1, workerCtx, slices.Values(works)),
) {
fmt.Printf("result = %s, err = %v\n", result, err)
cancelWorker()
}
// Output:
// result = ✨foo✨foo✨, err = <nil>
// result = ✨bar✨bar✨, err = <nil>
}
// Example_async_chunk demonstrates usage of [async.Chunk].
// A producer goroutine sends the integers 0 through 19 into a channel, pacing
// each value with a ticker. The main goroutine ranges over the chunks yielded
// by async.Chunk and prints every element on one comma-separated line.
//
// NOTE(review): only "count > 0" is reported, presumably because the number of
// chunks depends on timing; confirm against async.Chunk's documentation.
func Example_async_chunk() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int)

	var producer sync.WaitGroup
	producer.Add(1)
	go func() {
		defer producer.Done()

		ticker := time.NewTicker(500 * time.Nanosecond)
		defer ticker.Stop()

		// Every value waits for one tick before it is passed on.
		paced := hiter.Tap(func(int) { <-ticker.C }, hiter.Range(0, 20))
		_, _ = hiter.ChanSend(ctx, in, paced)
		close(in)
	}()

	var chunks, printed int
	for c := range async.Chunk(time.Microsecond, 5, hiter.Chan(ctx, in)) {
		chunks++
		for _, i := range c {
			// Separate elements with ", " but do not print a leading separator.
			if printed > 0 {
				fmt.Print(", ")
			}
			printed++
			fmt.Printf("%d", i)
		}
	}
	fmt.Println()

	producer.Wait()
	fmt.Printf("count > 0 = %t\n", chunks > 0)
	// Output:
	// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
	// count > 0 = true
}