-
Notifications
You must be signed in to change notification settings - Fork 182
/
ws.py
executable file
·343 lines (286 loc) · 14.1 KB
/
ws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
#!/usr/bin/env python3
# coding=utf-8
# requires https://pypi.python.org/pypi/websocket-client/
# Route uncaught exceptions through the project's handler: sys.excepthook is replaced with
# uncaught_exception, and install_thread_excepthook() is called, before any other import below runs.
from excepthook import uncaught_exception, install_thread_excepthook
import sys
sys.excepthook = uncaught_exception
install_thread_excepthook()
# !! Important! Be careful when adding code/imports before this point.
# Our except hook is installed here, so any errors before this point
# won't be caught if they're not in a try-except block.
# Hence, please avoid adding code before this comment; if it's necessary,
# test it thoroughly.
import os
import platform
# noinspection PyPackageRequirements
import websocket
from threading import Thread
import traceback
from bodyfetcher import BodyFetcher
import chatcommunicate
from datetime import datetime
from spamhandling import check_if_spam_json
from globalvars import GlobalVars
from datahandling import (load_pickle, PICKLE_STORAGE, load_files, filter_auto_ignored_posts,
refresh_site_id_dict_if_needed_and_get_issues)
from metasmoke import Metasmoke
from metasmoke_cache import MetasmokeCache
from deletionwatcher import DeletionWatcher
from editwatcher import EditWatcher
import json
import time
import requests
import dns.resolver
# noinspection PyPackageRequirements
from tld.utils import update_tld_names, TldIOError
from helpers import (exit_mode, log, Helpers, log_exception, add_to_global_bodyfetcher_queue_in_new_thread,
tell_debug_rooms_recovered_websocket)
from flovis import Flovis
from tasks import Tasks
import chatcommands
# Number of connection attempts passed to init_se_websocket_or_reboot() for the SE WebSocket.
MAX_SE_WEBSOCKET_RETRIES = 5

# Python 3.6.0 is the bare minimum needed to run SmokeDetector.
MIN_PYTHON_VERSION = (3, 6, 0)  # Below this version we know SmokeDetector has issues.

# Our CI testing is actually on the latest patch (dot dot) release of the minor
# version specified in MIN_TESTED_PYTHON_VERSION.
# MIN_TESTED_PYTHON_VERSION is the earliest version on which we do CI testing as of the last time this
# was manually updated. The version on which we *actually* do CI testing is the latest release, whatever that
# is at the time the test is run and GitHub Actions has updated their Python configurations. It is
# expected that the version mentioned below will not be updated on a regular basis and that it *will be*
# out of date most of the time.
MIN_TESTED_PYTHON_VERSION = (3, 9, 20)

# (major, minor, micro) of the running interpreter, as a tuple of ints.
# sys.version_info is used instead of platform.python_version_tuple(): the latter returns strings, and the
# patch-level string of a pre-release interpreter (e.g. "0rc1") cannot be converted with int().
THIS_PYTHON_VERSION = tuple(sys.version_info[:3])

MIN_ELAPSED_SEND_SITE_ID_ISSUES_TO_CHAT = 2 * 60 * 60  # 2 hours in seconds
# Optionally import a local "plugin" module when plugin.py exists in the working directory.
# Any exception raised by that import is logged as a warning and otherwise ignored.
if os.path.isfile("plugin.py"):
    try:
        import plugin
    except Exception:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        # traceback.format_tb() returns a list of strings; join it so the log shows a readable
        # traceback rather than the repr() of a list.
        error_msg = "{}: {}\n{}".format(exc_type.__name__, exc_obj, "".join(traceback.format_tb(exc_tb)))
        log('warning', "Error while importing plugin:\n" + error_msg)
        # Ignore and move on
# Log level names accepted by the "--loglevel=<name>" command line option, mapped to their numeric level.
levels = {level_name: level_number
          for level_number, level_name in enumerate(('debug', 'info', 'warning', 'error'))}

# Only the first command line argument containing "--loglevel" is considered. The level name is whatever
# follows the last "="; without an "=" (or without the option at all) the minimum level is 0.
# An unknown level name raises KeyError.
loglevel_argument = next((cli_argument for cli_argument in sys.argv if '--loglevel' in cli_argument), None)
selected_min_level = 0
if loglevel_argument is not None:
    option_text, equals_sign, requested_level = loglevel_argument.rpartition('=')
    if equals_sign:
        selected_min_level = levels[requested_level]
Helpers.min_log_level = selected_min_level
# Refuse to start on an interpreter older than the minimum version SmokeDetector is known to need.
if THIS_PYTHON_VERSION < MIN_PYTHON_VERSION:
    raise RuntimeError(
        "SmokeDetector requires Python version {0}.{1}.{2} or newer to run.".format(*MIN_PYTHON_VERSION))

# An interpreter that is new enough to run, but older than the version covered by CI testing, only
# gets a warning in the log.
if THIS_PYTHON_VERSION < MIN_TESTED_PYTHON_VERSION:
    untested_version_template = (
        'SmokeDetector is tested on the latest released version of Python {0}.{1}.'
        ' SmokeDetector may or may not work with Python versions earlier than that.'
        ' Code changes which break SmokeDetector in earlier Python versions may be'
        ' made without notice.'
        ' Please consider upgrading your instances of'
        ' SmokeDetector to use Python {0}.{1}.{2} or newer.'
    )
    log('warning', untested_version_template.format(*MIN_TESTED_PYTHON_VERSION))
# Log an informational notice for each metasmoke setting that is empty/unset. Nothing else is done
# about a missing setting here.
for setting_name, notice in (
        ('metasmoke_host', "metasmoke host not found. Set it as metasmoke_host in the config file. "
                           "See https://github.com/Charcoal-SE/metasmoke."),
        ('metasmoke_key', "No metasmoke key found, which is okay if both are running on the same host"),
        ('metasmoke_ws_host', "No metasmoke websocket host found, which is okay if you're anti-websocket"),
):
    if not getattr(GlobalVars, setting_name):
        log('info', notice)
# Initiate DNS
#
# Based on additional research, at this point in the code *nothing* has done anything from a
# DNS or network resolution perspective - not for WebSockets nor for dnspython and the
# default resolver in it.  Since this activates and initializes the DNS *long* before
# the chat or metasmoke websockets have been initiated, this is a 'safe space' to
# begin initialization of the DNS data.
if GlobalVars.dns_nameservers != 'system':
    # Use the same setting that was just tested (GlobalVars.dns_nameservers), a comma-separated list of
    # nameservers. Whitespace around entries is stripped and empty entries are dropped, so a value such
    # as "8.8.8.8, 1.1.1.1" is accepted.
    dns.resolver.get_default_resolver().nameservers = [
        nameserver.strip() for nameserver in GlobalVars.dns_nameservers.split(',') if nameserver.strip()
    ]

if GlobalVars.dns_cache_enabled:
    # Attach a cache to the default resolver; dns_cache_interval is passed as the Cache's first argument.
    dns.resolver.get_default_resolver().cache = dns.resolver.Cache(GlobalVars.dns_cache_interval)
# noinspection PyProtectedMember
def restart_automatically():
    """
    Perform the scheduled automatic reboot of this instance.

    Calls Metasmoke.send_statistics(), announces the reboot in the rooms tagged "debug", sleeps for
    6 seconds and then calls exit_mode("reboot").
    :returns: None
    """
    Metasmoke.send_statistics()
    reboot_notice = "{}: Executing automatic scheduled reboot.".format(GlobalVars.location)
    chatcommunicate.tell_rooms_with("debug", reboot_notice)
    # Presumably gives the chat message time to be sent before the process goes away.
    time.sleep(6)
    exit_mode("reboot")
def load_ms_cache_data():
    """
    Restore the metasmoke cache from its pickle file, if that file exists.

    The pickle is expected to hold a mapping with the keys 'cache' and 'expiries'; their values are
    assigned to MetasmokeCache._cache and MetasmokeCache._expiries. When the file does not exist,
    nothing is changed. Should really only need to be called once, on startup.
    :returns: None
    """
    cache_filename = 'metasmokeCacheData.p'
    if not os.path.isfile(os.path.join(PICKLE_STORAGE, cache_filename)):
        return

    cached = load_pickle(cache_filename)
    MetasmokeCache._cache = cached['cache']
    MetasmokeCache._expiries = cached['expiries']
# Schedule the automatic restart for 6 hours from now. This is done here, before the TLD update and
# logging in, so that the restart still happens if either of those gets stuck indefinitely.
Tasks.later(restart_automatically, after=6 * 60 * 60)
# Try to refresh the TLD name list used by the `tld` package. A TldIOError is not fatal: a
# description of the problem is logged as a warning and startup continues.
try:
    update_tld_names()
except TldIOError as ioerr:
    # That we were unable to update the TLD names isn't actually a fatal error, so just log it and continue.
    strerror = str(ioerr)
    strerror_lower = strerror.lower()
    if "permission denied:" in strerror_lower:
        # The two path checks are alternatives, so they form a single if/elif/else chain. (As two
        # separate `if` statements, the final `else` replaced the system-wide message with the raw text.)
        if "/usr/local/lib/python" in strerror and "/dist-packages/" in strerror:
            err_msg = "WARNING: Cannot update TLD names, due to `tld` being system-wide installed and not " \
                      "user-level installed. Skipping TLD names update. \n"
        elif "/home/" in strerror and ".local/lib/python" in strerror and "/site-packages/tld/" in strerror:
            err_msg = "WARNING: Cannot read/write to user-space `tld` installation, check permissions on the " \
                      "path. Skipping TLD names update. \n"
        else:
            err_msg = strerror
    elif "certificate verify failed" in strerror_lower:
        # Ran into this error in testing on Windows, best to throw a warn if we get this...
        err_msg = "WARNING: Cannot verify SSL connection for TLD names update; skipping TLD names update."
    else:
        err_msg = strerror
    log_exception(type(ioerr), ioerr, err_msg, True, log_level="warning")
if "ChatExchangeU" in os.environ:
log('debug', "ChatExchange username loaded from environment")
username = os.environ["ChatExchangeU"]
elif GlobalVars.chatexchange_u:
log('debug', "ChatExchange username loaded from config")
username = GlobalVars.chatexchange_u
else:
log('error', "No ChatExchange username provided. Set it in config or provide it via environment variable")
exit_mode("shutdown")
if "ChatExchangeP" in os.environ:
log('debug', "ChatExchange password loaded from environment")
password = os.environ["ChatExchangeP"]
elif GlobalVars.chatexchange_p:
log('debug', "ChatExchange password loaded from config")
password = GlobalVars.chatexchange_p
else:
log('error', "No ChatExchange password provided. Set it in config or provide it via environment variable")
exit_mode("shutdown")
# We need an instance of bodyfetcher before load_files() is called
GlobalVars.bodyfetcher = BodyFetcher()
if GlobalVars.flovis_host:
    GlobalVars.flovis = Flovis(GlobalVars.flovis_host)

load_files()
load_ms_cache_data()
filter_auto_ignored_posts()

# Each of these runtime modes is enabled by passing the corresponding bare word on the command line.
GlobalVars.standby_mode = "standby" in sys.argv
for cli_flag in ('no_se_activity_scan', 'no_deletion_watcher', 'no_edit_watcher', 'no_chat_ws_activity_timeout'):
    setattr(GlobalVars, cli_flag, cli_flag in sys.argv)

chatcommunicate.init(username, password)
Tasks.periodic(Metasmoke.send_status_ping_and_verify_scanning_if_active, interval=60)

if GlobalVars.standby_mode:
    chatcommunicate.tell_rooms_with("debug", GlobalVars.standby_message)
    Metasmoke.send_status_ping()

    # Wait here for as long as standby mode stays set. Nothing in this part of the file clears it;
    # presumably it is cleared from another thread.
    while GlobalVars.standby_mode:
        time.sleep(3)

chatcommunicate.join_command_rooms()
# Refresh the SE site ID dictionary if needed and report any resulting issues to the rooms tagged
# "debug", but not more often than once per MIN_ELAPSED_SEND_SITE_ID_ISSUES_TO_CHAT seconds.
se_site_id_issues = refresh_site_id_dict_if_needed_and_get_issues()
if se_site_id_issues:
    send_se_site_id_issues_to_chat = False
    with GlobalVars.site_id_dict_lock:
        current_time = time.time()
        # Send only once the minimum interval since the last report has elapsed. (The comparison was
        # previously inverted, which suppressed the report after the interval had passed and allowed
        # repeated reports within it.)
        next_allowed_time = (GlobalVars.site_id_dict_issues_into_chat_timestamp
                             + MIN_ELAPSED_SEND_SITE_ID_ISSUES_TO_CHAT)
        if next_allowed_time < current_time:
            GlobalVars.site_id_dict_issues_into_chat_timestamp = current_time
            send_se_site_id_issues_to_chat = True
    # The chat message is sent after the lock has been released.
    if send_se_site_id_issues_to_chat:
        chatcommunicate.tell_rooms_with("debug", " ".join(se_site_id_issues))
# noinspection PyProtectedMember
def check_socket_connections():
    """
    Exit in "socket_failure" mode when any chat client has been inactive for 60 seconds or more.

    Every client in chatcommunicate._clients is examined while holding chatcommunicate._clients_lock.
    Clients whose last_activity is falsy are skipped. exit_mode() is called after the lock is released.
    :returns: None
    """
    with chatcommunicate._clients_lock:
        stale_clients = [
            client for client in chatcommunicate._clients.values()
            if client.last_activity and (datetime.utcnow() - client.last_activity).total_seconds() >= 60
        ]

    if stale_clients:
        exit_mode("socket_failure")


# Run the check every 90 seconds, unless it was disabled with no_chat_ws_activity_timeout.
if not GlobalVars.no_chat_ws_activity_timeout:
    Tasks.periodic(check_socket_connections, interval=90)
# Log this instance's location as active, followed by the configured metasmoke host.
log('info', '{} active'.format(GlobalVars.location))
log('info', 'MS host: {}'.format(GlobalVars.metasmoke_host))
def setup_websocket(attempt, max_attempts):
    """
    Make one attempt to open the SE WebSocket and send "155-questions-active" on it.

    :param attempt: number of this attempt; only used in the warning logged on failure
    :param max_attempts: total number of attempts; only used in the warning logged on failure
    :returns: the connected WebSocket, or None when a websocket.WebSocketException was raised while
              connecting or sending
    """
    try:
        connection = websocket.create_connection(GlobalVars.se_websocket_url,
                                                 timeout=GlobalVars.se_websocket_timeout)
        connection.send("155-questions-active")
    except websocket.WebSocketException:
        failure_notice = 'WS failed to create SE websocket connection. Attempt {} of {}.'.format(attempt,
                                                                                                 max_attempts)
        log('warning', failure_notice)
        return None
    return connection
def init_se_websocket_or_reboot(max_tries, tell_debug_room_on_error=False):
    """
    Connect the SE WebSocket, retrying on failure, and reboot once every attempt has failed.

    Up to max_tries attempts are made through setup_websocket(), sleeping 10 seconds after each failed
    attempt. When all attempts fail, an error is logged and exit_mode("reboot") is called.
    :param max_tries: maximum number of connection attempts
    :param tell_debug_room_on_error: when True, the failure message is also posted to the rooms tagged
                                     "debug" (followed by a 6 second sleep) before rebooting
    :returns: the WebSocket returned by the first successful setup_websocket() call
    """
    attempt = 1
    while attempt <= max_tries:
        ws = setup_websocket(attempt, max_tries)
        if ws:
            return ws
        # Wait and hopefully network issues will be solved
        time.sleep(10)
        attempt += 1

    error_message = 'SE WebSocket: Max retries exceeded. Exiting, maybe a restart will kick things.'
    log('error', error_message)
    if tell_debug_room_on_error:
        chatcommunicate.tell_rooms_with("debug", error_message)
        time.sleep(6)  # Make it more likely the message is actually sent to the rooms prior to rebooting.
    exit_mode("reboot")
    return ws
# Open the main SE WebSocket, unless SE activity scanning is disabled. The connect time and the time
# of the last heartbeat are tracked for the recovery report sent after a reconnect.
if not GlobalVars.no_se_activity_scan:
    ws = init_se_websocket_or_reboot(MAX_SE_WEBSOCKET_RETRIES)
    ws_connect_time = time.time()
    ws_hb_time = None

# NOTE(review): the watchers are taken to be created unconditionally - confirm against the original
# indentation.
GlobalVars.deletion_watcher = DeletionWatcher()
GlobalVars.edit_watcher = EditWatcher()

# On a first start, post the startup message (or the "reverted" variant when not on a branch) to the
# rooms tagged "debug".
if "first_start" in sys.argv:
    if GlobalVars.on_branch:
        startup_notice = GlobalVars.s
    else:
        startup_notice = GlobalVars.s_reverted
    chatcommunicate.tell_rooms_with('debug', startup_notice)

Tasks.periodic(Metasmoke.send_statistics, interval=600)

# Metasmoke.init_websocket runs in its own thread.
metasmoke_ws_t = Thread(target=Metasmoke.init_websocket, name="metasmoke websocket")
metasmoke_ws_t.start()
# Main loop: receive messages from the SE WebSocket for as long as SE activity scanning is enabled.
# "hb" messages are answered with "hb"; "155-questions-active" messages are queued for the bodyfetcher.
# Any exception is logged, and the WebSocket is then re-established.
while not GlobalVars.no_se_activity_scan:
    try:
        raw_message = ws.recv()
        if raw_message is None or raw_message == "":
            continue

        message = json.loads(raw_message)
        action = message["action"]
        if action == "hb":
            ws_hb_time = time.time()
            ws.send("hb")
        if action == "155-questions-active":
            data = json.loads(message['data'])
            hostname = data['siteBaseHostAddress']
            question_id = data['id']
            if GlobalVars.flovis is not None:
                GlobalVars.flovis.stage('received', hostname, question_id, json.loads(raw_message))

            # If the queue threshold depth is 1 and there are no special cases, then there's not
            # much benefit to pre-testing, as there isn't a wait for the queue to fill to the threshold.
            # The site will, however, be behind any site which is already queued.
            queue_threshold = GlobalVars.bodyfetcher.special_cases.get(hostname, GlobalVars.bodyfetcher.threshold)
            is_spam = False
            if queue_threshold > 1:
                is_spam, reason, why = check_if_spam_json(raw_message)

            # True when the pre-test flagged the post, otherwise None (never False).
            pretest_flag = True if is_spam else None
            add_to_global_bodyfetcher_queue_in_new_thread(hostname, question_id, pretest_flag,
                                                          source="155-questions-active")
            GlobalVars.edit_watcher.subscribe(hostname=hostname, question_id=question_id)
    except Exception as e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        now = datetime.utcnow()
        seconds = (now - GlobalVars.startup_utc_date).total_seconds()
        tr = traceback.format_exc()
        exception_only = ''.join(traceback.format_exception_only(type(e), e)).strip()
        n = os.linesep
        logged_msg = n.join((str(now) + " UTC", exception_only, tr)) + n + n
        log('error', logged_msg)
        log_exception(exc_type, exc_obj, exc_tb)

        # Within the first 180 seconds after startup, any exception whose exact type is not one of
        # these two connection errors ends the process in "early_exception" mode.
        tolerated_early_errors = {websocket.WebSocketConnectionClosedException, requests.ConnectionError}
        if seconds < 180 and exc_type not in tolerated_early_errors:
            # noinspection PyProtectedMember
            exit_mode("early_exception")

        if not GlobalVars.no_se_activity_scan:
            ws.close()  # Close the prior WebSocket, if open.
            ws = init_se_websocket_or_reboot(MAX_SE_WEBSOCKET_RETRIES, tell_debug_room_on_error=True)
            # Report the recovery using the connect/heartbeat times of the connection that just failed,
            # then start tracking the new connection.
            tell_debug_rooms_recovered_websocket("main SE", e, ws_connect_time, ws_hb_time)
            ws_connect_time = time.time()
            ws_hb_time = None

# With SE activity scanning disabled there is nothing to do on this thread, so just idle.
while GlobalVars.no_se_activity_scan:
    # Sleep for longer than the automatic restart
    time.sleep(30000)