Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions 01_generate_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import sqlite3
import pandas as pd
from settings import DB_PATH, RAW_DATA_PATH

def fetch_data(db_path):
    """Load every row of the ``elevator_calls`` table into a DataFrame.

    Args:
        db_path: Path of the SQLite database file to read.

    Returns:
        A DataFrame holding the query result with every row that contains
        a missing value removed.

    Raises:
        Whatever ``sqlite3`` / ``pandas`` raise when the database cannot be
        opened or the query fails (for example a missing table).
    """
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query("SELECT * FROM elevator_calls", conn)
    finally:
        # Release the handle even when the query fails; previously an
        # exception here skipped close() and leaked the connection.
        conn.close()

    return df.dropna()



if __name__ == "__main__":
    # Script entry point: read the calls from the database at DB_PATH and
    # store the resulting frame as a pickle at RAW_DATA_PATH.
    fetch_data(DB_PATH).to_pickle(RAW_DATA_PATH)
91 changes: 91 additions & 0 deletions 02_preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import pandas as pd
import numpy as np
from settings import RAW_DATA_PATH, PROCESSED_DATA_PATH
from tqdm import tqdm

def basic_time_features(df):
    """Sort calls chronologically and attach calendar and previous-floor columns.

    The ``timestamp`` column of the frame passed in is converted to datetimes
    in place; all other columns are added to a new, timestamp-sorted frame
    with a fresh 0..n-1 index.

    Args:
        df: Frame with ``timestamp`` and ``calling_floor`` columns.

    Returns:
        The sorted frame with ``hour``, ``weekday`` (Monday is 0),
        ``day_name``, boolean ``is_weekday`` / ``is_weekend`` and
        ``resting_floor`` (the ``calling_floor`` of the preceding row, NaN
        for the first row).
    """
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    ordered = df.sort_values("timestamp").reset_index(drop=True)

    stamp = ordered["timestamp"].dt
    ordered["hour"] = stamp.hour
    ordered["weekday"] = stamp.weekday
    ordered["day_name"] = stamp.day_name()

    weekday = ordered["weekday"]
    ordered["is_weekday"] = weekday.isin([0, 1, 2, 3, 4])
    ordered["is_weekend"] = weekday.isin([5, 6])

    # The floor of the previous call stands in for where the elevator rests.
    ordered["resting_floor"] = ordered["calling_floor"].shift(1)
    return ordered

def cyclical_features(df):
    """Replace ``hour`` and ``weekday`` with sine/cosine encodings.

    Each value is mapped onto a circle (period 24 for hours, 7 for weekdays)
    so that the end of a cycle sits next to its start.

    Args:
        df: Frame with numeric ``hour`` and ``weekday`` columns.

    Returns:
        The same frame object, modified in place: the four ``*_sin`` /
        ``*_cos`` columns are added and ``hour`` / ``weekday`` are dropped.
    """
    encodings = (
        ("hour", 24, "hour_sin", "hour_cos"),
        ("weekday", 7, "weekday_sin", "weekday_cos"),
    )
    for source, period, sin_name, cos_name in encodings:
        angle = 2 * np.pi * df[source] / period
        df[sin_name] = np.sin(angle)
        df[cos_name] = np.cos(angle)

    df.drop(columns=["hour", "weekday"], inplace=True)
    return df

def rolling_floor_features(df):
    """Add per-floor call-history features to ``df`` and return it.

    For every row, only calls made from the same ``calling_floor`` are
    considered. The function records

    * how many *other* calls from that floor fall inside the trailing
      5-minute, 1-hour, 1-day and 7-day windows ending at the row's
      timestamp, and
    * the mean gap in seconds between consecutive calls from that floor,
      averaged over at most the 10 most recent gaps (NaN for a floor's
      first call, which has no preceding gap).

    Args:
        df: Frame with a datetime ``timestamp`` column and a
            ``calling_floor`` column. Results are written back by index
            label, so labels are assumed to be unique.

    Returns:
        The same frame object with the five feature columns added.
    """
    count_windows = {
        "calls_from_calling_floor_last_5min": "5min",
        "calls_from_calling_floor_last_hour": "1h",
        "calls_from_calling_floor_last_day": "1d",
        "calls_from_calling_floor_last_7_days": "7d",
    }
    gap_column = "avg_time_between_calls_from_calling_floor"

    for column in count_windows:
        df[column] = 0
    df[gap_column] = np.nan

    for floor in tqdm(df["calling_floor"].unique(), desc="Rolling features by floor"):
        # Keep just the two columns the features need: rolling over the whole
        # frame would count every column only to discard all but one.
        floor_calls = df.loc[df["calling_floor"] == floor, ["timestamp", "calling_floor"]]
        if floor_calls.empty:
            # A NaN floor never equals itself, so it selects no rows.
            continue

        # A stable sort keeps simultaneous calls in their input order, and
        # carrying the original labels along lets the results be written back
        # to the right rows even when ``df`` is not already time-ordered.
        floor_calls = floor_calls.sort_values("timestamp", kind="stable")
        row_labels = floor_calls.index
        calls_by_time = floor_calls.set_index("timestamp")["calling_floor"]

        for column, window in count_windows.items():
            # count() includes the current call itself, hence the "- 1".
            df.loc[row_labels, column] = calls_by_time.rolling(window).count().to_numpy() - 1

        gaps = floor_calls["timestamp"].diff().dt.total_seconds()
        df.loc[row_labels, gap_column] = gaps.rolling(10, min_periods=1).mean().to_numpy()
    return df

def conditional_freq_given_resting_floor(df):
    """Count earlier calls sharing a row's (resting floor, calling floor) pair.

    For each row with timestamp ``t`` the function counts the rows that have
    the same ``resting_floor`` and the same ``calling_floor`` and whose
    timestamp lies in ``[t - window, t)`` for windows of 7 days, 5 days and
    24 hours. Calls at exactly ``t`` (including the row itself) are excluded;
    a call exactly ``window`` earlier is included. Rows with a missing
    resting floor, calling floor or timestamp get a count of 0.

    The counts are obtained per pair from the sorted timestamps with binary
    search, so the work is O(n log n) instead of re-scanning the whole frame
    for every row.

    Args:
        df: Frame with ``timestamp`` (datetime), ``calling_floor`` and
            ``resting_floor`` columns.

    Returns:
        The same frame object with the three
        ``conditional_call_freq_given_resting_floor_*`` columns added.
    """
    windows = {
        "conditional_call_freq_given_resting_floor_last_7_days": pd.Timedelta(days=7),
        "conditional_call_freq_given_resting_floor_last_5_days": pd.Timedelta(days=5),
        "conditional_call_freq_given_resting_floor_last_24_hours": pd.Timedelta(hours=24),
    }
    counts = {column: np.zeros(len(df), dtype=np.int64) for column in windows}

    # A fresh 0..n-1 index makes each group's labels double as row positions.
    # Rows with a missing key or timestamp can never match another row, so
    # they are left out and keep their initial count of 0.
    keyed = (
        df[["resting_floor", "calling_floor", "timestamp"]]
        .reset_index(drop=True)
        .dropna()
    )

    for _, group in keyed.groupby(["resting_floor", "calling_floor"], sort=False):
        positions = group.index.to_numpy()
        stamps = group["timestamp"].to_numpy()
        ordered = np.sort(stamps, kind="stable")

        # Number of same-pair calls strictly before each call.
        earlier = np.searchsorted(ordered, stamps, side="left")
        for column, span in windows.items():
            # Number of same-pair calls that are already older than the window.
            too_old = np.searchsorted(ordered, stamps - span.to_timedelta64(), side="left")
            counts[column][positions] = earlier - too_old

    for column, values in counts.items():
        df[column] = values
    return df

def preprocess(df):
    """Run the full feature-engineering chain on the raw call log.

    Applies, in order, the calendar, cyclical, per-floor rolling and
    conditional-frequency steps, converts boolean columns to integers,
    drops the first row (it has no preceding call and therefore no resting
    floor) and replaces any remaining missing values with 0.

    Args:
        df: Raw calls with ``timestamp`` and ``calling_floor`` columns.

    Returns:
        The processed frame with a fresh 0..n-1 index.
    """
    steps = (
        basic_time_features,
        cyclical_features,
        rolling_floor_features,
        conditional_freq_given_resting_floor,
    )
    for step in steps:
        df = step(df)

    flag_columns = df.select_dtypes(bool).columns
    df = df.astype({name: int for name in flag_columns})

    trimmed = df.iloc[1:].reset_index(drop=True)
    return trimmed.fillna(0)

if __name__ == "__main__":
    # Script entry point: load the raw pickle, rename a "from_floor" column
    # (if present) to the "calling_floor" name the feature code reads, then
    # store the engineered frame at PROCESSED_DATA_PATH.
    raw_calls = pd.read_pickle(RAW_DATA_PATH).rename(columns={"from_floor": "calling_floor"})
    preprocess(raw_calls).to_pickle(PROCESSED_DATA_PATH)
26 changes: 26 additions & 0 deletions 03_train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pandas as pd
import pickle
from xgboost import XGBClassifier
from settings import PROCESSED_DATA_PATH, MODEL_PATH, FEATURES, TARGET, TEST_SIZE

def train():
    """Fit an XGBoost classifier that predicts the next call's floor.

    Loads the processed frame from ``PROCESSED_DATA_PATH``, labels each row
    with the ``TARGET`` value of the chronologically following row, splits
    the rows in time order (the last ``TEST_SIZE`` fraction is held out),
    fits the model on ``FEATURES``, prints train and test scores and pickles
    the fitted model to ``MODEL_PATH``.
    """
    df = pd.read_pickle(PROCESSED_DATA_PATH)

    # Order chronologically *before* deriving the label: shifting first would
    # pair rows with the wrong "next" call whenever the stored frame is not
    # already time-sorted. The stable sort leaves sorted input untouched.
    df = df.sort_values("timestamp", kind="stable").reset_index(drop=True)
    df["target"] = df[TARGET].shift(-1)
    # The final row has no following call and therefore no label.
    df = df.dropna(subset=["target"]).reset_index(drop=True)

    X = df[FEATURES]
    y = df["target"].astype(int)

    # Temporal split: everything before split_index trains, the rest tests.
    split_index = int(len(df) * (1 - TEST_SIZE))
    X_train = X.iloc[:split_index]
    y_train = y.iloc[:split_index]
    X_test = X.iloc[split_index:]
    y_test = y.iloc[split_index:]

    # NOTE(review): recent xgboost releases appear to require class labels
    # numbered 0..n_classes-1 -- confirm the floor numbering in the training
    # slice satisfies that, or encode the labels.
    model = XGBClassifier()
    model.fit(X_train, y_train)
    print(f"Train score: {model.score(X_train, y_train):.4f}")
    print(f"Test score: {model.score(X_test, y_test):.4f}")
    with open(MODEL_PATH, "wb") as f:
        pickle.dump(model, f)

# Run the training routine only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    train()
115 changes: 115 additions & 0 deletions README_solution.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Elevator Resting Floor Data Pipeline

This repository implements the core of a data engineering and feature preparation pipeline for the "elevator resting floor" modeling challenge.

## Structure

- **01_generate_dataset.py**
Extracts raw event data from the SQLite database and saves it as a pickle artifact for further processing.

- **02_preprocess.py**
Performs feature engineering on event data, including time-based, cyclical, rolling, and conditional features. Data is output in a clean, ML-ready tabular format. All boolean fields are encoded as integers (0/1).

- **03_train.py**
Trains a prediction model to anticipate the next calling floor based on historical demand and engineered features. Uses a temporal split to separate train/test, reflecting real-world deployment.

- **model.py**
Defines a class for loading the trained model and producing predictions given new data. Features are recomputed for single-record inference, using defaults for features that require historical context.

- **settings.py**
Centralizes all file paths, feature lists, and relevant pipeline constants.

## Data Model

Events are stored in a table named `elevator_calls`, with at minimum:
- `timestamp`
- `calling_floor` (`02_preprocess.py` renames a raw `from_floor` column to this name before feature engineering)

During preprocessing, additional features are derived, including:
- Previous resting floor
- Time-based and cyclical encodings
- Rolling counts and averages per floor
- Conditional frequencies based on the previous resting floor

## Key Design Choices

- **Temporal split:**
Train/test sets are split chronologically to simulate realistic production inference and avoid data leakage.

- **Feature set:**
Engineered features include recent call frequencies, conditional stats (e.g., "calls from floor X when resting at Y"), and cyclical encodings for hours/days.

- **Data artifacts:**
Intermediate datasets and models are versioned and stored as pickles for reproducibility.

- **Extendability:**
The current schema and scripts are straightforward to adapt for multi-elevator scenarios or richer sensor data.

## Usage

1. Generate or extract event data:
```
python 01_generate_dataset.py
```

2. Preprocess features:
```
python 02_preprocess.py
```

3. Train the model:
```
python 03_train.py
```

4. Run inference (from a notebook or script):
```
from model import ElevetorModel
m = ElevetorModel()
m.predict({"timestamp": "2024-06-01 10:00:00", "calling_floor": 5, "resting_floor": 3})
```

## Next Steps
- **Collect production data:**
Integrate this pipeline with a live data source (e.g., sensor logs, building management API) to gather real elevator usage data. Real behavioral data will reveal meaningful demand patterns and allow for richer feature engineering.

- **Iterate on feature engineering:**
With real data, re-evaluate which features are most predictive. Additional signals—such as trip direction, user IDs, or elevator occupancy—could be incorporated. Time-of-day and seasonality patterns should be validated and potentially modeled more granularly.

- **Model evaluation and tuning:**
Compare different algorithms (tree-based models, time series approaches, etc.), run cross-validation, and monitor for overfitting. Feature importance analysis will help prioritize which features drive prediction quality.

- **Error analysis:**
Systematically analyze cases where the model mispredicts. Use these insights to refine feature sets, handle edge cases, and improve the definition of the "optimal" resting floor policy (e.g., optimizing for average wait time vs. call frequency).

- **Scalability:**
Extend the schema and pipeline to handle multiple elevators and more complex building layouts. Add support for batch ingestion and parallel processing as needed.

- **Productionization:**
Package the pipeline for deployment as a service (API, scheduled job, etc.). Add monitoring for data drift and model performance. Establish processes for regular retraining and validation as new data arrives.

- **Testing and validation:**
Add automated tests to cover core feature transformations and edge cases. Validate that the data pipeline handles missing values, outliers, and schema changes gracefully.

---

## Assumptions

- **Synthetic data:**
The current pipeline uses generated data to demonstrate the end-to-end flow. No assumptions about actual user behavior, peak times, or real demand cycles are built in; all results and scores should be interpreted in that light. The model is designed to be robust to real data once available.

- **Resting floor logic:**
The “resting_floor” feature is approximated as the last known floor at which a call occurred. In a production setting, this would ideally be recorded explicitly when the elevator becomes idle.

- **Single-elevator focus:**
The data model assumes a single elevator in the building. Extension to multiple elevators would require an additional `elevator_id` field and further schema adjustments.

- **Border cases (e.g., move-ins/move-outs, group calls):**
The pipeline currently does not account for outlier scenarios such as bulk move-ins/move-outs (unusually high demand from a single floor), or simultaneous calls by multiple users. All events are treated as independent, single-person elevator calls. In real deployments, these cases may need special handling or outlier filtering.

- **No assumption on trip direction or group occupancy:**
Calls are treated as single-direction, single-user events. The system does not currently attempt to infer or record whether multiple users are entering/exiting together, or the direction of elevator travel (up vs. down) for each call.

---


Binary file added artifacts/data/processed_data.pkl
Binary file not shown.
Binary file added artifacts/data/raw_data.pkl
Binary file not shown.
Binary file added artifacts/models/model.pkl
Binary file not shown.
Binary file added db/elevator_calls.sqlite
Binary file not shown.
75 changes: 75 additions & 0 deletions elevator_model_test.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "fc05d032",
"metadata": {},
"source": [
"\n",
"# Elevator Prediction Model – Test Notebook"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "26a62f57",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from model import ElevetorModel"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "2cab65fd",
"metadata": {},
"outputs": [],
"source": [
"clf = ElevetorModel()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "44176d73",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Given the current resting floor 2, the predicted floor is 3\n"
]
}
],
"source": [
"sample_input = {\"timestamp\": \"2024-06-01 10:32:00\", \"calling_floor\": 7, \"resting_floor\": 2}\n",
"predicted = clf.predict(sample_input)\n",
"print(f\"Given the current resting floor {sample_input['resting_floor']}, the predicted floor is {predicted}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
40 changes: 40 additions & 0 deletions model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pickle
import numpy as np
import pandas as pd
from settings import MODEL_PATH, FEATURES

class ElevetorModel:
    """Load the pickled classifier and predict a floor from a single call record."""

    def __init__(self, model_path=MODEL_PATH):
        """Unpickle the estimator stored at ``model_path`` into ``self.model``."""
        # NOTE(review): unpickling executes code embedded in the file, so
        # model_path should only ever point at a trusted artifact.
        with open(model_path, "rb") as model_file:
            self.model = pickle.load(model_file)

    def compute_features(self, data_json):
        """Build the one-row feature frame for a single call record.

        Args:
            data_json: Mapping with a ``timestamp`` entry plus any entries
                of ``FEATURES`` that are not derived here;
                ``resting_floor`` falls back to 1 when absent.

        Returns:
            A one-row DataFrame restricted to, and ordered by, ``FEATURES``.
        """
        row = pd.DataFrame([data_json])
        row["timestamp"] = pd.to_datetime(row["timestamp"])

        moment = row["timestamp"].dt
        row["hour"] = moment.hour
        row["weekday"] = moment.weekday
        row["day_name"] = moment.day_name()

        weekday = row["weekday"]
        row["is_weekday"] = weekday.isin([0, 1, 2, 3, 4]).astype(int)
        row["is_weekend"] = weekday.isin([5, 6]).astype(int)

        row["resting_floor"] = data_json.get("resting_floor", 1)

        hour_angle = 2 * np.pi * row["hour"] / 24
        weekday_angle = 2 * np.pi * row["weekday"] / 7
        row["hour_sin"] = np.sin(hour_angle)
        row["hour_cos"] = np.cos(hour_angle)
        row["weekday_sin"] = np.sin(weekday_angle)
        row["weekday_cos"] = np.cos(weekday_angle)

        # These features describe call history, which a single record does
        # not carry, so each one is supplied as 0.
        history_features = (
            "calls_from_calling_floor_last_5min",
            "calls_from_calling_floor_last_hour",
            "calls_from_calling_floor_last_day",
            "calls_from_calling_floor_last_7_days",
            "avg_time_between_calls_from_calling_floor",
            "conditional_call_freq_given_resting_floor_last_7_days",
            "conditional_call_freq_given_resting_floor_last_5_days",
            "conditional_call_freq_given_resting_floor_last_24_hours",
        )
        for name in history_features:
            row[name] = 0

        return row[FEATURES]

    def predict(self, data_json):
        """Return the model's prediction for one call record as an ``int``."""
        model_input = self.compute_features(data_json)
        predictions = self.model.predict(model_input)
        return int(predictions[0])

4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pandas
scikit-learn
xgboost
tqdm
23 changes: 23 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# --- File locations ---------------------------------------------------------
DB_PATH = "db/elevator_calls.sqlite"
RAW_DATA_PATH = "artifacts/data/raw_data.pkl"
PROCESSED_DATA_PATH = "artifacts/data/processed_data.pkl"
MODEL_PATH = "artifacts/models/model.pkl"

# --- Model inputs -----------------------------------------------------------
# Column names handed to the model, in this order.
FEATURES = [
    # Floors
    "calling_floor",
    "resting_floor",
    # Cyclical time encodings
    "hour_sin",
    "hour_cos",
    "weekday_sin",
    "weekday_cos",
    # Per-floor call history
    "calls_from_calling_floor_last_5min",
    "calls_from_calling_floor_last_hour",
    "calls_from_calling_floor_last_day",
    "calls_from_calling_floor_last_7_days",
    "avg_time_between_calls_from_calling_floor",
    # History of the (resting floor, calling floor) pair
    "conditional_call_freq_given_resting_floor_last_7_days",
    "conditional_call_freq_given_resting_floor_last_5_days",
    "conditional_call_freq_given_resting_floor_last_24_hours",
    # Calendar flags
    "is_weekday",
    "is_weekend",
]

# --- Training configuration -------------------------------------------------
TARGET = "calling_floor"
TEST_SIZE = 0.2
RANDOM_STATE = 42