#! /usr/bin/env python3
# System
import sys
import os
import errno
from argparse import ArgumentParser
import json
# Web
from urllib.request import urlopen
from bs4 import BeautifulSoup
# Logging
import logging
from logging import handlers
LOGGER = logging.getLogger(__name__)
SH = logging.StreamHandler()
FH = logging.handlers.RotatingFileHandler("log.log", maxBytes=5 * 1000000, backupCount = 5)
SH.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(message)s"))
FH.setFormatter(logging.Formatter("%(asctime)s:%(lineno)s:%(funcName)s:%(levelname)s:%(message)s"))
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(SH)
LOGGER.addHandler(FH)
from pprint import pprint
# NLP
import re
from collections import defaultdict
DESCRIPTION = """Extracts transcriptions of Donald J. Trump's speeches from the 2016 United States Presidential Election race."""
def get_arg_parser():
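    """Build the command line interface.

    Three subcommands are exposed; the bracketed arguments below are the
    metavar placeholders from this parser, not real URLs or paths:

        transcripts.py update   <http://starturl.com/> <transcriptUrls.json>
        transcripts.py retrieve <transcriptUrls.json> <keyword1,keyword2,...> <rawPagesFile.json>
        transcripts.py extract  <rawPagesFile.json> <textDocuments.json>
    """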
parser = ArgumentParser(prog=sys.argv[0], description=DESCRIPTION)
parser.add_argument("-i", "--info",
help = "set console logging output to INFO")
parser.add_argument("-d", "--debug",
help = "set console logging output to DEBUG")
parser.set_defaults(
list_start_url = None,
transcript_urls_filename = None,
raw_pages_filename = None,
keywords = None,
texts_filename = None
)
# Sub Parsers
subparsers = parser.add_subparsers(help = "Actions", dest = "commands")
subparsers.required = True
update_parser = subparsers.add_parser("update",
help="updates the list of URLs containing transcripts")
retrieve_parser = subparsers.add_parser("retrieve",
help = "retrieves the raw HTML from the list of URLs filtering by lower case keywords existing in the name of the article")
extract_parser = subparsers.add_parser("extract",
help = "extracts text from the raw HTML")
# Update URL List
update_parser.add_argument(
metavar = "<http://starturl.com/>",
default = None,
dest = "list_start_url",
help = "URL of webpage with list of transcript URLs ")
update_parser.add_argument(
metavar = "<transcriptUrls.json>",
default = None,
dest = "transcript_urls_filename",
help = "path to JSON of transcript URLs and their titles")
# Retrieving Raw HTML
retrieve_parser.add_argument(
metavar = "<transcriptUrls.json>",
default = None,
dest = "transcript_urls_filename",
help = "path to JSON of transcript URLs and their titles")
retrieve_parser.add_argument(
metavar = "<keyword1,keyword2,...>",
default = None,
dest = "keywords",
help = "comma separated list of all keywords that must appear in name of article to retrieve")
retrieve_parser.add_argument(
metavar = "<rawPagesFile.json>",
default = None,
dest = "raw_pages_filename",
help = "path to JSON of transcript pages raw HTML")
# Extracting Text
extract_parser.add_argument(
metavar = "<rawPagesFile.json>",
default = None,
dest = "raw_pages_filename",
help = "path to JSON of transcript pages raw HTML")
extract_parser.add_argument(
metavar = "<textDocuments.json>",
default = None,
dest = "texts_filename",
help = "path to JSON of extracted texts")
return parser
def save_as_json(object, filename, check = False):
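    """Write `object` to `filename` as JSON.

    If `check` is True and the file already exists, log a warning and return
    False without writing; otherwise write the file and return True.
    """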
LOGGER.debug("Saving dictionary as JSON to '%s'", filename)
if check and os.path.isfile(filename):
LOGGER.warning("File already exists!")
return False
with open(filename, 'w') as file:
json.dump(object, file)
return True
def open_json(filename, check = False):
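    """Load `filename` as JSON and return the resulting object.

    If `check` is True and the file does not exist, log an error and return
    None instead of raising.
    """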
LOGGER.debug("Loading JSON as dictionary:'%s'", filename)
if check and not os.path.isfile(filename):
LOGGER.error("File doesn't exist!")
return None
with open(filename, 'r') as file:
return json.load(file)
def get_page_text(url):
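    """Fetch `url` and return the decoded response body, or None if the request fails."""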
htmlText = None
LOGGER.debug("Opening:'%s'" % (url))
try:
with urlopen(url) as webpage:
htmlText = webpage.read().decode()
except Exception as e:
LOGGER.error(e)
return None
return htmlText
def get_urls_from_page(result_page_url):
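    """Scrape one results page.

    Returns a list of {"url", "name"} dicts, one per <h1 class="headline">
    entry on the page, together with the URL of the next results page (or
    None when there is no "next" link).
    """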
url_names = []
next_page_url = None
LOGGER.debug("Retrieving transcript URLs from '%s'" % result_page_url)
page_string = get_page_text(result_page_url)
if not page_string:
LOGGER.warning("Failed to retrieve html for %s" % result_page_url)
return url_names, next_page_url
LOGGER.debug("Parsing")
bs = BeautifulSoup(page_string, "lxml")
headlines = bs.find_all("h1", {"class" : "headline"})
for h in headlines:
name = h.text
url = h.find("a")["href"]
#LOGGER.debug("Found '%s':%s" % (name, url))
url_names.append({"url" : url, "name" : name})
LOGGER.debug("Finding next page URL")
next_page_button = bs.find_all("a", {"class" : "next page-numbers"})
if len(next_page_button) == 1:
next_page_url = next_page_button[0]["href"]
else:
next_page_url = None
if len(next_page_button):
LOGGER.error("Encountered %d-many <a class='next page-number':"
% (len(next_page_button), next_page_button))
LOGGER.debug("Next Page:'%s'" % next_page_url)
return url_names, next_page_url
def get_transcript_urls(start_page_url, already_seen_urls = set()):
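    """Walk the paginated results starting at `start_page_url`.

    Collects every transcript URL/name pair not already in
    `already_seen_urls`, following "next page" links until a page yields an
    already-seen URL or no next page exists. Returns the new URL/name dicts
    and the list of result pages visited.
    """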
LOGGER.debug("Grabbing URLs and name of transcripts from search page")
LOGGER.debug("Starting at %s" % start_page_url)
next_page = start_page_url
url_names = []
result_urls = []
while next_page:
result_urls.append(next_page)
new_urls, next_page = get_urls_from_page(next_page)
for u in new_urls:
if u["url"] not in already_seen_urls:
LOGGER.debug("Adding:%s:%s" % (u["name"], u["url"]))
url_names.append(u)
else:
LOGGER.warning("skipping:%s:%s" % (u["name"], u["url"]))
if any(u["url"] in already_seen_urls for u in new_urls):
LOGGER.debug("Exiting")
break
LOGGER.debug("Moving onto %s" % next_page)
return url_names, result_urls
def get_transcript_html(url_names, keywords, already_downloaded_urls = set()):
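    """Download the raw HTML of every article whose lower-cased name contains
    all of `keywords` and whose URL is not in `already_downloaded_urls`.

    Returns a list of {"url", "name", "html"} dicts.
    """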
url_name_htmls = []
    for k in keywords:
        LOGGER.debug("Filtering name by: %s" % k.lower())
to_download = [(un["url"], un["name"]) for un in url_names
if
un["url"] not in already_downloaded_urls
and
all([x in un["name"].lower() for x in keywords])]
speeches = {transcript_basename(name) for url, name in to_download}
LOGGER.debug("Number of articles to retrieve: %d" % len(to_download))
LOGGER.debug("Number of speeches to retrieve: %d" % len(speeches))
for url, name in to_download:
basename = transcript_basename(name)
html = get_page_text(url)
url_name_htmls.append({"url" : url, "name" : name, "html" : html})
return url_name_htmls
def transcript_basename(name):
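    """Normalise an article title so multi-part transcripts share one name:
    drop a trailing " – Part N" and, unless the title ends in a year, a
    trailing part number stuck to the last word."""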
    # "[\d+]" only matched a single digit (or a literal '+'); use "\d+" so
    # multi-digit part numbers are stripped as well.
    basename = re.sub(r" – Part \d+", "", name).strip()
    if not (basename.endswith("2015") or basename.endswith("2016")):
        basename = re.sub(r"(\w)\d+$", r"\g<1>", basename)
return basename
def strip_html(html):
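    """Pull the transcript text out of a raw HTML page.

    Keeps only <p> elements, skipping boilerplate paragraphs (speaker labels,
    category links, comment prompts, etc.), and returns the remaining
    paragraphs joined with newlines.
    """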
paras = []
bs = BeautifulSoup(html, "lxml")
for p in bs.find_all("p"):
text = p.text
if "…" == text:
continue
if "###" == text:
continue
if "Partial transcript" in text:
continue
if "Excerpts from a" in text:
continue
if "Donald Trump:" in text:
continue
if "Transcript:" in text:
continue
if "Category:" in text:
continue
if "RSS Feed" in text:
continue
if "Posted by News Editor" in text:
continue
if "What The Folly?!" in text:
continue
if "Comments are closed." == text:
continue
if "Leave a Comment" == text:
continue
if not text:
continue
text = re.sub("…","...", text)
paras.append(text if text[-1] == ' ' else text + ' ')
return "\n".join(paras)
def extract_text(pages):
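    """Strip the HTML of every downloaded page and merge multi-part articles
    (grouped by transcript_basename) into one {"name", "text"} entry each."""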
transcripts = defaultdict(list)
for p in pages["transcripts"]:
LOGGER.debug("Stripping HTML from %s" % p["name"])
transcripts[transcript_basename(p["name"])].append(strip_html(p["html"]))
texts = []
for name, parts in transcripts.items():
LOGGER.debug("Joining %2d articles for %s" % (len(parts), name))
texts.append({"name" : name, "text" : "\n".join(parts)})
return texts
def main():
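    """Entry point: dispatch to the update / retrieve / extract steps based on
    the parsed arguments. Returns 0 on success or an errno code when a
    required input file is missing."""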
# Parse Arguments
parser = get_arg_parser()
args = parser.parse_args()
    LOGGER.debug(args)
# Logging Information
if args.info:
SH.setLevel(logging.INFO)
if args.debug:
SH.setLevel(logging.DEBUG)
start_page_url = args.list_start_url
transcript_urls_filename = args.transcript_urls_filename
raw_pages_filename = args.raw_pages_filename
    keywords = [k.lower() for k in args.keywords.split(",")] if args.keywords else None
texts_filename = args.texts_filename
# Get Transcript URLs
urls = None
if transcript_urls_filename:
urls = open_json(transcript_urls_filename, check = True)
if start_page_url:
if not urls:
urls = {"transcripts" : [], "results" : []}
new_urls, result_urls = get_transcript_urls(start_page_url, {u["url"] for u in urls["transcripts"]})
LOGGER.info("Found %d new URLs" % len(new_urls))
LOGGER.info("First result page looked at:'%s'" % (result_urls[0]))
LOGGER.info("Last result page looked at:'%s'" % (result_urls[-1]))
urls["transcripts"].extend(new_urls)
urls["results"] = sorted([r for r in set(urls["results"] + result_urls)])
save_as_json(urls, transcript_urls_filename)
if urls:
LOGGER.info("Number of Transcript URLs : %d" % len(urls["transcripts"]))
LOGGER.info("Number of Result Pages Seen: %d" % len(urls["results"]))
# Get Transcript HTML
    pages = None
    if raw_pages_filename:
if not texts_filename and not urls:
LOGGER.fatal("Transcript URLs file is required. Exiting.")
return errno.ENOENT
pages = open_json(raw_pages_filename, check = True)
if not pages:
pages = {"transcripts" : []}
if not texts_filename:
new_htmls = get_transcript_html(urls["transcripts"], keywords, {p["url"] for p in pages["transcripts"]})
LOGGER.info("New Downloads: %d" % len(new_htmls))
pages["transcripts"].extend(new_htmls)
save_as_json(pages, raw_pages_filename)
# Convert HTML to Text
if texts_filename:
if not pages:
LOGGER.fatal("Transcript HTML file is required. Exiting.")
return errno.ENOENT
texts = extract_text(pages)
LOGGER.info("Texts: %d" % len(texts))
save_as_json(texts, texts_filename)
return 0
if __name__ == '__main__':
LOGGER.info("Beginning Session")
rtn = main()
LOGGER.info("Ending Session")
sys.exit(rtn)