-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvaccineChecker.py
More file actions
executable file
·630 lines (513 loc) · 24 KB
/
vaccineChecker.py
File metadata and controls
executable file
·630 lines (513 loc) · 24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
#!/usr/bin/python3
# standard libraries
import enum
import traceback
import requests
import signal
import argparse
import inspect
import os
import time
import smtplib
import sys
import random
import urllib3
import syslog
import json
import re
import urllib3
from datetime import datetime
from datetime import timedelta
from email.mime.text import MIMEText
# non-standard libraries
import schedule
# Help text shown by `--help` (passed to argparse as the program description).
PROGRAM_DESCRIPTION="""
README:
This program is a daemon for inspecting vaccine provider websites and outputting
their availability status to an output `status.json` file.
This program always expects a valid 'websites.json' file passed as the
--websites argument. This file defines the websites to query.
An example file `input/websites.json` is provided in this source tree.
The sites in `websites.json` can be one of four `type` values:
* `phrase` : Looks for the presence or absence of phrases specified by `pos_phrase` or `neg_phrase`.
* `cvs`: Queries the `cvs.com` website with the `state` and `city` parameters supplied.
* `heb`: Queries the `heb.com` website with the `query` parameter supplied.
* `walgreens`: Queries the `walgreens.com` website with the `query` parameter supplied.
If the argument --notification-rate is passed, this program expects a
valid 'credentials.json' file specified by the --credentials argument.
This file should contain the authentication credentials for an SMTP server
and login and email recipients for status messages to be sent to. Example
'credentials.json' file:
{
"email" : "foo@gmail.com",
"password" : "bar",
"recipients" : "foobar@tmomail.net, myname@yahoo.com",
"smtp_host" : "smtp.myserver.com",
"smtp_port" : 465
}
EXAMPLE USE (Command Line):
See 'vaccineChecker.py --help' for full argument set.
# run the daemon with a request rate of 5 minutes (300 seconds), outputting
# status to the default 'status' directory
./vaccineChecker.py --websites input/websites.json --request-rate 300
# run the daemon with a request rate of 10 minutes (600 seconds), outputting
# status to the 'output' directory, sending a periodic update of status to
# the email supplied in 'credentials.json'
./vaccineChecker.py --websites input/websites.json --request-rate 600 --output-dir output --credentials input/credentials.json
REQUIREMENTS:
This script was developed with Python 3.4.3 and the packages as specified
by the import directives.
If the 'walgreens' type in 'websites.json' is specified, the 'selenium' Python package
and the 'geckodriver' OS package are needed.
"""
def SignalHandler(sig, frame):
    """Handle Ctrl+C: announce the interruption and exit with status 0.

    Has the (signal number, stack frame) signature that `signal.signal`
    expects of a handler; neither argument is used.
    """
    sys.stdout.write("INFO: Program interrupted via Ctrl-C. Exiting" + "\n")
    raise SystemExit(0)
class Availability(enum.Enum):
    """Enum class for provider status.

    The string values are what gets stored in each site's 'status' field.
    """

    # the provider reported no availability (or the query failed)
    PROBABLY_NOT = "probably not"
    # neither a clear positive nor a clear negative was found
    MAYBE = "maybe"
    # a positive indicator was found
    PROBABLY = "probably"
class vaccineChecker(object):
    """Primary class.

    Periodically queries every site loaded from 'websites.json', tracks each
    site's availability status, writes the combined result to 'status.json'
    in the output directory and, when notifications are enabled, emails
    status changes and heartbeats to the configured recipients.
    """

    ##############################################
    # to be populated by read of credentials.json
    EMAIL = ""
    PASSWORD = ""
    RECIPIENTS = []
    SMTP_HOST = ""
    SMTP_PORT = 0
    ##############################################
    MIN_REQUEST_RATE = 5  # seconds
    MAX_ATTEMPTS = 0  # maximum runs of main while loop. 0 = run forever.
    TIMEOUT = 10  # website access timeout (seconds)
    m_attempts = 0  # total runs of main while loop
    # command line arguments
    m_websitesFile = ""  # location of 'websites.json'
    m_outputDir = ""  # the directory were status.json gets written to
    m_credentialsFile = ""  # location of 'credentials.json'
    m_notificationRate = 0  # how often, in minutes, we should send a emailed notification with script status
    m_enableArchive = False  # whether or not files should be written as archives in m_outputDir
    m_requestRate = 0  # how often, in seconds, we should ask for website status
    m_verbose = False  # if set to true, prints out function name and process ID when logging
    # initially populated with 'websites.json', but then updated continuously
    m_websites = {}
    # selenium object for accessing sites that require special navigation
    m_sd = object()

    def __init__(self, websitesFile, outputDir, credentialsFile, notificationRate, enableArchive, requestRate, verbose):
        """Setup.

        Stores the command line arguments, optionally configures the
        heartbeat email (reading 'credentials.json'), loads 'websites.json'
        and starts selenium if any configured site needs it.

        Exits the process (via read_credentials / read_websites) when a
        required input file is missing or unreadable.
        """
        # set first so that even the first log line honours --verbose
        self.m_verbose = verbose
        self.DEBUG("INFO: Initializing....")
        self.m_websitesFile = websitesFile
        self.m_outputDir = outputDir
        self.m_credentialsFile = credentialsFile
        self.m_notificationRate = notificationRate
        self.m_enableArchive = enableArchive
        self.m_requestRate = requestRate
        urllib3.disable_warnings()  # for ignoring InsecureRequestWarning for https
        # if configured, for confirmation things are going ok, send a text/email
        if 0 != self.m_notificationRate:
            self.read_credentials()
            self.DEBUG("INFO: --notification-rate passed, configuring to send heartbeat message every %d minutes" % (self.m_notificationRate))
            self.heartbeat()
            schedule.every(self.m_notificationRate).minutes.do(self.heartbeat)
        else:
            self.DEBUG("INFO: --notification-rate not passed, no notifications will be sent.")
        self.read_websites()
        # check for selenium; currently only Walgreens requires it. One shared
        # driver is enough no matter how many Walgreens entries are configured.
        if any("walgreens" == site['type'].lower() for site in self.m_websites.values()):
            self.DEBUG("INFO: Setting up Python package 'selenium' for queries requiring user navigation (i.e Walgreens)...")
            self.selenium_setup()

    def selenium_setup(self):
        """Utility function for setting up selenium (needed for navigation on websites).

        Creates a headless Firefox driver in self.m_sd. When called again
        (reset after an error) the previous driver is shut down first so
        browser processes do not pile up.
        """
        from selenium import webdriver
        previous_quit = getattr(self.m_sd, "quit", None)
        if callable(previous_quit):
            try:
                previous_quit()
            except Exception:
                # best effort: a wedged browser must not prevent the reset
                self.DEBUG("WARNING: Could not shut down previous selenium object, continuing")
        options = webdriver.firefox.options.Options()
        # command line switch instead of the 'headless' property, which newer
        # selenium releases no longer provide
        options.add_argument("-headless")
        self.DEBUG("INFO: Creating selenium object...")
        # assumes 'geckodriver' binary is in path
        self.m_sd = webdriver.Firefox(options=options)
        self.m_sd.set_page_load_timeout(30)
        self.DEBUG("INFO: Done setting up selenium.")

    def DEBUG(self, x):
        """Utility function for logging. Send to standard out and syslog.

        In verbose mode the process ID plus the calling function's name and
        line number are included in the log line.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if self.m_verbose:
            caller = inspect.stack()[1]  # [2] = line number, [3] = function name
            logLine = "[%s][%s|%s|%s] %s\n" % (timestamp, os.getpid(), caller[3], caller[2], x)
        else:
            logLine = "[%s] %s\n" % (timestamp, x)
        sys.stdout.write(logLine)
        syslog.syslog(logLine)

    def read_credentials(self):
        """Read in credentials.json.

        Populates EMAIL, PASSWORD, RECIPIENTS, SMTP_HOST and SMTP_PORT.
        Logs an error and exits the process if the file is missing, is not
        valid JSON, or lacks one of the expected keys.
        """
        filename = self.m_credentialsFile
        example = '''
{
"email" : "foo@gmail.com",
"password" : "bar",
"recipients" : "foobar@tmomail.net,myname@yahoo.com",
"smtp_host" : "smtp.myserver.com",
"smtp_port" : 465
}
'''
        if not os.path.exists(filename):
            self.DEBUG("ERROR: " + filename + ' file not found, exiting. example file contents: ' + example)
            sys.exit(-1)
        try:
            with open(filename) as f:
                c = json.loads(f.read())
            self.EMAIL = c['email']
            self.PASSWORD = c['password']
            self.RECIPIENTS = c['recipients']
            self.SMTP_HOST = c['smtp_host']
            self.SMTP_PORT = c['smtp_port']
            self.DEBUG("INFO: Successfully read credentials file.")
        except Exception:
            self.DEBUG("ERROR: Problem reading file " + filename + '. valid example content: ' + example)
            self.DEBUG(traceback.format_exc())
            sys.exit(-1)

    def read_websites(self):
        """Read in websites.json.

        Loads the file into self.m_websites (a mapping of site name to site
        settings) and fills in the 'status' and 'update_time' fields the
        user does not supply. Logs an error and exits the process if the
        file is missing or unreadable, or if a site has no 'type'.
        """
        filename = self.m_websitesFile
        example = '''
{
"name": "UT Health San Antonio",
"type" : "phrase",
"website": "https://schedule.utmedicinesa.com/Identity/Account/Register",
"neg_phrase": "are full",
"pos_phrase": "you confirm your understanding",
}
'''
        if not os.path.exists(filename):
            self.DEBUG("ERROR: " + filename + ' file not found, exiting. example file contents: ' + example)
            sys.exit(-1)
        try:
            with open(filename) as f:
                self.m_websites = json.loads(f.read())
            # initialize things the user doesn't supply
            for site in self.m_websites.values():
                if "type" not in site:
                    self.DEBUG("ERROR: Each site in 'websites.json' must have a 'type'. See README.md")
                    sys.exit(-1)
                site.setdefault("status", Availability.PROBABLY_NOT.value)
                site.setdefault("update_time", "")
        except Exception:
            self.DEBUG("ERROR: Problem reading file " + filename + '. valid example content: ' + example)
            self.DEBUG(traceback.format_exc())
            sys.exit(-1)

    def _recipient_list(self):
        """Return RECIPIENTS as a list of addresses without surrounding whitespace."""
        raw = self.RECIPIENTS
        if isinstance(raw, str):
            raw = raw.split(',')
        # "a@x.com, b@y.com" is the documented format; strip the blanks so the
        # SMTP envelope does not receive addresses with a leading space
        return [address.strip() for address in raw if address.strip()]

    def send_message(self, s):
        """Given a string, logs it. If notifications are enabled, it sends an
        email to RECIPIENTS, using the credentials in EMAIL and PASSWORD.

        SMTP errors are not caught here; they propagate to the caller.
        """
        self.DEBUG(s)
        if self.m_notificationRate == 0:
            return
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if self.m_verbose:
            caller = inspect.stack()[1]  # [2] = line number, [3] = function name
            m = "[%s]\n[%s|%s|%s]\n%s\n%s\n" % (timestamp, os.getpid(), caller[3], caller[2], s, "http://sanantoniovaccine.com")
        else:
            m = "[%s]\n%s" % (timestamp, s)
        recipients = self._recipient_list()
        msg = MIMEText(m)
        msg['Subject'] = os.path.basename(__file__)  # script name
        msg['From'] = self.EMAIL
        msg['To'] = ', '.join(recipients)
        self.DEBUG("INFO: Attempting to send email with '%s'..." % (s))
        # the context manager ends the SMTP session even if login/sendmail fails
        with smtplib.SMTP_SSL(self.SMTP_HOST, self.SMTP_PORT) as server:
            server.login(self.EMAIL, self.PASSWORD)
            server.sendmail(self.EMAIL, recipients, str(msg))
        self.DEBUG("INFO: Successfully sent email to '%s'!" % self.RECIPIENTS)

    def heartbeat(self):
        """Utility function to let someone know the script is running a-ok."""
        self.send_message("INFO: Heartbeat. m_attempts = '%d'." % (self.m_attempts))

    def handle_status(self, status, name, html):
        """Handle when a website status changes (i.e. from
        Availability.PROBABLY_NOT to Availability.MAYBE).

        status -- the newly observed status; its `.value` is what is stored
        name   -- key of the site in self.m_websites
        html   -- page content to archive on a change, or "" for none

        On a change a message is sent (see send_message) and, if archiving
        is enabled and html is non-empty, the page is saved under
        '<output dir>/archive'. The site's 'status' is always updated.
        """
        site = self.m_websites[name]
        if status.value == site['status']:
            self.DEBUG("INFO: still %s for %s" % (status, name))
        else:
            # not every site type is guaranteed to carry a 'website' entry
            self.send_message("INFO: %s (%s) changed to %s" % (name, site.get('website', ""), status))
            # save off HTML if passed
            if "" != html and self.m_enableArchive:
                archive_dir = self.m_outputDir + "/archive"
                os.makedirs(archive_dir, exist_ok=True)
                filename = archive_dir + "/" + name + ".html." + (datetime.now().strftime("%Y-%m-%d_%H%M%S"))
                with open(filename, "w") as f:
                    f.write(html)
                self.DEBUG("INFO: Archiving file '%s'" % (filename))
        site['status'] = status.value

    def query_walgreens(self, name):
        """For querying the Walgreens page for availability. Requires
        'selenium' to be installed.

        Drives the selenium browser through the screening page, submits the
        site's 'query' location and maps the result text to a status. If no
        result shows up within 30 seconds a warning is logged and the status
        is left unchanged.
        """
        from selenium.common.exceptions import NoSuchElementException
        from selenium.webdriver.common.by import By
        site = self.m_websites[name]
        s = "https://www.walgreens.com/findcare/vaccination/covid-19"
        self.DEBUG("INFO: Requesting site '%s'" % (s))
        self.m_sd.get(s)
        self.m_sd.find_element(By.CSS_SELECTOR, 'span.btn.btn__blue').click()
        s = "https://www.walgreens.com/findcare/vaccination/covid-19/location-screening"
        self.DEBUG("INFO: Requesting site '%s'" % (s))
        self.m_sd.get(s)
        element = self.m_sd.find_element(By.ID, "inputLocation")
        element.clear()
        q = site['query']
        self.DEBUG("INFO: Asking Walgreens about the location '%s'" % (q))
        element.send_keys(q)
        self.m_sd.find_element(By.CSS_SELECTOR, "button.btn").click()
        time.sleep(0.75)
        deadline = time.time() + 30  # 30 sec timeout
        self.DEBUG("INFO: Waiting for Walgreens result...")
        while True:
            try:
                response = self.m_sd.find_element(By.CSS_SELECTOR, "p.fs16")
                break
            except NoSuchElementException:
                if time.time() > deadline:
                    # give up instead of polling forever
                    self.DEBUG("WARNING: Timeout waiting for Walgreens result, continuing")
                    return
                time.sleep(0.5)
        result_text = response.text
        self.DEBUG("INFO: Found Walgreens result '%s'" % (result_text))
        if result_text in ("Appointments unavailable", "Please enter a valid city and state or ZIP"):
            self.handle_status(Availability.PROBABLY_NOT, name, "")
        else:
            self.handle_status(Availability.MAYBE, name, "")

    def query_cvs(self, name):
        """For querying the CVS page for availability.

        Looks up the site's 'city' in the CVS status list for the site's
        'state'. A missing state or city is reported as PROBABLY_NOT.
        """
        self.DEBUG("INFO: Requesting information from CVS...")
        site = self.m_websites[name]
        state = site['state']
        city = site['city']
        # a timeout keeps one unresponsive server from stalling the whole daemon
        response = requests.get(
            "https://www.cvs.com/immunizations/covid-19-vaccine.vaccine-status.{}.json?vaccineinfo".format(state.lower()),
            headers={"Referer":"https://www.cvs.com/immunizations/covid-19-vaccine"},
            timeout=self.TIMEOUT)
        payload = response.json()
        self.DEBUG("INFO: Received response, parsing information from CVS...")
        try:
            mappings = {item.get('city'): item.get('status') for item in payload["responsePayloadData"]["data"][state]}
            city_status = mappings[city.upper()]
        except KeyError:
            self.handle_status(Availability.PROBABLY_NOT, name, "")
            self.DEBUG("WARNING: Could not find state '%s' or city '%s' in CVS response" % (state, city))
            return
        if "Fully Booked" == city_status:
            self.handle_status(Availability.PROBABLY_NOT, name, "")
            self.DEBUG("INFO: Found 'fully booked'")
        else:
            self.handle_status(Availability.MAYBE, name, "")

    def query_heb(self, name):
        """For querying the HEB page for availability.

        Reports MAYBE when at least one location in the site's 'city' has a
        non-zero 'openTimeslots', otherwise PROBABLY_NOT. A response that
        lacks the expected keys is reported as PROBABLY_NOT.
        """
        self.DEBUG("INFO: Requesting information from HEB...")
        site = self.m_websites[name]
        # a timeout keeps one unresponsive server from stalling the whole daemon
        d = requests.get("http://heb-ecom-covid-vaccine.hebdigital-prd.com/vaccine_locations.json", timeout=self.TIMEOUT).json()
        self.DEBUG("INFO: Received response, parsing information from HEB...")
        city = site['city'].upper()
        try:
            foundOne = False
            for location in d['locations']:
                if location["city"].upper() == city and location["openTimeslots"] != 0:
                    self.DEBUG("INFO: Found a match at HEB for '%s'! Zip code: '%s'. Open timeslots: %d" % (city, location['zip'], location['openTimeslots']))
                    foundOne = True
        except KeyError:
            self.handle_status(Availability.PROBABLY_NOT, name, "")
            self.DEBUG("WARNING: Could not find city '%s' in HEB response" % (city))
            return
        # report once, however many locations matched
        self.handle_status(Availability.MAYBE if foundOne else Availability.PROBABLY_NOT, name, "")

    def _query_phrase(self, name):
        """Fetch a 'phrase' site and derive its status from 'pos_phrase' / 'neg_phrase'."""
        site = self.m_websites[name]
        self.DEBUG("INFO: asking %s at %s ..." % (name, site['website']))
        r = requests.get(site['website'], timeout=self.TIMEOUT, verify=False)
        html = re.sub("(<!--.*?-->)", "", r.text, flags=re.DOTALL)  # remove HTML comments, outdated information sometimes lives here
        # An empty or absent phrase means "not configured". Without this guard
        # an empty 'neg_phrase' would match every page ("" is in any string).
        pos_phrase = site.get('pos_phrase', "")
        neg_phrase = site.get('neg_phrase', "")
        if pos_phrase and pos_phrase in html:
            self.handle_status(Availability.PROBABLY, name, html)
        elif neg_phrase and neg_phrase in html:
            self.handle_status(Availability.PROBABLY_NOT, name, html)
        else:
            self.handle_status(Availability.MAYBE, name, html)

    def _query_site(self, name, site):
        """Dispatch on the site's 'type'. Returns False when the type is unknown."""
        site_type = site['type'].lower()
        # special cases that require website navigation
        if "walgreens" == site_type:
            self.query_walgreens(name)
        elif "cvs" == site_type:
            self.query_cvs(name)
        elif "heb" == site_type:
            self.query_heb(name)
        # regular case of looking at a confirmation/absence of phrase in HTML via use
        # of 'pos_phrase' / 'neg_phrase'
        elif "phrase" == site_type:
            self._query_phrase(name)
        else:
            self.DEBUG("WARNING: Type '%s' for website '%s' not found, skipping..." % (site['type'], site['website']))
            return False
        return True

    def _write_status(self):
        """Write self.m_websites to 'status.json' (plus a timestamped archive copy if enabled)."""
        # populate the file we use for communication with PHP
        os.makedirs(self.m_outputDir, exist_ok=True)
        STATUS_JSON_FILENAME = "status.json"
        filename = self.m_outputDir + "/" + STATUS_JSON_FILENAME
        content = json.dumps(self.m_websites, indent=4)
        with open(filename, "w") as f:
            f.write(content)
        self.DEBUG("INFO: Wrote '%s'" % (filename))
        # save off all that we make
        if self.m_enableArchive:
            archive_dir = self.m_outputDir + "/archive"
            os.makedirs(archive_dir, exist_ok=True)
            filename = archive_dir + "/" + STATUS_JSON_FILENAME + "." + (datetime.now().strftime("%Y-%m-%d_%H%M%S"))
            with open(filename, "w") as f:
                f.write(content)
            self.DEBUG("INFO: Archiving file %s" % (filename))

    def run(self):
        """primary loop. query the self.m_websites and keep track of status.

        Each round queries every site, writes 'status.json', sleeps for the
        request rate (with up to 10 seconds of jitter) and runs any pending
        scheduled heartbeat. Runs until MAX_ATTEMPTS rounds are done, or
        forever when MAX_ATTEMPTS is 0. A failure while writing status or
        sleeping is reported via send_message and ends the process.
        """
        # primary loop
        while self.m_attempts < self.MAX_ATTEMPTS or self.MAX_ATTEMPTS == 0:
            for name, site in self.m_websites.items():
                try:
                    if not self._query_site(name, site):
                        continue
                    site['update_time'] = time.strftime("%d-%b-%Y %I:%M:%S %p")
                except requests.exceptions.Timeout as e:
                    # keep the previous status; the site may simply be slow
                    self.DEBUG("WARNING: Timeout: " + str(e) + "...continuing")
                except Exception as e:
                    self.DEBUG(traceback.format_exc())
                    self.DEBUG(("ERROR: Error when querying '%s'. Error type %s : %s" % (name, type(e).__name__, str(e))))
                    # currently only Walgreens requires selenium
                    if site['type'].lower() == "walgreens":
                        self.DEBUG("INFO: Resetting selenium...")
                        self.selenium_setup()
                        continue
                    # NOTE(review): the flattened source does not show whether the
                    # original 'continue' also covered non-Walgreens errors; this
                    # reconstruction marks those sites as unavailable - confirm.
                    self.handle_status(Availability.PROBABLY_NOT, name, "")
            try:
                self._write_status()
                # give the good servers some time to rest
                VARIANCE = 10  # seconds
                shortest = max(self.MIN_REQUEST_RATE, self.m_requestRate - VARIANCE)
                # never hand randint an inverted range, however small the rate is
                longest = max(shortest, self.m_requestRate + VARIANCE)
                sleeptime = random.randint(shortest, longest)
                self.DEBUG("INFO: checking again in %d seconds (%s)..." % (sleeptime, timedelta(seconds=sleeptime)))
                time.sleep(sleeptime)
                schedule.run_pending()
                self.m_attempts += 1
            except Exception as e:
                self.DEBUG(traceback.format_exc())
                self.send_message("Error during processing of type %s : %s ... exiting." % (type(e).__name__, str(e)))
                sys.exit(-1)
        self.DEBUG("INFO: All done. Bye!")
# Script entry point: parse and validate the command line, then run the daemon.
if __name__ == "__main__":
    # exit cleanly on Ctrl-C
    signal.signal(signal.SIGINT, SignalHandler)
    parser = argparse.ArgumentParser(description=PROGRAM_DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument(
        '--websites',
        action="store",
        dest="websitesFile",
        help="The location of 'websites.json`. See README at the top of vaccineChecker.py for format.",
        required=True,
        metavar="[x]",
        default="input/websites.json")
    parser.add_argument(
        '--output-dir',
        action="store",
        dest="outputDir",
        help="The directory where 'status.json', which is read by 'index.php', will be written to. If --archive is passed, archives of of status will be written in an 'archive' subdirectory. Default directory is 'status'.",
        required=False,
        metavar="[x]",
        default="status")
    parser.add_argument(
        '--credentials',
        action="store",
        dest="credentialsFile",
        help="The location of 'credentials.json`. See README at the top of vaccineChecker.py for format.",
        required=False,
        metavar="[x]",
        default="")
    parser.add_argument(
        '--notification-rate',
        action="store",
        dest="notificationRate",
        help="If passed, defines how often, in minutes, the program will email the 'recipients' field in 'credentials.json' to send a periodic update of status and/or errors.",
        required=False,
        metavar="[x]",
        default="")
    parser.add_argument(
        '--archive',
        action="store_true",
        dest="enableArchive",
        help="If enabled, will write archives of 'status.json' and changed website HTML content to the directory specified in --output-dir",
        required=False,
        default=False)
    parser.add_argument(
        '--request-rate',
        action="store",
        dest="requestRate",
        help="How often, in seconds, the status will be requested from the sites in 'websites.json'. Default is 300 seconds (5 minutes). Up to a 10 second jitter is intentionally added every request.",
        required=False,
        metavar="[x]",
        default=5*60)
    parser.add_argument(
        '--verbose',
        action="store_true",
        dest="verbose",
        help="If passed, prints out function name and process ID when logging.",
        required=False,
        default=False)
    args = parser.parse_args()

    # --request-rate arrives as a string when given on the command line
    try:
        args.requestRate = int(args.requestRate)
    except ValueError:
        print("ERROR: --request-rate must be a number")
        sys.exit(-1)

    # credentials and notification rate go together: default the rate when only
    # the credentials are given, refuse a rate without credentials
    if "" != args.credentialsFile and "" == args.notificationRate:
        print("INFO: 'credentials.json' location specified, but --notification-rate not passed. Assuming default notification rate of 60 minutes.")
        args.notificationRate = "60"
    if "" != args.notificationRate and "" == args.credentialsFile:
        print("ERROR: --credentials file location must be passed when --notification-rate is supplied")
        sys.exit(-1)

    if args.notificationRate != "":
        try:
            args.notificationRate = int(args.notificationRate)
        except ValueError:
            args.notificationRate = -1  # reported by the range check below
        if args.notificationRate < 0:
            print("ERROR: --notification-rate must be a positive number")
            sys.exit(-1)
    else:
        # 0 disables notifications
        args.notificationRate = 0

    vc = vaccineChecker(args.websitesFile, args.outputDir, args.credentialsFile, args.notificationRate, args.enableArchive, args.requestRate, args.verbose)
    vc.run()