Python code for rebuilding the dataset from scratch
cvanlabe committed Sep 16, 2022
1 parent 0d1ead1 commit 2f4f3fb
Showing 15 changed files with 1,554 additions and 0 deletions.
36 changes: 36 additions & 0 deletions dbconnection.py
@@ -0,0 +1,36 @@
import logging

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from sqlalchemy.pool import StaticPool

logger = logging.getLogger("unsc_db_filler")

Base = declarative_base()


class DBConnection:
def __init__(self, host: str, dbname: str, user: str, password: str) -> None:
self.host: str = host
self.dbname: str = dbname
self.user: str = user
self.password: str = password
self.engine: Engine = create_engine(
f"postgresql+psycopg2://{user}:{password}@{host}/{dbname}",
echo=True,
poolclass=StaticPool,
)
self.session: Session = sessionmaker(bind=self.engine)()
Base.metadata.create_all(self.engine)

@property
def connection_string(self) -> str:
return f"host={self.host} dbname={self.dbname} user={self.user} password={self.password}"

def get_session(self) -> Session:
return self.session

def get_engine(self) -> Engine:
return self.engine
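
For reference, a minimal usage sketch of this class. It assumes a reachable PostgreSQL instance; the credentials are placeholders, and `Meeting` is a hypothetical model standing in for whatever the project actually declares on `Base`. Note that models must be defined before `DBConnection` is instantiated, since `create_all()` runs in `__init__`:

# Hypothetical sketch: credentials and the Meeting model are placeholders.
from sqlalchemy import Column, Integer, String

from dbconnection import Base, DBConnection


class Meeting(Base):  # assumed example model; the real models live elsewhere
    __tablename__ = "meetings"
    id = Column(Integer, primary_key=True)
    symbol = Column(String)


# create_all() runs inside __init__, so the Meeting table is created here.
db = DBConnection(host="localhost", dbname="unsc", user="unsc", password="secret")
session = db.get_session()
session.add(Meeting(symbol="S/PV.8979"))
session.commit()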
20 changes: 20 additions & 0 deletions download_job.py
@@ -0,0 +1,20 @@
from job import Job


class DownloadJob(Job):
"""
A Download job has a url it needs to go and download. As well as a destination file to save the download to.
It can be enqueued to a `Queue` instance for retrying when failing.
"""

def __init__(self, url: str, dest_file: str, description: str = "") -> None:
super().__init__(description=description)
self.url: str = url
self.dest_file: str = dest_file

    def info(self) -> str:
        return f"Job for {self.description} from {self.url} to {self.dest_file}"

    def __repr__(self) -> str:
        return self.info()
150 changes: 150 additions & 0 deletions html_downloader.py
@@ -0,0 +1,150 @@
import logging
import requests

from download_job import DownloadJob
from job_queue import JobQueue

logger = logging.getLogger("unsc_db_filler")


class DownloadFailed(Exception):
pass


class HTMLDownloader:
"""
This class is responsible for Downloading the UNSC pages we need to build our dataset.
They are the general Meeting pages, and the Veto page.
This class uses a Job Queue since the un.org web servers frequently give random errors.
"""

def __init__(self, path: str = "./", since: int = 1946, until: int = 2021) -> None:
self.path = path
self.since = since
self.until = until
self.job_queue = JobQueue()

def fetch_meeting_tables(self) -> None:
"""
Fetches all the UN Meeting tables webpages between the year `self.since`
and `self.until` to the location `self.path`.
:param since: The first year to fetch the tables for
:param until: The last year to fetch the tables for
:return:
"""
for year in range(self.since, self.until):
            # Some pages on the UN website end with .html, others with .htm:
            # prior to 1994, pages end in .html; from 1994 onwards, in .htm.
            # So we pick the extension based on the year.
            if year == 2022:
                # TODO: the UN made it inconsistent again... we can query
                # https://research.un.org/en/docs/sc/quick/meetings/2022
                # and inspect the iframe in which that page is rendered to figure
                # out the real url we need to grab.
                url = f"https://www.un.org/depts/dhl/resguide/scact{year}_table_en.html"
elif year < 1994:
url = f"https://www.un.org/depts/dhl/resguide/scact{year}_table_en.html"
else:
url = f"https://www.un.org/depts/dhl/resguide/scact{year}_table_en.htm"

            # Originally we just tried .html and, if it failed, .htm.
            # That trick was not reliable... the UN sometimes has both .html and .htm,
            # but one is then incomplete!
#
# if res.status_code == 404:
# logger.info("Failed to fetch .html, retrying .htm...")
# url = f"https://www.un.org/depts/dhl/resguide/scact{year}_table_en.htm"
# res = fetch_url(url)
#
# Leaving in this comment as 'historical software engineering story telling'
# https://www.youtube.com/watch?v=4PaWFYm0kEw&t=48s

download_job = DownloadJob(
url=url,
dest_file=f"scact{year}_table_en.html",
description=f"UNSC Meeting Table for Year `{year}`",
)

self.job_queue.enqueue(download_job)

            # In 2020 the UN started to document Covid-era remote meetings on a
            # separate page. Half of that year's meetings are on the regular page,
            # the other half are on the new page:
            # https://www.un.org/depts/dhl/resguide/SC_2020-revised.html
            #
            # So for 2020 we create an additional download job:
if year == 2020:
download_job = DownloadJob(
url="https://www.un.org/depts/dhl/resguide/SC_2020-revised.html",
dest_file=f"scact2020_covid_table_en.html",
description=f"UNSC Meeting Covid Table for Year `{year}`",
)

self.job_queue.enqueue(download_job)

self.job_queue.process(self.download)

def fetch_veto_table(self) -> None:
"""
Enqueues the download job for downloading the veto table from the UNSC website,
and starts processing it
"""
url = "https://www.un.org/depts/dhl/resguide/scact_veto_table_en.htm"

download_job = DownloadJob(
url=url,
dest_file=f"scact_veto_table_en.html",
description=f"UNSC Veto Table",
)
self.job_queue.enqueue(download_job)
self.job_queue.process(self.download)

def download(self, job: DownloadJob) -> None:
"""
Downloads the web page requested in a given DownloadJob
and writes it to disk as the file specified in the DownloadJob
This method is passed to the Job Queue process function to do the real heavy lifting
:param job: the DownloadJob we use to download the real data we need
"""
logger.info("Downloading %s - BEGIN", job.info())

# No try..except here.. let it crash...
# the job queue processing jobs needs to know when it failed
res = self.fetch_url(job.url)

logger.info("Downloading %s - END", job.info())

if res.status_code == 200:
self.write_to_disk(content=res.text, file=job.dest_file)
else:
raise DownloadFailed(
f"Unable to download the html file for {job.dest_file}...status code was {res.status_code}"
)

def fetch_url(self, url: str) -> requests.Response:
"""
Fetch a given url and return a requests Response
:param url: the URL we want to fetch
:return: the requests Response
"""
logger.info("Fetching %s", url)
return requests.get(url)

def write_to_disk(self, content: str, file: str) -> None:
"""
Writes a given content for a given year, to a file called
scact<year>_table_en.html in the SCRATCH_FOLDER location.
:param content: The content that needs to be written to file
:param file: the filename in which the content will be written
"""
output_file = f"{self.path}/{file}"
logger.info("Writing content to file '%s' BEGIN", output_file)

with open(output_file, "w") as f:
f.write(content)
logger.info("Writing content to file '%s' END", output_file)
28 changes: 28 additions & 0 deletions job.py
@@ -0,0 +1,28 @@
from abc import ABC, abstractmethod


class Job(ABC):
"""
A job that can be queued to a `Queue` instance for retrying when failing.
"""

def __init__(self, description: str = "") -> None:
self.complete: bool = False
self.attempts: int = 1
self.description: str = description

    @abstractmethod
    def info(self) -> str:
        pass

@property
def complete(self):
return self._complete

@complete.setter
def complete(self, val: bool):
self._complete = val

def __repr__(self) -> str:
return f"Job complete status: {self.complete} -- Attempts: {self.attempts} -- Description: {self.description}"
67 changes: 67 additions & 0 deletions job_queue.py
@@ -0,0 +1,67 @@
import logging
from typing import Callable

from job import Job

logger = logging.getLogger("unsc_db_filler")


class JobQueue:
"""
This class represents a FIFO queue which processes jobs.
It has a list of jobs it needs to execute,
a list of successfully processed jobs,
a list of failed jobs (after having retried `retries` (by default 20) times).
"""

def __init__(self, retries: int = 20) -> None:
self.jobs: list = []
self.processed: list = []
self.failed: set = set()
self.retries: int = retries

def enqueue(self, job: Job) -> None:
self.jobs.append(job)

def dequeue(self, job: Job) -> None:
self.jobs.remove(job)

def size(self) -> int:
return len(self.jobs)

    def process(self, function: Callable[[Job], None]) -> None:
        """
        Try to process the jobs list using the passed function.
        Whenever the function completes without throwing an error,
        we assume it was successful. If the function raises,
        we retry the job by adding it back to the queue.
        Once a job has failed `self.retries` times, we give up and add it
        to the failed jobs set.
        :param function: the function we want to run on the job
        """
        while self.jobs:

logger.info("=====================================")
try:
# Take the oldest job from the queue
job = self.jobs.pop(0)
logger.info("Trying time %s for job %s", job.attempts, job)
function(job)
job.complete = True
except Exception as e:
logger.info(
"Failed to process job '%s': %s .. retrying later",
job.info(),
e,
)

# Increment failed attempts of job
job.attempts += 1
# Add the failed job again to the queue if it didn't fail the max retries times yet
if job.attempts <= self.retries:
self.jobs.append(job)
else:
# If we failed more than the max retry times, remove it from the queue, and list it as a failed job.
self.failed.add(job)
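
A small illustrative sketch of the retry behaviour, using a deliberately flaky function (everything below is hypothetical demo code, not part of the commit):

import random

from download_job import DownloadJob
from job_queue import JobQueue


def flaky(job):
    # Simulate the un.org servers randomly erroring out.
    if random.random() < 0.5:
        raise RuntimeError("random server error")


queue = JobQueue(retries=5)
queue.enqueue(DownloadJob(url="https://example.org", dest_file="x.html", description="demo"))
queue.process(flaky)

# Jobs that succeeded end up in queue.processed;
# jobs that failed `retries` times end up in queue.failed.
print(len(queue.processed), len(queue.failed))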
