forked from ReactiveX/RxGo
-
Notifications
You must be signed in to change notification settings - Fork 2
/
duration.go
101 lines (87 loc) · 1.63 KB
/
duration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package rxgo
import (
"context"
"time"
"github.com/stretchr/testify/mock"
)
// Infinite is the sentinel value (-1) that represents an infinite wait time.
var Infinite int64 = -1
// Duration abstracts a span of time handed to the package.
//
// Its only method is unexported, so implementations can only be declared
// inside this package; external callers obtain one through WithDuration.
type Duration interface {
	duration() time.Duration
}

// duration is the fixed-value Duration implementation: it simply wraps a
// time.Duration.
type duration struct {
	d time.Duration
}

// duration returns the wrapped time.Duration unchanged.
func (w *duration) duration() time.Duration {
	return w.d
}

// WithDuration wraps d into a Duration whose duration() method always
// returns d.
func WithDuration(d time.Duration) Duration {
	wrapped := new(duration)
	wrapped.d = d
	return wrapped
}
// tick is the marker passed to timeCausality to script a timer tick: an element
// equal to tick becomes a no-op step flagged isTick instead of an item sent on
// the channel.
var tick = struct{}{}
// causalityDuration is a scripted Duration: each duration() call runs and
// removes the first execution in fs.
type causalityDuration struct {
// fs is the queue of steps still to run, in order.
fs []execution
}
// execution is one scripted step of a causalityDuration.
type execution struct {
// f is the action invoked when the step is consumed.
f func()
// isTick selects the value duration() returns for this step: time.Nanosecond
// when true, time.Minute otherwise.
isTick bool
}
// timeCausality turns elems into a script of ordered steps and returns the
// three pieces needed to replay it:
//
//   - a context that is cancelled by the final step of the script,
//   - an Observable built with FromChannel over the channel the steps send to,
//   - a Duration whose duration() method runs one step per call.
//
// Each element maps to one step: an element equal to tick becomes a no-op
// step marked isTick; an error is sent as Error(elem); anything else
// (including nil) is sent as Of(elem). One extra step that cancels the
// context is appended after all elements.
//
// The channel has a buffer of one, so a sending step blocks while a
// previously sent item is still waiting to be received.
func timeCausality(elems ...interface{}) (context.Context, Observable, Duration) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan Item, 1)
	steps := make([]execution, 0, len(elems)+1)

	for _, raw := range elems {
		if raw == tick {
			steps = append(steps, execution{f: func() {}, isTick: true})
			continue
		}

		// The type-switch binding v is a fresh variable on every pass, so
		// each closure captures its own element.
		switch v := raw.(type) {
		case error:
			steps = append(steps, execution{f: func() { ch <- Error(v) }})
		default:
			steps = append(steps, execution{f: func() { ch <- Of(v) }})
		}
	}

	// Closing step: cancel the returned context.
	steps = append(steps, execution{f: cancel})

	return ctx, FromChannel(ch), &causalityDuration{fs: steps}
}
// duration runs the next scripted step and returns the wait associated with
// it.
//
// The first execution in d.fs is invoked and then removed from the queue. A
// step flagged isTick yields time.Nanosecond; any other step yields
// time.Minute.
//
// When no steps remain (the script is exhausted, or the receiver was never
// populated) nothing is run and time.Minute is returned, instead of panicking
// on an out-of-range index.
func (d *causalityDuration) duration() time.Duration {
	if len(d.fs) == 0 {
		return time.Minute
	}

	next := d.fs[0]
	next.f()
	// Advance only after the step has run, as the original did.
	d.fs = d.fs[1:]

	if next.isTick {
		return time.Nanosecond
	}
	return time.Minute
}
// mockDuration is a Duration test double built on testify's mock.Mock.
type mockDuration struct {
	mock.Mock
}

// duration records the call on the embedded mock and returns the first
// configured return value, type-asserted to time.Duration. The assertion
// panics if that value has a different type.
func (m *mockDuration) duration() time.Duration {
	return m.Called().Get(0).(time.Duration)
}