From 30c8d952eead8f315eb86fde68ed3986ffd609c0 Mon Sep 17 00:00:00 2001 From: Baruni Prabaharan <239645217+barunip@users.noreply.github.com> Date: Sun, 18 Jan 2026 13:18:39 -0500 Subject: [PATCH] Assignment 1 responses --- 02_activities/assignments/assignment_1.ipynb | 340 ++++++++++++++++++- 1 file changed, 329 insertions(+), 11 deletions(-) diff --git a/02_activities/assignments/assignment_1.ipynb b/02_activities/assignments/assignment_1.ipynb index 45cfc9cd7..e326588d2 100644 --- a/02_activities/assignments/assignment_1.ipynb +++ b/02_activities/assignments/assignment_1.ipynb @@ -26,11 +26,15 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# Write your code below.\n", + "\n", + "# Load environment variables from a .env file\n", + "%load_ext dotenv\n", + "%dotenv\n", "\n" ] }, @@ -55,14 +59,34 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "PRICE_DATA: ../../05_src/data/prices/\n", + "Number of parquet files found: 3083\n" + ] + } + ], "source": [ "import os\n", "from glob import glob\n", "\n", "# Write your code below.\n", + "import dask.dataframe as dd\n", + "\n", + "# Load the PRICE_DATA environment variable\n", + "PRICE_DATA = os.getenv(\"PRICE_DATA\")\n", + "\n", + "# Use glob to find all parquet files under PRICE_DATA (recursively)\n", + "parquet_files = glob(os.path.join(PRICE_DATA, \"**\", \"*.parquet\"), recursive=True)\n", + "\n", + "# Confirm results\n", + "print(f\"PRICE_DATA: {PRICE_DATA}\")\n", + "print(f\"Number of parquet files found: {len(parquet_files)}\")\n", "\n" ] }, @@ -88,12 +112,103 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
DateOpenHighLowCloseAdj_CloseVolumesourcetickerYearClose_lag_1Adj_Close_lag_1returnshi_lo_range
\n", + "
" + ], + "text/plain": [ + "Empty DataFrame\n", + "Columns: [Date, Open, High, Low, Close, Adj_Close, Volume, source, ticker, Year, Close_lag_1, Adj_Close_lag_1, returns, hi_lo_range]\n", + "Index: []" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "# Write your code below.\n", - "\n" + "import numpy as np\n", + "import dask.dataframe as dd\n", + "\n", + "# Assumes `parquet_files` is already defined.\n", + "dd_px = dd.read_parquet(parquet_files)\n", + "\n", + "# Normalize adjusted-close column name if needed\n", + "if \"Adj_Close\" not in dd_px.columns and \"Adj Close\" in dd_px.columns:\n", + " dd_px = dd_px.rename(columns={\"Adj Close\": \"Adj_Close\"})\n", + "\n", + "# Ensure Date is datetime64[ns] (no-op if already correct)\n", + "if dd_px[\"Date\"].dtype.kind not in (\"M\",):\n", + " dd_px = dd_px.assign(Date=dd_px[\"Date\"].astype(\"M8[ns]\"))\n", + "\n", + "# ---- Build explicit meta (schema) for the output with the four new columns ----\n", + "meta = (\n", + " dd_px._meta.assign(\n", + " Close_lag_1=np.float64(),\n", + " Adj_Close_lag_1=np.float64(),\n", + " returns=np.float64(),\n", + " hi_lo_range=np.float64(),\n", + " )\n", + ")\n", + "\n", + "# ---- Per-ticker feature logic: sort by Date, add lags/returns/range ----\n", + "def _per_ticker_features(pdf):\n", + " # NOTE: This function is executed by Dask on each group partition;\n", + " # the returned result is stitched back into a single Dask DataFrame.\n", + " pdf = pdf.sort_values(\"Date\")\n", + " pdf[\"Close_lag_1\"] = pdf[\"Close\"].shift(1)\n", + " pdf[\"Adj_Close_lag_1\"] = pdf[\"Adj_Close\"].shift(1)\n", + " pdf[\"returns\"] = pdf[\"Close\"] / pdf[\"Close_lag_1\"] - 1\n", + " pdf[\"hi_lo_range\"] = pdf[\"High\"] - pdf[\"Low\"]\n", + " return pdf\n", + "\n", + "# Group by ticker and apply the feature function with explicit meta\n", + "dd_feat = dd_px.groupby(\"ticker\", group_keys=False).apply(_per_ticker_features, meta=meta)\n", + "\n", + "# Quick peek \n", + "dd_feat.head()\n" ] }, { @@ -108,11 +223,197 @@ }, { "cell_type": "code", - "execution_count": 25, + "execution_count": null, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
indexDateOpenHighLowCloseAdj_CloseVolumesourcetickerYearClose_lag_1Adj_Close_lag_1returnshi_lo_rangereturns_ma10
01373152001-07-1915.1015.2915.0015.1711.40439434994300.0ACN.csvACN2001NaNNaNNaN0.29NaN
11373162001-07-2015.0515.0514.8015.0111.2841089238500.0ACN.csvACN200115.1711.404394-0.0105470.25NaN
21373172001-07-2315.0015.0114.5515.0011.2765877501000.0ACN.csvACN200115.0111.284108-0.0006660.46NaN
31373182001-07-2414.9514.9714.7014.8611.1713413537300.0ACN.csvACN200115.0011.276587-0.0093330.27NaN
41373192001-07-2514.7014.9514.6514.9511.2389994208100.0ACN.csvACN200114.8611.1713410.0060570.30NaN
\n", + "
" + ], + "text/plain": [ + " index Date Open High Low Close Adj_Close Volume \\\n", + "0 137315 2001-07-19 15.10 15.29 15.00 15.17 11.404394 34994300.0 \n", + "1 137316 2001-07-20 15.05 15.05 14.80 15.01 11.284108 9238500.0 \n", + "2 137317 2001-07-23 15.00 15.01 14.55 15.00 11.276587 7501000.0 \n", + "3 137318 2001-07-24 14.95 14.97 14.70 14.86 11.171341 3537300.0 \n", + "4 137319 2001-07-25 14.70 14.95 14.65 14.95 11.238999 4208100.0 \n", + "\n", + " source ticker Year Close_lag_1 Adj_Close_lag_1 returns hi_lo_range \\\n", + "0 ACN.csv ACN 2001 NaN NaN NaN 0.29 \n", + "1 ACN.csv ACN 2001 15.17 11.404394 -0.010547 0.25 \n", + "2 ACN.csv ACN 2001 15.01 11.284108 -0.000666 0.46 \n", + "3 ACN.csv ACN 2001 15.00 11.276587 -0.009333 0.27 \n", + "4 ACN.csv ACN 2001 14.86 11.171341 0.006057 0.30 \n", + "\n", + " returns_ma10 \n", + "0 NaN \n", + "1 NaN \n", + "2 NaN \n", + "3 NaN \n", + "4 NaN " + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "# Write your code below.\n", + "\n", + "\n", + "# Convert the Dask DataFrame to a pandas DataFrame\n", + "# Assumes `dd_feat` exists from the previous step\n", + "pdf_feat = dd_feat.reset_index().compute()\n", + "\n", + "# Ensure chronological order within each ticker\n", + "pdf_feat = pdf_feat.sort_values([\"ticker\", \"Date\"])\n", + "\n", + "# Add a 10-day moving average of returns (per ticker)\n", + "# transform keeps the result aligned to the original rows\n", + "pdf_feat[\"returns_ma10\"] = (\n", + " pdf_feat.groupby(\"ticker\")[\"returns\"]\n", + " .transform(lambda s: s.rolling(window=10, min_periods=10).mean())\n", + ")\n", + "\n", + "# Optional quick check\n", + "pdf_feat.head()\n", + "\n", "\n" ] }, @@ -123,7 +424,24 @@ "Please comment:\n", "\n", "+ Was it necessary to convert to pandas to calculate the moving average return?\n", + "No. Dask can compute a moving average. \n", + "Converting to pandas was a pragmatic choice for simplicity and quick validation, but it isn’t strictly required for this feature.\n", "+ Would it have been better to do it in Dask? Why?\n", + "Yes, in the case where data size is large or you want to stay scalable. \n", + "1. Scalability & memory: \n", + "Dask: Keeps computation lazy and chunked by partitions. \n", + "pandas: Fine for small/medium data, but it breaks scalability and risks memory pressure.\n", + "\n", + "2. Performance & parallelism\n", + "Dask: Rolling operations can be parallelized across partitions. W\n", + "pandas: Runs eagerly on one process; you lose the benefits of the Dask task graph you built up to this point.\n", + "\n", + "3. Operational consistency\n", + "Keeping the whole pipeline in Dask means one execution model (lazy graph → write to Parquet), consistent dtypes/schemas, and easier productionization (no sudden memory spikes from .compute()).\n", + "\n", + "\n", + "\n", + "\n", "\n", "(1 pt)" ] @@ -165,7 +483,7 @@ ], "metadata": { "kernelspec": { - "display_name": "env", + "display_name": "production-env", "language": "python", "name": "python3" }, @@ -179,7 +497,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.0" + "version": "3.11.14" } }, "nbformat": 4,