-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathsubscriber.go
127 lines (108 loc) · 3.1 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package rx
import (
"context"
"github.com/jjeffcaii/reactor-go"
"github.com/rsocket/rsocket-go/payload"
)
var (
// EmptySubscriber is a blank Subscriber.
EmptySubscriber Subscriber = &subscriber{}
// EmptyRawSubscriber is a blank native Subscriber in reactor-go.
EmptyRawSubscriber = reactor.NewSubscriber(reactor.OnNext(func(v reactor.Any) error {
return nil
}))
)
// Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
type Subscription = reactor.Subscription
// Subscriber will receive call to OnSubscribe(Subscription) once after passing an instance of Subscriber to Publisher#SubscribeWith
type Subscriber interface {
// OnNext represents data notification sent by the Publisher in response to requests to Subscription#Request.
OnNext(payload payload.Payload)
// OnError represents failed terminal state.
OnError(error)
// OnComplete represents successful terminal state.
OnComplete()
// OnSubscribe invoked after Publisher subscribed.
// No data will start flowing until Subscription#Request is invoked.
OnSubscribe(context.Context, Subscription)
}
type subscriberFacade struct {
Subscriber
}
type subscriber struct {
fnOnSubscribe FnOnSubscribe
fnOnNext FnOnNext
fnOnComplete FnOnComplete
fnOnError FnOnError
}
func NewSubscriberFacade(s Subscriber) reactor.Subscriber {
return subscriberFacade{
Subscriber: s,
}
}
func (s subscriberFacade) OnNext(any reactor.Any) {
s.Subscriber.OnNext(any.(payload.Payload))
}
func (s *subscriber) OnNext(payload payload.Payload) {
if s == nil || s.fnOnNext == nil {
return
}
if err := s.fnOnNext(payload); err != nil {
s.OnError(err)
}
}
func (s *subscriber) OnError(err error) {
if s != nil && s.fnOnError != nil {
s.fnOnError(err)
}
}
func (s *subscriber) OnComplete() {
if s != nil && s.fnOnComplete != nil {
s.fnOnComplete()
}
}
func (s *subscriber) OnSubscribe(ctx context.Context, su Subscription) {
if s != nil && s.fnOnSubscribe != nil {
s.fnOnSubscribe(ctx, su)
} else {
su.Request(RequestMax)
}
}
// SubscriberOption is option of subscriber.
// You can call OnNext, OnComplete, OnError or OnSubscribe.
type SubscriberOption func(*subscriber)
// OnNext returns s SubscriberOption handling Next event.
func OnNext(onNext FnOnNext) SubscriberOption {
return func(s *subscriber) {
s.fnOnNext = onNext
}
}
// OnComplete returns s SubscriberOption handling Complete event.
func OnComplete(onComplete FnOnComplete) SubscriberOption {
return func(s *subscriber) {
s.fnOnComplete = onComplete
}
}
// OnError returns s SubscriberOption handling Error event.
func OnError(onError FnOnError) SubscriberOption {
return func(i *subscriber) {
i.fnOnError = onError
}
}
// OnSubscribe returns s SubscriberOption handling Subscribe event.
func OnSubscribe(onSubscribe FnOnSubscribe) SubscriberOption {
return func(i *subscriber) {
i.fnOnSubscribe = onSubscribe
}
}
// NewSubscriber create a new Subscriber with custom options.
func NewSubscriber(opts ...SubscriberOption) Subscriber {
if len(opts) < 1 {
return EmptySubscriber
}
s := &subscriber{}
for _, opt := range opts {
opt(s)
}
return s
}