forked from ReactiveX/RxGo
-
Notifications
You must be signed in to change notification settings - Fork 2
/
optionalsingle.go
105 lines (89 loc) · 2.58 KB
/
optionalsingle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package rxgo
import "context"
// OptionalSingleEmpty is the zero-value Item that OptionalSingleImpl.Get
// returns when the observed channel is closed before any item is received.
// It is a package-level variable, not a constant.
var OptionalSingleEmpty = Item{}
// OptionalSingle is an optional single. It extends Iterable with the methods
// below; OptionalSingleImpl in this file is an implementation.
type OptionalSingle interface {
Iterable
// Get blocks until an item is available and returns it. In
// OptionalSingleImpl, OptionalSingleEmpty is returned when nothing was
// emitted, and a non-nil error is returned when the context is done.
Get(opts ...Option) (Item, error)
// Map returns a new OptionalSingle derived from this one using apply.
Map(apply Func, opts ...Option) OptionalSingle
// Run consumes the OptionalSingle without exposing the emitted items. In
// OptionalSingleImpl, the returned Disposed is closed once consumption ends.
Run(opts ...Option) Disposed
}
// OptionalSingleImpl implements OptionalSingle.
type OptionalSingleImpl struct {
// parent is the context passed to option.buildContext in Get and Run, and
// to optionalSingle in Map.
parent context.Context
// iterable is the underlying source; Observe delegates to it.
iterable Iterable
}
// Get blocks until the OptionalSingle yields an item, completes, or the
// context is done.
//
// The first item received from Observe is returned with a nil error. If the
// observed channel is closed before any item arrives, OptionalSingleEmpty is
// returned with a nil error. If the context built from opts and the parent
// context is done first, a zero Item is returned together with ctx.Err().
func (o *OptionalSingleImpl) Get(opts ...Option) (Item, error) {
	parsed := parseOptions(opts...)
	ctx := parsed.buildContext(o.parent)
	items := o.Observe(opts...)

	// Every branch returns, so a single select is sufficient.
	select {
	case item, open := <-items:
		if open {
			return item, nil
		}
		return OptionalSingleEmpty, nil
	case <-ctx.Done():
		return Item{}, ctx.Err()
	}
}
// Map transforms the item emitted by an OptionalSingle by applying a function
// to it. The returned OptionalSingle is built by optionalSingle from the
// receiver, using a factory that creates a mapOperatorOptionalSingle wrapping
// apply; opts are forwarded unchanged.
func (o *OptionalSingleImpl) Map(apply Func, opts ...Option) OptionalSingle {
	newMapOperator := func() operator {
		return &mapOperatorOptionalSingle{apply: apply}
	}
	return optionalSingle(o.parent, o, newMapOperator, false, true, opts...)
}
// Observe observes an OptionalSingle by returning its channel. It delegates
// to the wrapped iterable's Observe, forwarding opts unchanged.
func (o *OptionalSingleImpl) Observe(opts ...Option) <-chan Item {
return o.iterable.Observe(opts...)
}
// mapOperatorOptionalSingle is the operator created by OptionalSingleImpl.Map.
type mapOperatorOptionalSingle struct {
// apply is the mapping function; next calls it with each item's value.
apply Func
}
// next applies the mapping function to item.V. On success the result is sent
// to dst wrapped with Of. On failure the error is sent to dst wrapped with
// Error and operatorOptions.stop() is called. Both sends block until dst
// accepts the item.
func (op *mapOperatorOptionalSingle) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
	mapped, applyErr := op.apply(ctx, item.V)
	if applyErr == nil {
		dst <- Of(mapped)
		return
	}

	// Report the failure downstream before signalling the stop.
	dst <- Error(applyErr)
	operatorOptions.stop()
}
// err handles an error item by delegating to defaultErrorFuncOperator with
// all arguments passed through unchanged.
func (op *mapOperatorOptionalSingle) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}
// end does nothing; both arguments are ignored.
func (op *mapOperatorOptionalSingle) end(_ context.Context, _ chan<- Item) {
}
// gatherNext forwards item to dst unless its value is a
// *mapOperatorOptionalSingle, in which case the item is dropped.
func (op *mapOperatorOptionalSingle) gatherNext(_ context.Context, item Item, dst chan<- Item, _ operatorOptions) {
	if _, isOperator := item.V.(*mapOperatorOptionalSingle); isOperator {
		return
	}
	dst <- item
}
// Run creates an observer without consuming the emitted items.
//
// It starts a goroutine that observes the OptionalSingle and discards every
// item it receives. The goroutine stops when the observed channel is closed
// or when the context built from opts and the parent context is done. The
// returned channel is closed when the goroutine exits.
func (o *OptionalSingleImpl) Run(opts ...Option) Disposed {
	done := make(chan struct{})
	parsed := parseOptions(opts...)
	ctx := parsed.buildContext(o.parent)

	go func() {
		defer close(done)

		items := o.Observe(opts...)
		for {
			select {
			case <-ctx.Done():
				return
			case _, open := <-items:
				if open {
					// Discard the item and keep draining.
					continue
				}
				return
			}
		}
	}()

	return done
}