send_email.py
#!/usr/bin/env python3
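"""Build and email an accounting report.

Pulls usage data from Elasticsearch (or from the previous run's pickle when
restarting a daily report), collapses it into per-table CSV files, renders
them as an HTML email, and pushes the report's totals back to Elasticsearch.
"""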
import os
import sys
import time
import json
import pickle
import logging
import logging.handlers
from traceback import print_exc
from pathlib import Path
import elasticsearch
# squelch warnings when importing htcondor
os.environ["CONDOR_CONFIG"] = os.environ.get("CONDOR_CONFIG", "/dev/null")
import accounting
from accounting.push_totals_to_es import push_totals_to_es
args = accounting.parse_args(sys.argv[1:])
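
# Configure logging: INFO by default (DEBUG with args.debug), to an optional
# rotating log file and, unless args.quiet is set, to the console.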
logger = logging.getLogger("accounting")
logger.setLevel(logging.INFO)
if args.debug:
    logger.setLevel(logging.DEBUG)

if args.log_file is not None:
    fh = logging.handlers.RotatingFileHandler(str(args.log_file), backupCount=9, maxBytes=10_000_000)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    fh.setLevel(logger.getEffectiveLevel())
    logger.addHandler(fh)
    logger.info(f"=== {sys.argv[0]} STARTING UP ({' '.join(sys.argv[1:])}) ===")

if not args.quiet:
    sh = logging.StreamHandler()
    formatter = logging.Formatter("[%(asctime)s] %(message)s")
    sh.setFormatter(formatter)
    sh.setLevel(logger.getEffectiveLevel())
    logger.addHandler(sh)
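
# Fetch fresh data from Elasticsearch unless this is a restarted daily report.
# Timeouts are retried with exponential backoff (4, 16, then 64 seconds); the
# for/else fires only if all three tries timed out.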
if args.report_period != "daily" or not args.restart:
    for tries in range(3):
        try:
            logger.info(f"Filtering data using {args.filter.__name__}")
            filtr = args.filter(**vars(args))
            raw_data = filtr.get_filtered_data()
        except elasticsearch.exceptions.ConnectionTimeout:
            logger.info(f"Elasticsearch connection timed out, trying again (try {tries+1})")
            time.sleep(4**(tries+1))
            continue
        break
    else:
        logger.error("Could not connect to Elasticsearch")
        sys.exit(1)
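
    # Cache the raw data so a restarted daily run (args.restart) can reuse it
    # below instead of querying Elasticsearch again.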
    if args.report_period == "daily":
        last_data_file = Path(f"last_data_{args.filter.__name__}.pickle")
        logger.debug(f"Dumping data to {last_data_file}")
        with last_data_file.open("wb") as f:
            pickle.dump(raw_data, f, pickle.HIGHEST_PROTOCOL)
else:
    last_data_file = Path(f"last_data_{args.filter.__name__}.pickle")
    logger.debug(f"Reading data from {last_data_file}")
    with last_data_file.open("rb") as f:
        raw_data = pickle.load(f)
    logger.info(f"Filtering data using {args.filter.__name__}")
    filtr = args.filter(**vars(args), skip_init=True)
    filtr.data = raw_data
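
# Collapse the filtered data into one CSV file per table.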
table_names = list(raw_data.keys())
logger.debug(f"Got {len(table_names)} tables: {', '.join(table_names)}")
csv_files = {}
for table_name in table_names:
    logger.debug(f"Collapsing data for {table_name} table")
    table_data = filtr.merge_filtered_data(filtr.get_filtered_data(), table_name)
    logger.debug(f"{table_name} table has {len(table_data)} rows")
    logger.debug(f"Generating CSV for {table_name}")
    csv_files[table_name] = accounting.write_csv(table_data, filtr.name, table_name, **vars(args))
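
# Attach the CSVs in a fixed display order, skipping any tables this filter
# did not produce.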
table_files = [
    csv_files[name]
    for name in ["Projects", "Users", "Schedds", "Site", "Institution", "Machine", "Jobs", "JobRequests", "JobUsages"]
    if name in csv_files
]
logger.info(f"Formatting data using {args.formatter.__name__}")
formatter = args.formatter(table_files, **vars(args))
logger.debug(f"Generating HTML")
html = formatter.get_html()
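
# Save the rendered HTML so the most recent report can be inspected on disk.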
last_html_file = Path(f"last_html_{args.formatter.__name__}.html")
logger.debug(f"Dumping HTML to {last_html_file}")
with last_html_file.open("w") as f:
    f.write(html)
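
# Email the report; a failure here is logged (and printed to stderr when the
# console handler is suppressed) but does not stop the uploads below.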
logger.info("Sending email")
try:
    accounting.send_email(
        subject=formatter.get_subject(**vars(args)),
        html=html,
        table_files=table_files,
        **vars(args))
except Exception:
    logger.exception("Caught exception while sending email")
    if args.quiet:
        print_exc(file=sys.stderr)
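
# Push the report's totals back into Elasticsearch unless uploads were
# disabled on the command line.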
if args.report_period in ["daily", "weekly", "monthly"] and not args.do_not_upload:
    logger.info("Pushing daily totals to Elasticsearch")
    try:
        push_totals_to_es(table_files, "daily_totals", **vars(args))
    except Exception:
        logger.error("Could not push daily totals to Elasticsearch")
        if args.debug:
            logger.exception("Error follows")

    # Push summary data to tables in Tiger, if a config file for its
    # Elasticsearch endpoint is present.
    if Path("tiger-es-summary-config.json").exists():
        logger.info("Pushing daily totals to Tiger Elasticsearch")
        try:
            with Path("tiger-es-summary-config.json").open("r") as f:
                tiger_args = json.load(f)
            push_totals_to_es(table_files, "usage-summary-000001", **tiger_args)
        except Exception:
            logger.error("Could not push daily totals to Tiger Elasticsearch")
            if args.debug:
                logger.exception("Error follows")