-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathstation_daemon.py
164 lines (135 loc) · 6.21 KB
/
station_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#SEE https://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html
#for info on listeners
from config import *
from flask import Flask
# NOTE(review): flask.ext.* is the long-deprecated extension import path
# (removed in modern Flask) -- confirm the pinned Flask version before upgrading.
from flask.ext.sqlalchemy import SQLAlchemy
from rootio.extensions import db
import zmq
from utils import ZMQ, init_logging
import json
import time
import sys
from multiprocessing import Process
import random
import redis
import isodate
from datetime import datetime
from zmq.eventloop import ioloop, zmqstream
ioloop.install()  # must run before any ZMQStream is created: routes pyzmq through the tornado ioloop
# Endpoint the station daemons subscribe to (the forwarder's outbound side, from config).
MESSAGE_QUEUE_PORT_WEB = ZMQ_FORWARDER_SPITS_OUT
# get access to telephony & web database
telephony_server = Flask("ResponseServer")
telephony_server.debug = True
from rootio.telephony.models import *
from rootio.radio.models import *
telephony_server.config['SQLALCHEMY_DATABASE_URI'] = SQLALCHEMY_DATABASE_URI
# NOTE(review): this rebinds the `db` imported from rootio.extensions above
# with a fresh SQLAlchemy handle bound to this Flask app -- presumably intentional.
db = SQLAlchemy(telephony_server)
logger = init_logging('station_daemon')
# Daemon class
class StationDaemon(Station):
    """One daemon per radio station.

    Loads the Station row, initializes the shared outgoing-number pools in
    redis, then forks one listener process per message channel
    ('station.<id>.call' / '.sms' / '.program' / '.db') subscribed to the
    ZMQ forwarder.
    """

    def __init__(self, station_id):
        """Initialize state, redis pools, and listener processes.

        :param station_id: primary key of the Station row this daemon runs as.
        :raises Exception: re-raises any failure to load the station row or to
            construct the redis client -- nothing below can work without them.
        """
        logger.info("Hello World")
        self.gateway = 'sofia/gateway/utl'
        self.caller_queue = []
        self.active_workers = []
        try:
            self.station = db.session.query(Station).filter(Station.id == station_id).one()
        except Exception:
            # Previously this logged and fell through, leaving self.station
            # undefined and crashing on the very next line -- fail loudly instead.
            logger.error('Could not load one unique station', exc_info=True)
            raise
        logger.info("Initializing station: {}".format(self.station.name))
        # This is for UTL outgoing ONLY. Should be moved to a utility just for
        # the gateway, or such.
        try:
            self.r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=OUTGOING_NUMBERS_REDIS_DB)
        except Exception:
            logger.error('Could not open redis connection', exc_info=True)
            raise
        # INITIATE OUTGOING NUMBERS HERE.
        # Hereafter, stations can do a r.rpoplpush('outgoing_unused','outgoing_busy')
        # to get a number, or a r.lrem('outgoing_busy', 0, somenumber) to return
        # it -- SHOULD be atomic :(
        # Drain stale entries left over from any previous run before refilling.
        while self.r.rpop('outgoing_unused') is not None:
            pass
        while self.r.rpop('outgoing_busy') is not None:
            pass
        for i in range(OUTGOING_NUMBER_BOTTOM, OUTGOING_NUMBER_TOP + 1):
            # Numbers are stored with a leading '0' prepended to the integer.
            self.r.rpush('outgoing_unused', '0' + str(i))
        # INITIATE IS_MASTER KEYS
        for k in self.r.keys('is_master_*'):
            self.r.set(k, 'none')
        # start listeners
        self.start_listeners()

    ########################################################################
    # Listeners for messages on calls, sms,
    # program changes, and db updates
    ########################################################################

    def listener(self, channel, function):
        """Blocking worker body: subscribe to `channel` on the forwarder and
        invoke `function` on every received message.

        Starts its own tornado ioloop, so it never returns under normal
        operation -- it is meant to be the target of a separate Process.
        See https://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html

        :param channel: topic prefix to SUBSCRIBE to (string).
        :param function: callback invoked with the raw multipart message.
        """
        port = MESSAGE_QUEUE_PORT_WEB
        context = zmq.Context()
        socket_sub = context.socket(zmq.SUB)
        socket_sub.connect(ZMQ_FORWARDER_SPITS_OUT)
        socket_sub.setsockopt(zmq.SUBSCRIBE, str(channel))
        stream_sub = zmqstream.ZMQStream(socket_sub)
        stream_sub.on_recv(function)
        print("Connected to publisher with port %s" % port)
        ioloop.IOLoop.instance().start()
        print("Worker has stopped processing messages.")

    def start_listeners(self):
        """Spawn one listener Process per channel kind and record each in
        self.active_workers. Channel names are 'station.<id>.<kind>'."""
        handlers = [
            ('call', self.process_call),
            ('sms', self.process_sms),
            ('program', self.process_program),
            ('db', self.process_db),
        ]
        for kind, handler in handlers:
            channel = str('station.' + str(self.station.id) + '.' + kind)
            worker = Process(target=self.listener, args=(channel, handler))
            worker.start()
            self.active_workers.append(worker)

    def process_call(self, msg):
        """Respond to call-related messages (currently log-only)."""
        logger.info("Processing call: {}".format(msg))

    def process_sms(self, msg):
        """Respond to sms messages (currently log-only)."""
        logger.info("Processing sms: {}".format(msg))

    def process_program(self, msg):
        """Respond to program-change messages by launching a news program."""
        logger.info("Processing program: {}".format(msg))
        import news_report  # local import, matching the original's lazy load
        logger.info("for station {}".format(self.station.name))
        # NOTE(review): process_db passes self.station.id here while this
        # passes the Station object -- confirm which signature
        # news_report.News actually expects.
        self.program = news_report.News(3, self.station)

    def process_db(self, msg):
        """Respond to db-change messages: on an insert/update whose
        start_time has already passed, run the news program immediately.

        :param msg: multipart ZMQ message; msg[1] is a JSON payload with at
            least 'operation' and an ISO-8601 'start_time'.
        """
        change_dict = json.loads(msg[1])  # pull payload from message
        logger.info("Processing db change: {}".format(change_dict))
        logger.info("about to test if conditions right to launch a program")
        if (change_dict['operation'] in ('insert', 'update')
                and isodate.parse_datetime(change_dict['start_time']) <= datetime.now()):
            logger.info("We have successful conditions to launch a program!")
            import news_report
            self.program = news_report.News(3, self.station.id)
            time.sleep(3)  # this should really be a callback! see below for process_connected_transmitter()
            self.program.report()
            self.program.teardown()

    def process_connected_transmitter(self, msg):
        """Respond to a successful connect to the transmitter phone (not yet implemented)."""
        pass
# self test of message server to see if daemons are receiving
def test_receivers():
    """Publish an endless stream of random test messages on the forwarder
    port so running station daemons can be observed receiving them."""
    pub_port = MESSAGE_QUEUE_PORT_WEB
    ctx = zmq.Context()
    pub_socket = ctx.socket(zmq.PUB)
    pub_socket.bind("tcp://*:%s" % pub_port)
    topics = ['station.7.program', 'station.7.call', 'station.7.program',
              'station.7.program', 'sms.station.6', 'call.station.6']
    while True:
        # Pick a topic at random and ship a fixed JSON payload on it.
        chosen = topics[random.randrange(0, len(topics))]
        payload = {'this': 'that', "if": 'then', "1": 1, "2": 2}
        body = json.dumps(payload)
        print("%s %s" % (chosen, body))
        pub_socket.send("%s %s" % (chosen, body))
        time.sleep(1)
# Silly launch of fake daemons
#stations = db.session.query(Station).all()
#daemons = []
#for i in stations:
# daemons.append(StationDaemon(i.id))
#test_receivers()