-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasync_producer.py
150 lines (119 loc) · 4.23 KB
/
async_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# async_producer.py
#
# Implement same functionality as producer.py but without threads
import time
from collections import deque
import heapq
class Scheduler:
    """Minimal single-threaded scheduler for zero-argument callbacks.

    Callbacks are either ready to run immediately (``call_soon``) or parked
    until a deadline expires (``call_later``). ``run`` drives both until no
    work is left.
    """

    def __init__(self):
        self.ready = deque()   # Callbacks ready to execute, run in FIFO order
        self.sleeping = []     # Min-heap of (deadline, sequence, func) entries
        self.sequence = 0      # Tie-breaker so equal deadlines never compare funcs

    def call_soon(self, func):
        """Queue ``func`` to be called by ``run`` as soon as possible."""
        self.ready.append(func)

    def call_later(self, delay, func):
        """Queue ``func`` to be called once ``delay`` seconds have elapsed.

        Args:
            delay: Number of seconds to wait before ``func`` becomes ready.
            func: Zero-argument callable to invoke.
        """
        self.sequence += 1
        # Use the monotonic clock: a wall-clock adjustment must not stretch
        # or collapse a pending timer.
        deadline = time.monotonic() + delay
        # Priority queue ordered by deadline, then by insertion sequence.
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        """Run callbacks until both the ready queue and the timer heap are empty."""
        while self.ready or self.sleeping:
            if not self.ready:
                # Nothing runnable: block until the earliest timer is due,
                # then promote it to the ready queue.
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.monotonic()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            # Drain everything that is runnable, including callbacks that
            # were scheduled by the callbacks executed here.
            while self.ready:
                func = self.ready.popleft()
                func()
# Module-wide scheduler instance; the queue, producer and consumer below all
# schedule their callbacks through it.
sched = Scheduler()
# -------------------------------------------------------
class Result:
    """Outcome of an operation: either a value or the exception it failed with."""

    def __init__(self, value=None, exc=None):
        self.value = value   # Payload returned by result() on success
        self.exc = exc       # Exception raised by result() on failure, or None

    def result(self):
        """Return the stored value, or raise the stored exception if one was set.

        Raises:
            BaseException: whatever exception object was passed as ``exc``.
        """
        # Compare against None explicitly: an exception object that happens to
        # be falsy (e.g. defines __bool__ or __len__) must still be raised.
        if self.exc is not None:
            raise self.exc
        return self.value
class QueueClosed(Exception):
    """Signals that an AsyncQueue has been closed.

    Raised by ``put`` on a closed queue, and delivered to ``get`` callbacks
    (wrapped in a Result) when a closed queue has no items left.
    """
# Implement a queuing object
class AsyncQueue:
    """Callback-based FIFO queue driven by the module-level scheduler ``sched``.

    ``get`` never blocks: it hands a Result to the supplied callback, either
    immediately or later once an item arrives or the queue is closed.
    """

    def __init__(self):
        self.items = deque()     # Items put but not yet consumed
        self.waiting = deque()   # Parked getters (zero-arg callables that retry get)
        self._closed = False     # Once True, put() is rejected

    def close(self):
        """Mark the queue closed and wake every parked getter exactly once.

        Each woken getter retries ``get``: it receives a remaining item if
        one is available, otherwise a Result carrying QueueClosed.
        """
        self._closed = True
        # Drain the deque while scheduling so that a repeated close() cannot
        # fire the same callbacks twice, and wake the getters even when items
        # remain: those items may already be destined for a getter that was
        # scheduled earlier, which would leave the rest parked forever.
        while self.waiting:
            sched.call_soon(self.waiting.popleft())

    def put(self, item):
        """Append ``item`` and wake one parked getter, if any.

        Raises:
            QueueClosed: if the queue has already been closed.
        """
        if self._closed:
            raise QueueClosed()
        self.items.append(item)
        if self.waiting:
            # Hand the getter to the scheduler instead of calling it right
            # here; a direct call could nest deeply or recurse.
            sched.call_soon(self.waiting.popleft())

    def get(self, callback):
        """Deliver the next item to ``callback`` wrapped in a Result.

        Args:
            callback: One-argument callable that receives a Result. Calling
                ``result()`` on it returns the item, or raises QueueClosed
                when the queue is closed and empty.
        """
        if self.items:
            # Data is available; this still applies after close() so that
            # items queued before closing are not lost.
            callback(Result(value=self.items.popleft()))
        elif self._closed:
            # Closed and drained: report the error through the Result.
            callback(Result(exc=QueueClosed()))
        else:
            # Nothing to deliver yet: park a retry until put() or close().
            self.waiting.append(lambda: self.get(callback))
def producer(q, count, interval=1):
    """Put the integers ``0 .. count - 1`` on ``q``, then close the queue.

    A plain ``for`` loop with ``time.sleep`` would block the scheduler until
    every item was produced, so each step instead re-schedules the next one
    through ``sched.call_later``.

    Args:
        q: Queue object providing ``put(item)`` and ``close()``.
        count: Number of items to produce.
        interval: Seconds to wait between consecutive items (default 1).
    """
    def _run(n):
        if n < count:
            print('Producing', n)
            q.put(n)
            sched.call_later(interval, lambda: _run(n + 1))
        else:
            print("Producer done")
            # Closing the queue replaces a sentinel item: consumers learn
            # that no more items will be produced.
            q.close()

    _run(0)
def consumer(q):
    """Request one item from ``q`` and keep consuming until the queue closes.

    Args:
        q: Queue object whose ``get(callback=...)`` passes a Result-like
            object to the callback.
    """
    def _on_result(outcome):
        # result() hands back the item, or re-raises the queue's error.
        try:
            value = outcome.result()
        except QueueClosed:
            print('Consumer done')
            return
        print('Consuming', value)
        # Ask for the next item via the scheduler rather than calling
        # consumer() directly, so calls do not nest when several items are
        # already queued.
        sched.call_soon(lambda: consumer(q))

    q.get(callback=_on_result)
# Demo wiring: one shared queue, a producer emitting 10 items and one consumer.
q = AsyncQueue()
# Both entry points are deferred with call_soon, so they first execute inside run().
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q, ))
# run() returns once no ready or sleeping callbacks remain.
sched.run()
# while True:
# item = q.get() # PROBLEM HERE: .get() waiting
# if item is None:
# break
# print('Consuming', item)
# print('Consumer done')
#
#
# q = queue.Queue() # Thread safe queue
# threading.Thread(target=producer, args=(q, 10)).start()
# threading.Thread(target=consumer, args=(q,)).start()