-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotify-aurora.py
81 lines (58 loc) · 2.56 KB
/
notify-aurora.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import logging
import os
import sys
import re
import requests
import json
from datetime import date, timedelta
from bs4 import BeautifulSoup
from imap_tools import MailBox, AND
from urlextract import URLExtract
logging.basicConfig(stream=sys.stdout, format='{"message": "%(message)s"}', level=logging.INFO)
def get_aurora_forecast():
raw_data = requests.get('https://services.swpc.noaa.gov/text/3-day-forecast.txt').content.decode('utf-8')
kp_lines = re.findall("^.*[0-9]{2}-[0-9]{2}UT.*$", raw_data, re.MULTILINE)
kp_days = [
date.today().strftime("%d/%m") + ': ',
(date.today() + timedelta(days=1)).strftime("%d/%m") + ': ',
(date.today() + timedelta(days=2)).strftime("%d/%m") + ': ']
for kp_line in kp_lines:
logging.info(kp_line)
kp_numbers = re.findall(' [0-9.]+ ', kp_line)
kp_days[0] += round_kp(kp_numbers[0]) + " "
kp_days[1] += round_kp(kp_numbers[1]) + " "
kp_days[2] += round_kp(kp_numbers[2]) + " "
kp_days_stripped = [item.strip() for item in kp_days]
logging.info(kp_days_stripped)
return "Kp for 3hr blocks UTC time.\n" + "\n".join(kp_days_stripped)
def round_kp(kp_index):
return str(round(float(kp_index.strip()), 1))
def extract_map_share_url(text):
urls = URLExtract().find_urls(text)
for url in urls:
if re.search("txtmsg", url):
logging.info('sending via ' + url)
return url
def create_map_share_payload(url, text):
soup = BeautifulSoup(requests.get(url).content, 'html.parser')
message_id = soup.find("input", {"id": "MessageId"}).get('value')
guid = soup.find("input", {"id": "Guid"}).get('value')
reply_address = soup.find("input", {"id": "ReplyAddress"}).get('value')
return {'ReplyAddress': reply_address,
'ReplyMessage': text,
'MessageId': message_id,
'Guid': guid}
def notify_map_share(url, text):
payload = create_map_share_payload(url, text)
logging.debug(payload)
session = requests.Session()
response = session.post(
'https://eur.explore.garmin.com/TextMessage/TxtMsg',
headers={'User-Agent': 'Mozilla/5.0'},
data=payload)
logging.info(response.headers)
with MailBox(os.getenv('IMAP_URL')).login(os.getenv('IMAP_LOGIN'), os.getenv('IMAP_PASSWORD'), 'INBOX') as mailbox:
forecast_text = get_aurora_forecast()
for msg in mailbox.fetch(AND(from_=os.getenv('IMAP_FROM', '[email protected]'), new=True)):
map_share_url = extract_map_share_url(msg.text)
notify_map_share(map_share_url, forecast_text)