Merge pull request #1191 from mindsdb/new_ts_defaults
[ENH] New ts defaults
paxcema authored Sep 15, 2023
2 parents 68d64d4 + e626327 commit 0fa5389
Showing 6 changed files with 147 additions and 46 deletions.
42 changes: 13 additions & 29 deletions lightwood/api/json_ai.py
@@ -203,6 +203,8 @@ def generate_json_ai(
]
)
else:

# add neural model
if not tss.is_timeseries:
submodels.extend(
[
@@ -226,10 +228,11 @@
"stop_after": "$problem_definition.seconds_per_mixer",
"search_hyperparameters": True,
},
}
},
]
)

# add other models
if (not tss.is_timeseries or tss.horizon == 1) and dtype_dict[target] not in (dtype.num_array, dtype.cat_array):
submodels.extend(
[
@@ -255,34 +258,15 @@
},
]
)
elif tss.is_timeseries and tss.horizon > 1 and tss.use_previous_target and \
dtype_dict[target] in (dtype.integer, dtype.float, dtype.quantity):

submodels.extend(
[
{
"module": "SkTime",
"args": {
"stop_after": "$problem_definition.seconds_per_mixer",
"horizon": "$problem_definition.timeseries_settings.horizon",
},
},
{
"module": "ETSMixer",
"args": {
"stop_after": "$problem_definition.seconds_per_mixer",
"horizon": "$problem_definition.timeseries_settings.horizon",
},
},
{
"module": "ARIMAMixer",
"args": {
"stop_after": "$problem_definition.seconds_per_mixer",
"horizon": "$problem_definition.timeseries_settings.horizon",
},
}
]
)
# special forecasting dispatch
elif tss.is_timeseries:
submodels.extend([
{
"module": "XGBoostArrayMixer",
"args": {},
},
])

model = {
"module": "BestOf",
@@ -571,7 +555,7 @@ def add_implicit_values(json_ai: JsonAI) -> JsonAI:
"target_encoder", "$encoders[self.target]"
)

elif mixers[i]["module"] == "LightGBMArray":
elif mixers[i]["module"] in ("LightGBMArray", "XGBoostArrayMixer"):
mixers[i]["args"]["input_cols"] = mixers[i]["args"].get(
"input_cols", "$input_cols"
)
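Note: the net effect of the json_ai.py hunks above is that multi-step forecasting tasks now default to a single XGBoostArrayMixer submodel in place of the previous SkTime / ETSMixer / ARIMAMixer trio. A minimal sketch of the resulting dispatch, condensed and simplified from generate_json_ai (not the library's literal code):

    def forecasting_submodels(is_timeseries: bool, horizon: int,
                              target_is_array: bool) -> list:
        # Mirrors the branch structure above: tabular or single-step
        # targets keep the standard mixers; multi-step series fall
        # through to the array mixer.
        submodels = []
        if (not is_timeseries or horizon == 1) and not target_is_array:
            submodels.append({"module": "XGBoostMixer", "args": {}})
        elif is_timeseries:
            # special forecasting dispatch (new default)
            submodels.append({"module": "XGBoostArrayMixer", "args": {}})
        return submodels

    print(forecasting_submodels(True, 5, False))
    # -> [{'module': 'XGBoostArrayMixer', 'args': {}}]
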
3 changes: 2 additions & 1 deletion lightwood/mixer/__init__.py
@@ -3,6 +3,7 @@
from lightwood.mixer.neural import Neural
from lightwood.mixer.neural_ts import NeuralTs
from lightwood.mixer.xgboost import XGBoostMixer
from lightwood.mixer.xgboost_array import XGBoostArrayMixer
from lightwood.mixer.random_forest import RandomForest
from lightwood.mixer.sktime import SkTime
from lightwood.mixer.arima import ARIMAMixer
@@ -43,4 +44,4 @@

__all__ = ['BaseMixer', 'Neural', 'NeuralTs', 'LightGBM', 'RandomForest', 'LightGBMArray', 'Unit', 'Regression',
'SkTime', 'QClassic', 'ProphetMixer', 'ETSMixer', 'ARIMAMixer', 'NHitsMixer', 'GluonTSMixer', 'XGBoostMixer',
'TabTransformerMixer']
'TabTransformerMixer', 'XGBoostArrayMixer']
45 changes: 32 additions & 13 deletions lightwood/mixer/nhits.py
@@ -5,6 +5,7 @@
import pandas as pd
from neuralforecast import NeuralForecast
from neuralforecast.models.nhits import NHITS
from neuralforecast.models.nbeats import NBEATS
from neuralforecast.losses.pytorch import MQLoss

from lightwood.helpers.log import log
@@ -20,6 +21,7 @@ class NHitsMixer(BaseMixer):
model_path: str
hyperparam_search: bool
default_config: dict
SUPPORTED_MODELS = ('nhits', 'nbeats')

def __init__(
self,
@@ -56,7 +58,14 @@ def __init__(
self.grouped_by = ['__default'] if not ts_analysis['tss'].group_by else ts_analysis['tss'].group_by
self.group_boundaries = {} # stores last observed timestamp per series
self.train_args = train_args.get('trainer_args', {}) if train_args else {}
self.train_args['early_stop_patience_steps'] = self.train_args.get('early_stop_patience_steps', 10)

# we set a fairly aggressive training schedule by default
self.train_args['early_stop_patience_steps'] = self.train_args.get('early_stop_patience_steps', 1)
self.train_args['val_check_steps'] = self.train_args.get('val_check_steps', 10)
self.train_args['learning_rate'] = self.train_args.get('learning_rate', 3e-3)
self.train_args['mlp_units'] = self.train_args.get('mlp_units', [[128, 128], [128, 128]])
self.train_args['random_seed'] = self.train_args.get('random_seed', 1)

self.conf_level = self.train_args.pop('conf_level', [90])
for level in self.conf_level:
assert 0 <= level <= 100, f'A provided level is not in the [0, 100] range (found: {level})'
@@ -74,18 +83,24 @@ def __init__(
'T': 'hourly', # NOTE: use another pre-trained model once available
'S': 'hourly' # NOTE: use another pre-trained model once available
}
self.model = None
self.model_class_str = self.train_args.get('model_class', 'nhits').lower()
assert self.model_class_str in NHitsMixer.SUPPORTED_MODELS, f'Provided model class ({self.model_class_str}) is not supported. Supported models are: {NHitsMixer.SUPPORTED_MODELS}' # noqa
self.model_class = NBEATS if self.model_class_str == 'nbeats' else NHITS
self.model_name = None
self.model_names = {
'hourly': 'nhits_m4_hourly.ckpt', # hourly (non-tiny)
'daily': 'nhits_m4_daily.ckpt', # daily
'monthly': 'nhits_m4_monthly.ckpt', # monthly
'yearly': 'nhits_m4_yearly.ckpt', # yearly
'nhits': {
'hourly': 'nhits_m4_hourly.ckpt', # hourly (non-tiny)
'daily': 'nhits_m4_daily.ckpt', # daily
'monthly': 'nhits_m4_monthly.ckpt', # monthly
'yearly': 'nhits_m4_yearly.ckpt', # yearly
},
'nbeats': {} # TODO: complete
}
self.model_name = None
self.model = None

def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
"""
Fits the N-HITS model.
Fits the NeuralForecast model.
""" # noqa
log.info('Started fitting N-HITS forecasting model')

@@ -110,7 +125,7 @@ def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
None)
self.model_name = self.model_names['hourly'] if self.model_name is None else self.model_name
ckpt_url = self.base_url + self.model_name
self.model = NHITS.load_from_checkpoint(ckpt_url)
self.model = self.model_class.load_from_checkpoint(ckpt_url)

if not self.window < self.model.hparams.n_time_in:
log.info(f'NOTE: Provided window ({self.window}) is smaller than specified model input length ({self.model.hparams.n_time_in}). Will train a new model from scratch.') # noqa
@@ -126,8 +141,8 @@ def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
new_window = max(1, n_time - self.horizon - 1)
self.window = new_window
log.info(f'Window {self.window} is too long for data provided (group: {df[gby].value_counts()[::-1].index[0]}), reducing window to {new_window}.') # noqa
model = NHITS(h=n_time_out, input_size=self.window, **self.train_args, loss=MQLoss(level=self.conf_level))
self.model = NeuralForecast(models=[model], freq=self.ts_analysis['sample_freqs']['__default'])
model = self.model_class(h=n_time_out, input_size=self.window, **self.train_args, loss=MQLoss(level=self.conf_level)) # noqa
self.model = NeuralForecast(models=[model], freq=self.ts_analysis['sample_freqs']['__default'],)
self.model.fit(df=Y_df, val_size=n_ts_val)
log.info('Successfully trained N-HITS forecasting model.')

@@ -156,7 +171,11 @@ def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs],
level = max(self.conf_level)

target_cols = ['prediction', 'lower', 'upper']
pred_cols = ['NHITS-median', f'NHITS-lo-{level}', f'NHITS-hi-{level}']
pred_cols = [
f'{self.model_class_str.upper()}-median',
f'{self.model_class_str.upper()}-lo-{level}',
f'{self.model_class_str.upper()}-hi-{level}'
]

input_df, idxs = self._make_initial_df(deepcopy(ds.data_frame))
length = sum(ds.encoded_ds_lengths) if isinstance(ds, ConcatedEncodedDs) else len(ds)
@@ -189,7 +208,7 @@

def _make_initial_df(self, df, mode='inference'):
"""
Prepares a dataframe for the NHITS model according to what neuralforecast expects.
Prepares a dataframe for the model according to what neuralforecast expects.
If a per-group boundary exists, this method additionally drops out all observations prior to the cutoff.
""" # noqa
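Note: the nhits.py hunks above make the NeuralForecast backbone selectable at construction time (and tighten the default training schedule: early_stop_patience_steps=1, val_check_steps=10, learning_rate=3e-3 unless overridden). A hedged sketch of how the new 'model_class' knob resolves, mirroring the constructor logic above and assuming neuralforecast is installed:

    from neuralforecast.models.nhits import NHITS
    from neuralforecast.models.nbeats import NBEATS

    SUPPORTED_MODELS = ('nhits', 'nbeats')

    def resolve_model_class(train_args: dict):
        # mixer-level options are nested under the 'trainer_args' key,
        # exactly as the constructor reads them
        trainer_args = train_args.get('trainer_args', {}) if train_args else {}
        model_class_str = trainer_args.get('model_class', 'nhits').lower()
        assert model_class_str in SUPPORTED_MODELS, f'unsupported: {model_class_str}'
        return NBEATS if model_class_str == 'nbeats' else NHITS

    resolve_model_class({'trainer_args': {'model_class': 'nbeats'}})  # -> NBEATS
    resolve_model_class({})                                           # -> NHITS (default)

Since the 'nbeats' entry of model_names is still an empty dict (marked TODO above), only 'nhits' can load a pre-trained checkpoint; an 'nbeats' run will always train from scratch.
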
4 changes: 2 additions & 2 deletions lightwood/mixer/xgboost.py
@@ -83,8 +83,8 @@ def __init__(
self.use_optuna = use_optuna
self.params = {}
self.fit_on_dev = fit_on_dev
self.cls_dtypes = [dtype.categorical, dtype.binary] # , dtype.cat_tsarray] # TODO
self.float_dtypes = [dtype.float, dtype.quantity] # , dtype.num_tsarray] # TODO
self.cls_dtypes = [dtype.categorical, dtype.binary, dtype.cat_tsarray]
self.float_dtypes = [dtype.float, dtype.quantity, dtype.num_tsarray]
self.num_dtypes = [dtype.integer] + self.float_dtypes
self.supports_proba = dtype_dict[target] in self.cls_dtypes
self.stable = True
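Note: re-enabling the previously commented-out tsarray dtypes lets XGBoostMixer accept time-series array targets, which is what allows the new array mixer below to reuse it per timestep. A small sketch of the routing, assuming the usual `from lightwood.api.dtype import dtype` import:

    from lightwood.api.dtype import dtype

    cls_dtypes = [dtype.categorical, dtype.binary, dtype.cat_tsarray]
    float_dtypes = [dtype.float, dtype.quantity, dtype.num_tsarray]
    num_dtypes = [dtype.integer] + float_dtypes

    # a numerical ts-array target now resolves to the regression path
    print(dtype.num_tsarray in num_dtypes)  # True after this change
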
97 changes: 97 additions & 0 deletions lightwood/mixer/xgboost_array.py
@@ -0,0 +1,97 @@
from typing import Dict, List, Union, Optional
from copy import deepcopy

import numpy as np
import pandas as pd

from lightwood.helpers.log import log
from lightwood.encoder.base import BaseEncoder
from lightwood.mixer.base import BaseMixer
from lightwood.mixer.xgboost import XGBoostMixer
from lightwood.api.types import PredictionArguments, TimeseriesSettings
from lightwood.data.encoded_ds import EncodedDs, ConcatedEncodedDs


class XGBoostArrayMixer(BaseMixer):
"""XGBoost-based model, intended for usage in forecasting tasks."""
models: List[XGBoostMixer]
submodel_stop_after: float
target: str
supports_proba: bool
ts_analysis: Dict
tss: TimeseriesSettings

def __init__(
self,
stop_after: float,
target: str,
dtype_dict: Dict[str, str],
input_cols: List[str],
fit_on_dev: bool,
target_encoder: BaseEncoder,
ts_analysis: Dict[str, object],
use_stl: bool,
tss: TimeseriesSettings
):
super().__init__(stop_after)
self.tss = tss
self.horizon = tss.horizon
self.submodel_stop_after = stop_after / self.horizon
self.target = target
self.offset_pred_cols = [f'{self.target}_timestep_{i}' for i in range(1, self.horizon)]
if set(input_cols) != {self.tss.order_by}:
input_cols.remove(self.tss.order_by)
for col in self.offset_pred_cols:
dtype_dict[col] = dtype_dict[self.target]
self.models = [XGBoostMixer(self.submodel_stop_after,
target_col,
dtype_dict,
input_cols,
False, # fit_on_dev,
False, # use_optuna
target_encoder)
for _, target_col in zip(range(self.horizon), [target] + self.offset_pred_cols)]
self.ts_analysis = ts_analysis
self.supports_proba = False
self.use_stl = False
self.stable = False

def _fit(self, train_data: EncodedDs, dev_data: EncodedDs, submodel_method='fit') -> None:
original_train = deepcopy(train_data.data_frame)
original_dev = deepcopy(dev_data.data_frame)

for timestep in range(self.horizon):
getattr(self.models[timestep], submodel_method)(train_data, dev_data)

# restore dfs
train_data.data_frame = original_train
dev_data.data_frame = original_dev

def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
log.info('Started fitting XGBoost models for array prediction')
self._fit(train_data, dev_data, submodel_method='fit')

def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs, args: Optional[dict] = None) -> None:
log.info('Updating array of XGBoost models...')
self._fit(train_data, dev_data, submodel_method='partial_fit')

def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs],
args: PredictionArguments = PredictionArguments()) -> pd.DataFrame:
if args.predict_proba:
log.warning('This model does not output probability estimates')

original_df = deepcopy(ds.data_frame)
length = sum(ds.encoded_ds_lengths) if isinstance(ds, ConcatedEncodedDs) else len(ds)
ydf = pd.DataFrame(0, # zero-filled
index=np.arange(length),
columns=[f'prediction_{i}' for i in range(self.horizon)])

for timestep in range(self.horizon):
ydf[f'prediction_{timestep}'] = self.models[timestep](ds, args)['prediction'].values

if self.models[0].positive_domain:
ydf = ydf.clip(0)

ydf['prediction'] = ydf.values.tolist()
ds.data_frame = original_df
return ydf[['prediction']]
2 changes: 1 addition & 1 deletion tests/integration/basic/test_model_selection.py
@@ -69,6 +69,6 @@ def test_5_timeseries_t_plus_n(self):
'window': 5
}
}
expected_mixers = ['NeuralTs', 'SkTime', 'ARIMAMixer', 'ETSMixer']
expected_mixers = ['NeuralTs', 'XGBoostArrayMixer']
mixers = self.get_mixers(df, target, prob_kwargs=prob_kwargs)
self.assertEqual(set(mixers), set(expected_mixers))
