9
9
10
10
import io .reactivex .Flowable ;
11
11
import io .reactivex .Observable ;
12
- import io .reactivex .processors .FlowableProcessor ;
13
12
import io .reactivex .processors .PublishProcessor ;
14
13
import io .reactivex .subjects .Subject ;
15
14
@@ -28,7 +27,7 @@ public class RxBus2 {
28
27
* PublishProcessor 同时充当了Observer和Observable的角色
29
28
*/
30
29
@ SuppressWarnings ("rawtypes" )
31
- private ConcurrentHashMap <Object , List <FlowableProcessor >> subjectMapper = new ConcurrentHashMap <>();
30
+ private ConcurrentHashMap <Object , List <PublishProcessor >> subjectMapper = new ConcurrentHashMap <>();
32
31
33
32
private RxBus2 () {
34
33
}
@@ -50,14 +49,14 @@ private static class RxBus2Holder {
50
49
*/
51
50
@ SuppressWarnings ({"rawtypes" })
52
51
public <T > Flowable <T > register (@ NonNull Object tag ) {
53
- List <FlowableProcessor > subjectList = subjectMapper .get (tag );
52
+ List <PublishProcessor > subjectList = subjectMapper .get (tag );
54
53
if (null == subjectList ) {
55
54
subjectList = new ArrayList <>();
56
55
subjectMapper .put (tag , subjectList );
57
56
}
58
57
59
58
//考虑到多线程原因使用toSerialized方法
60
- FlowableProcessor <T > processor = (FlowableProcessor <T >) PublishProcessor .create ().toSerialized ();
59
+ PublishProcessor <T > processor = (PublishProcessor <T >) PublishProcessor .create ().toSerialized ();
61
60
subjectList .add (processor );
62
61
return processor ;
63
62
}
@@ -69,7 +68,7 @@ public <T> Flowable<T> register(@NonNull Object tag) {
69
68
*/
70
69
@ SuppressWarnings ("rawtypes" )
71
70
public void unregister (@ NonNull Object tag ) {
72
- List <FlowableProcessor > subjectList = subjectMapper .get (tag );
71
+ List <PublishProcessor > subjectList = subjectMapper .get (tag );
73
72
if (null != subjectList ) {
74
73
subjectMapper .remove (tag );
75
74
}
@@ -89,7 +88,7 @@ public RxBus2 unregister(@NonNull Object tag,
89
88
return getInstance ();
90
89
}
91
90
92
- List <FlowableProcessor > subjectList = subjectMapper .get (tag );
91
+ List <PublishProcessor > subjectList = subjectMapper .get (tag );
93
92
if (null != subjectList ) {
94
93
// 从subjectList中删去observable
95
94
subjectList .remove ((Subject <?>) observable );
@@ -118,9 +117,9 @@ public void post(@NonNull Object content) {
118
117
*/
119
118
@ SuppressWarnings ({"unchecked" , "rawtypes" })
120
119
public void post (@ NonNull Object tag , @ NonNull Object content ) {
121
- List <FlowableProcessor > subjectList = subjectMapper .get (tag );
120
+ List <PublishProcessor > subjectList = subjectMapper .get (tag );
122
121
if (!isEmpty (subjectList )) {
123
- for (FlowableProcessor subject : subjectList ) {
122
+ for (PublishProcessor subject : subjectList ) {
124
123
subject .onNext (content );
125
124
}
126
125
}
@@ -133,7 +132,7 @@ public void post(@NonNull Object tag, @NonNull Object content) {
133
132
* @return
134
133
*/
135
134
@ SuppressWarnings ("rawtypes" )
136
- public static boolean isEmpty (Collection <FlowableProcessor > collection ) {
135
+ public static boolean isEmpty (Collection <PublishProcessor > collection ) {
137
136
return null == collection || collection .isEmpty ();
138
137
}
139
138
}
0 commit comments