
Commit

updated process script and test script
Girish3320 committed Jan 3, 2025
1 parent 6f8eb8e commit e47655e
Showing 16 changed files with 9,742 additions and 93 deletions.
26 changes: 4 additions & 22 deletions scripts/us_bls/jolts/README.md
@@ -89,26 +89,8 @@ Additional information about each dataframe.
"""

IMPORTANT:
If any new statistical variable is added in the future, it must be added to the dictionary `_dcid_map` in `map_config.py`.

### Import Procedure

The script below downloads the data and generates the CSV and MCF files.

`/usr/bin/python3 scripts/us_bls/jolts/bls_jolts.py`

Execute the `bls_jolts.py` script using one of the following commands:

- To download and process the data in one step, run:

  `python3 bls_jolts.py`

- To only download the data, run:

  `python3 bls_jolts.py --mode=download`

- To only process previously downloaded data, run:

  `python3 bls_jolts.py --mode=process`

If any new statistical variable is added in the future, it must be added to the following dictionaries (see the sketch below):
1. `_dcid_map` in `map_config.py`
2. `_mcf_map` in `mcf_config.py`
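
For illustration only, a hypothetical sketch of what such entries might look like; the key names, dcids, and value formats below are assumptions, not copied from `map_config.py` or `mcf_config.py`:

```python
# map_config.py -- hypothetical entry mapping a new variable to its dcid.
_dcid_map = {
    "NumJobOpening": "dcs:Count_JobPosting",
}

# mcf_config.py -- hypothetical entry holding the matching MCF node text.
_mcf_map = {
    "NumJobOpening": ("Node: dcid:Count_JobPosting\n"
                      "typeOf: dcs:StatisticalVariable\n"),
}
```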


183 changes: 133 additions & 50 deletions scripts/us_bls/jolts/bls_jolts.py
@@ -22,22 +22,66 @@
Dataset being processed: https://download.bls.gov/pub/time.series/jt/
"""
import sys
import os
import textwrap
from absl import app
from absl import logging
from absl import flags
from retry import retry
import time
import pandas as pd
import requests
from map_config import _dcid_map
import fileinput

_DOWNLOAD_DETAILS = {
"SERIES": {
"url": "https://download.bls.gov/pub/time.series/jt/jt.series",
"file_name": "jolts_input_jt_series.csv"
},
"JOBOPENINGS": {
"url":
"https://download.bls.gov/pub/time.series/jt/jt.data.2.JobOpenings",
"file_name":
"jolts_input_jt_job_openings.csv"
},
"HIRES": {
"url": "https://download.bls.gov/pub/time.series/jt/jt.data.3.Hires",
"file_name": "jolts_input_jt_job_hires.csv"
},
"TOTAL_SEPERATIONS": {
"url":
"https://download.bls.gov/pub/time.series/jt/jt.data.4.TotalSeparations",
"file_name":
"jolts_input_jt_totlal_separations.csv"
},
"QUITS": {
"url": "https://download.bls.gov/pub/time.series/jt/jt.data.5.Quits",
"file_name": "jolts_input_jt_total_quits.csv"
},
"LAYOFFS": {
"url":
"https://download.bls.gov/pub/time.series/jt/jt.data.6.LayoffsDischarges",
"file_name":
"jolts_input_jt_total_layoffs.csv"
},
"OTHER_SEPERATIONS": {
"url":
"https://download.bls.gov/pub/time.series/jt/jt.data.7.OtherSeparations",
"file_name":
"jolts_input_jt_total_other_separations.csv"
}
}

_FLAGS = flags.FLAGS
flags.DEFINE_string('mode', '', 'Options: download or process')

_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
_INPUT_FILES = os.path.join(_MODULE_DIR, "input_files")
os.makedirs(_INPUT_FILES, exist_ok=True)
_OUTPUT_FILES = os.path.join(_MODULE_DIR, "output_files")
os.makedirs(_OUTPUT_FILES, exist_ok=True)

# JOLTS dataset contains both NAICS industry codes and BLS jolts aggregations.
# Existing NAICS Codes are mapped directly while
# custom JOLTS codes include a colon distinguishing their new name.
@@ -73,8 +117,23 @@
'929000:State and local government excluding education' # New Code
}
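
# As a minimal illustration of the colon convention described above
# (the helper below is hypothetical; the repo's own parsing lives in
# jolts_code_map further down and may differ):
def split_jolts_code(code):
    """Splits a custom 'code:name' JOLTS code; plain NAICS codes pass through."""
    if ':' in code:
        industry_code, custom_name = code.split(':', 1)
        return industry_code, custom_name
    return code, None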

header = {'User-Agent': '[email protected]'}


# Retry configuration to handle retries for failed downloads.
@retry(exceptions=Exception, tries=5, delay=5, backoff=2, logger=logging)
def download_with_retries(url, file_name):
    """Downloads a file with retry logic."""
    file_path = os.path.join(_INPUT_FILES, file_name)
    logging.info(f"Downloading url {url} to file {file_path}")
    res = requests.get(url, headers=header)
    # Raise on non-2xx responses so the retry decorator can retry the
    # download; open the output file only after the request succeeds, so
    # a failed attempt does not leave behind an empty file.
    res.raise_for_status()
    with open(file_path, 'wb') as out_file:
        out_file.write(res.content)

def generate_cleaned_dataframe():

def generate_cleaned_dataframe(_INPUT_FILES):
"""Fetches and combines BLS Jolts data sources, with retry logic for file downloads.
Downloads detailed series information from the entire JOLTS dataset.
@@ -85,21 +144,13 @@ def generate_cleaned_dataframe():
schema_mapping: List of tuples that contains information for each dataset.
"""

header = {'User-Agent': '[email protected]'}

# Retry configuration to handle retries for failed downloads.
@retry(exceptions=Exception, tries=5, delay=5, backoff=2, logger=logging)
def download_with_retries(url):
"""Download file with retry logic."""
return pd.read_csv(url, storage_options=header, sep="\\s+")

# Download and process series description
try:
series_desc = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.series",
storage_options=header,
converters={'industry_code': str},
sep="\\t")
series_desc = pd.read_csv(os.path.join(
_INPUT_FILES, _DOWNLOAD_DETAILS['SERIES']['file_name']),
converters={'industry_code': str},
sep="\\t",
engine="python")
series_desc.columns = [
'series_id', 'seasonal', 'industry_code', 'state_code', 'area_code',
'sizeclass_code', 'dataelement_code', 'ratelevel_code',
@@ -108,60 +159,63 @@ def download_with_retries(url):
]
series_desc["series_id"] = series_desc["series_id"].apply(
lambda x: x.strip())
series_desc.to_csv("jolts_input_jt_series.csv")
series_desc = series_desc.set_index("series_id")
except Exception as e:
logging.fatal(f"Failed to download series description: {e}")
return

# Download and process other datasets with retries
try:
job_openings = download_with_retries(
"https://download.bls.gov/pub/time.series/jt/jt.data.2.JobOpenings")
job_openings.to_csv("jolts_input_jt_job_openings.csv")
job_openings = pd.read_csv(os.path.join(
_INPUT_FILES, _DOWNLOAD_DETAILS['JOBOPENINGS']['file_name']),
sep="\\s+",
engine="python")
except Exception as e:
logging.fatal(f"Failed to download job openings data: {e}")
return

try:
job_hires = download_with_retries(
"https://download.bls.gov/pub/time.series/jt/jt.data.3.Hires")
job_hires.to_csv("jolts_input_jt_job_hires.csv")
job_hires = pd.read_csv(os.path.join(
_INPUT_FILES, _DOWNLOAD_DETAILS['HIRES']['file_name']),
sep="\\s+",
engine="python")

except Exception as e:
logging.fatal(f"Failed to download job hires data: {e}")
return

try:
total_seps = download_with_retries(
"https://download.bls.gov/pub/time.series/jt/jt.data.4.TotalSeparations"
)
total_seps.to_csv("jolts_input_jt_totlal_separations.csv")
total_seps = pd.read_csv(os.path.join(
_INPUT_FILES, _DOWNLOAD_DETAILS['TOTAL_SEPERATIONS']['file_name']),
sep="\\s+",
engine="python")
except Exception as e:
logging.fatal(f"Failed to download total separations data: {e}")
return

try:
total_quits = download_with_retries(
"https://download.bls.gov/pub/time.series/jt/jt.data.5.Quits")
total_quits.to_csv("jolts_input_jt_total_quits.csv")
total_quits = pd.read_csv(os.path.join(
_INPUT_FILES, _DOWNLOAD_DETAILS['QUITS']['file_name']),
sep="\\s+",
engine="python")
except Exception as e:
logging.fatal(f"Failed to download total quits data: {e}")
return

try:
total_layoffs = download_with_retries(
"https://download.bls.gov/pub/time.series/jt/jt.data.6.LayoffsDischarges"
)
total_layoffs.to_csv("jolts_input_jt_total_layoffs.csv")
total_layoffs = pd.read_csv(os.path.join(
_INPUT_FILES, _DOWNLOAD_DETAILS['LAYOFFS']['file_name']),
sep="\\s+",
engine="python")
except Exception as e:
logging.fatal(f"Failed to download total layoffs data: {e}")
return

try:
total_other_seps = download_with_retries(
"https://download.bls.gov/pub/time.series/jt/jt.data.7.OtherSeparations"
)
total_other_seps.to_csv("jolts_input_jt_total_other_separations.csv")
total_other_seps = pd.read_csv(os.path.join(
_INPUT_FILES, _DOWNLOAD_DETAILS['OTHER_SEPERATIONS']['file_name']),
sep="\\s+",
engine="python")
except Exception as e:
logging.fatal(f"Failed to download other separations data: {e}")
return
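
    # The body of period_year_to_iso_8601 (named in the next hunk header) is
    # folded out of this diff. A minimal sketch of such a conversion, assuming
    # the usual BLS period convention ('M01'-'M12' for months) and these
    # column names:
    #
    #   def period_year_to_iso_8601(row):
    #       month = row['period'][1:]          # strip the leading 'M'
    #       return f"{row['year']}-{month}"    # e.g. '2024-01'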
@@ -215,14 +269,19 @@ def period_year_to_iso_8601(row):
jolts_df = jolts_df.merge(series_desc[series_cols],
left_on=["series_id"],
right_index=True)
jolts_df.to_csv("before_query.csv", index=False)
file_name = "before_query.csv"
file_path = os.path.join(_INPUT_FILES, file_name)
jolts_df.to_csv(file_path, index=False)

# Drop rate data, preliminary data, and non-national data.
jolts_df = jolts_df.query("ratelevel_code == 'L'")
jolts_df = jolts_df.query("footnote_codes != 'P'")
jolts_df = jolts_df.query("state_code == '00'")
jolts_df = jolts_df.query('sizeclass_code == 0')
jolts_df.to_csv("after_query.csv", index=False)

file_name = "after_query.csv"
file_path = os.path.join(_INPUT_FILES, file_name)
jolts_df.to_csv(file_path, index=False)

# Map industries.
def jolts_code_map(row):
@@ -263,7 +322,7 @@ def row_to_stat_var(row):
return jolts_df, schema_mapping


def process(jolts_df, schema_mapping):
def process(jolts_df, schema_mapping, _OUTPUT_FILES):
"""Creates Statistical Variable nodes.
A new statistical industry is needed for each of the 6 job variables
@@ -294,7 +353,9 @@ def process(jolts_df, schema_mapping):

try:
# Output the schema mapping to a new file.
with open("BLSJolts_StatisticalVariables.mcf", "w+",
with open(os.path.join(_OUTPUT_FILES,
"BLSJolts_StatisticalVariables.mcf"),
"w+",
newline="") as f_out:
logging.info(
"Started writing statistical variable schemas to 'BLSJolts_StatisticalVariables.mcf'."
@@ -344,21 +405,43 @@
def main(_):
mode = _FLAGS.mode

# Check for the download mode
if mode == "download" or mode == "":
logging.info("Downloading files...")
generate_cleaned_dataframe() # This function handles file download
        for series in _DOWNLOAD_DETAILS.values():
            download_with_retries(series['url'], series['file_name'])
        logging.info("Download completed!")

# Ensure processing mode is executed if needed (or default mode when no mode is specified)
if mode == "process" or mode == "":
logging.info("Processing data...")
jolts_df, schema_mapping = generate_cleaned_dataframe()

# Check if the required CSV files are downloaded before processing
if not os.path.exists(
os.path.join(_INPUT_FILES, "jolts_input_jt_series.csv")):
logging.error(
"Required CSV files are missing. Please download the data first."
)
return

# Combine the downloaded CSV files into a single DataFrame if needed
jolts_df, schema_mapping = generate_cleaned_dataframe(
_INPUT_FILES) # Call the function to get the data

# Process and output final cleaned CSV and MCF
final_columns = ['Date', 'StatisticalVariable', 'Value']
jolts_df.loc[:, final_columns].to_csv("BLSJolts.csv",
index=False,
encoding="utf-8")
process(jolts_df, schema_mapping)
logging.info("Process completed!")
        output_csv_path = os.path.join(_OUTPUT_FILES,
                                       "BLSJolts_StatisticalVariables.csv")
        jolts_df.loc[:, final_columns].to_csv(output_csv_path,
                                              index=False,
                                              encoding="utf-8")

# Call the process function to write statistical variables to MCF
process(jolts_df, schema_mapping, _OUTPUT_FILES)
logging.info(
f"Process completed! Output saved to {output_csv_path} and MCF file."
)


if __name__ == '__main__':
