-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtest_rx.py
75 lines (56 loc) · 1.79 KB
/
test_rx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# we use gevent
import pycond as pc
import gevent
from threading import Event, current_thread as ct
import rx.scheduler.eventloop as e
from rx.scheduler.eventloop import GEventScheduler
from gevent import monkey
import time
import sys
monkey.patch_all()
# _thn = lambda msg, data: print('thread:', cur_thread().name, msg, data)
def tn():
    """Return the current thread's name with every 'Thread' substring removed."""
    thread_name = ct().name
    return thread_name.replace('Thread', '')
# gevent-backed rx scheduler; passed as `scheduler=` when building the rx operator.
GS = GEventScheduler(gevent)
# Alternative for experiments: GS = None

# pycond hands back two rx handles: `Rx` (used for `Rx.from_`) and
# `rx` (used for operators such as `rx.map` / `rx.take`).
Rx, rx = pc.import_rx()

# `count` is the number of items pushed through the pipeline. Original note:
# set this higher and watch mem getting constant.
now = time.time
count = 1
prnt = 0
class F:
    """Namespace of lookup functions; the class itself is passed as `lookup_provider`."""

    def blocking(k, v, cfg, data, **kw):
        """Store the calling thread's short name in `data[k]` and return `(True, v)`.

        Defined without `self`: the first parameter is the lookup key `k`.
        `cfg` and any extra keyword arguments are accepted but not used.
        """
        thread_label = tn()
        data[k] = thread_label
        return True, v

    # Second lookup name bound to the very same function object.
    blocking2 = blocking
class Tests:
    """Asserting some special cases"""

    # Three named conditions. The ':name' entries presumably reference either
    # another named condition (b1, b2) or a lookup function on the provider
    # (blocking, blocking2) -- verify against the pycond documentation.
    cond = {
        'root': [':b1', 'and', ':b2'],
        'b1': ['a', 'and', ':blocking'],
        'b2': ['a', 'and', ':blocking2'],
    }

    def test_rx_async1_prefix(self):
        """is prefix working over async runs

        Each item is shaped ``{'payload': {'a': ...}}`` and the operator is
        built with ``prefix='payload'`` and both lookups listed in ``asyn``.
        The test checks that the lookups wrote their thread names into the
        ``payload`` sub-dict and that the condition results landed under
        ``'mod'``.

        Raises:
            AssertionError: if an expectation fails, or if the pipeline
                delivers no item within the wait timeout (instead of
                blocking the test run forever).
        """
        wait_timeout = 5.0  # seconds; generous upper bound for one item

        def d(i):
            return {'payload': {'a': i + 1}}

        rxcond = pc.rxop(
            Tests.cond,
            into='mod',
            prefix='payload',
            scheduler=GS,
            lookup_provider=F,
            asyn=['blocking', 'blocking2'],
        )
        results = []
        s = Rx.from_(range(count)).pipe(rx.map(d), rxcond, rx.take(count))
        s.subscribe(results.append)

        # Items arrive asynchronously, so poll until the first one shows up,
        # but give up after `wait_timeout` so a broken pipeline fails the
        # test rather than hanging it.
        deadline = time.monotonic() + wait_timeout
        while not results:
            if time.monotonic() > deadline:
                raise AssertionError(
                    'rx pipeline delivered no item within %.1fs' % wait_timeout
                )
            time.sleep(0.01)

        # same thread set in def blocking:
        t = [results[0]['payload'].pop(b) for b in ['blocking', 'blocking2']]
        assert t[0] == t[1]
        # Not the main thread; presumably the dummy thread object reported
        # for a greenlet after monkey patching.
        assert 'Dummy' in t[0]
        # With the two thread names popped, only the original data is left
        # next to the condition results.
        assert results == [
            {'mod': {'b2': True, 'b1': True, 'root': True}, 'payload': {'a': 1}}
        ]
if __name__ == '__main__':
    # Allow running this single test directly as a script.
    suite = Tests()
    suite.test_rx_async1_prefix()