Skip to content

Feature: Implement series-specific model selection using meta-features #1211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions ads/opctl/operator/lowcode/common/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@


class AbstractData(ABC):
def __init__(self, spec, name="input_data", data=None):
def __init__(self, spec, name="input_data", data=None, subset=None):
    """Initialize the dataset holder.

    Parameters
    ----------
    spec : operator spec
        Specification used to locate, load and transform the data.
    name : str
        Logical name of this dataset (e.g. ``"input_data"``).
    data : optional
        Pre-loaded data. When provided, the load/transform/ingest
        pipeline is skipped and this object is used as-is.
    subset : optional
        Series identifiers used to restrict loading to a subset of
        the data; forwarded to the loader.
    """
    self.Transformations = Transformations
    self.data = None
    self._data_dict = {}
    self.name = name
    self.spec = spec
    self.subset = subset
    if data is None:
        # Nothing supplied: run the full load/transform/ingest pipeline.
        self.load_transform_ingest_data(spec)
    else:
        self.data = data

def get_raw_data_by_cat(self, category):
mapping = self._data_transformer.get_target_category_columns_map()
Expand Down Expand Up @@ -72,7 +77,7 @@ def get_data_for_series(self, series_id):
def _load_data(self, data_spec, **kwargs):
loading_start_time = time.time()
try:
raw_data = load_data(data_spec)
raw_data = load_data(data_spec, subset=self.subset if self.subset else None, target_category_columns=self.spec.target_category_columns)
except InvalidParameterError as e:
e.args = e.args + (f"Invalid Parameter: {self.name}",)
raise e
Expand Down
207 changes: 207 additions & 0 deletions ads/opctl/operator/lowcode/common/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,210 @@ def get_target_category_columns_map(self):
def _fill_na(self, df: pd.DataFrame, na_value=0) -> pd.DataFrame:
"""Fill nans in dataframe"""
return df.fillna(value=na_value)

def build_fforms_meta_features(self, data, target_col=None, group_cols=None):
"""
Build meta-features for time series based on FFORMS paper and add them to the original DataFrame.

Parameters
----------
data : pandas.DataFrame
Input DataFrame containing time series data
target_col : str, optional
Name of the target column to calculate meta-features for.
If None, uses the target column specified in dataset_info.
group_cols : list of str, optional
List of columns to group by before calculating meta-features.
If None, calculates features for the entire series.

Returns
-------
pandas.DataFrame
Original DataFrame with additional meta-feature columns

References
----------
Talagala, T. S., Hyndman, R. J., & Athanasopoulos, G. (2023).
Meta-learning how to forecast time series. Journal of Forecasting, 42(6), 1476-1501.
"""
if not isinstance(data, pd.DataFrame):
raise ValueError("Input must be a pandas DataFrame")

# Use target column from dataset_info if not specified
if target_col is None:
target_col = self.target_column_name
if target_col not in data.columns:
raise ValueError(f"Target column '{target_col}' not found in DataFrame")

# Check if group_cols are provided and valid
if group_cols is not None:
if not isinstance(group_cols, list):
raise ValueError("group_cols must be a list of column names")
for col in group_cols:
if col not in data.columns:
raise ValueError(f"Group column '{col}' not found in DataFrame")

# If no group_cols, get the target_category_columns else treat the entire DataFrame as a single series
if not group_cols:
group_cols = self.target_category_columns if self.target_category_columns else []

# Calculate meta-features for each series
def calculate_series_features(series):
"""Calculate features for a single series"""
n = len(series)
values = series.values

# Basic statistics
mean = series.mean()
std = series.std()
variance = series.var()
skewness = series.skew()
kurtosis = series.kurtosis()
cv = std / mean if mean != 0 else np.inf

# Trend features
X = np.vstack([np.arange(n), np.ones(n)]).T
trend_coef = np.linalg.lstsq(X, values, rcond=None)[0][0]
trend_pred = X.dot(np.linalg.lstsq(X, values, rcond=None)[0])
residuals = values - trend_pred
std_residuals = np.std(residuals)

# Turning points
turning_points = 0
for i in range(1, n-1):
if (values[i-1] < values[i] and values[i] > values[i+1]) or \
(values[i-1] > values[i] and values[i] < values[i+1]):
turning_points += 1
turning_points_rate = turning_points / (n-2) if n > 2 else 0

# Serial correlation
acf1 = series.autocorr(lag=1) if n > 1 else 0
acf2 = series.autocorr(lag=2) if n > 2 else 0
acf10 = series.autocorr(lag=10) if n > 10 else 0

# Seasonality features
seasonal_strength = 0
seasonal_peak_strength = 0
if n >= 12:
seasonal_lags = [12, 24, 36]
seasonal_acfs = []
for lag in seasonal_lags:
if n > lag:
acf_val = series.autocorr(lag=lag)
seasonal_acfs.append(abs(acf_val))
seasonal_peak_strength = max(seasonal_acfs) if seasonal_acfs else 0

ma = series.rolling(window=12, center=True).mean()
seasonal_comp = series - ma
seasonal_strength = 1 - np.var(seasonal_comp.dropna()) / np.var(series)

# Stability and volatility features
values_above_mean = values >= mean
crossing_points = np.sum(values_above_mean[1:] != values_above_mean[:-1])
crossing_rate = crossing_points / (n - 1) if n > 1 else 0

# First and second differences
diff1 = np.diff(values)
diff2 = np.diff(diff1) if len(diff1) > 1 else np.array([])

diff1_mean = np.mean(np.abs(diff1)) if len(diff1) > 0 else 0
diff1_var = np.var(diff1) if len(diff1) > 0 else 0
diff2_mean = np.mean(np.abs(diff2)) if len(diff2) > 0 else 0
diff2_var = np.var(diff2) if len(diff2) > 0 else 0

# Nonlinearity features
if n > 3:
X = values[:-1].reshape(-1, 1)
y = values[1:]
X2 = X * X
X3 = X * X * X
X_aug = np.hstack([X, X2, X3])
nonlinearity = np.linalg.lstsq(X_aug, y, rcond=None)[1][0] if len(y) > 0 else 0
else:
nonlinearity = 0

# Long-term trend features
if n >= 10:
mid = n // 2
trend_change = np.mean(values[mid:]) - np.mean(values[:mid])
else:
trend_change = 0

# Step changes and spikes
step_changes = np.abs(diff1).max() if len(diff1) > 0 else 0
spikes = np.sum(np.abs(values - mean) > 2 * std) / n if std != 0 else 0

# Hurst exponent and entropy
lag = min(10, n // 2)
variance_ratio = np.var(series.diff(lag)) / (lag * np.var(series.diff())) if n > lag else 0
hurst = np.log(variance_ratio) / (2 * np.log(lag)) if variance_ratio > 0 and lag > 1 else 0

hist, _ = np.histogram(series, bins='auto', density=True)
entropy = -np.sum(hist[hist > 0] * np.log(hist[hist > 0]))

return pd.Series({
'ts_n_obs': n,
'ts_mean': mean,
'ts_std': std,
'ts_variance': variance,
'ts_cv': cv,
'ts_skewness': skewness,
'ts_kurtosis': kurtosis,
'ts_trend': trend_coef,
'ts_trend_change': trend_change,
'ts_std_residuals': std_residuals,
'ts_turning_points_rate': turning_points_rate,
'ts_seasonal_strength': seasonal_strength,
'ts_seasonal_peak_strength': seasonal_peak_strength,
'ts_acf1': acf1,
'ts_acf2': acf2,
'ts_acf10': acf10,
'ts_crossing_rate': crossing_rate,
'ts_diff1_mean': diff1_mean,
'ts_diff1_variance': diff1_var,
'ts_diff2_mean': diff2_mean,
'ts_diff2_variance': diff2_var,
'ts_nonlinearity': nonlinearity,
'ts_step_max': step_changes,
'ts_spikes_rate': spikes,
'ts_hurst': hurst,
'ts_entropy': entropy
})

# Create copy of input DataFrame
result_df = data.copy()

if group_cols:
# Calculate features for each group
features = []
# Sort by date within each group if date column exists
date_col = self.dt_column_name if self.dt_column_name else 'Date'
if date_col in data.columns:
data = data.sort_values([date_col] + group_cols)

for name, group in data.groupby(group_cols):
# Sort group by date if exists
if date_col in group.columns:
group = group.sort_values(date_col)
group_features = calculate_series_features(group[target_col])
if isinstance(name, tuple):
feature_row = dict(zip(group_cols, name))
else:
feature_row = {group_cols[0]: name}
feature_row.update(group_features)
features.append(feature_row)

# Create features DataFrame without merging
features_df = pd.DataFrame(features)
# Return only the meta-features DataFrame with group columns
return features_df
else:
# Sort by date if exists and calculate features for entire series
date_col = self.dt_column_name if self.dt_column_name else 'Date'
if date_col in data.columns:
data = data.sort_values(date_col)
features = calculate_series_features(data[target_col])
# Return single row DataFrame with meta-features
return pd.DataFrame([features])

return result_df
8 changes: 8 additions & 0 deletions ads/opctl/operator/lowcode/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ def load_data(data_spec, storage_options=None, **kwargs):
data = data[columns]
if limit:
data = data[:limit]
# Filtering by subset if provided
subset = kwargs.get('subset', None)
if subset is not None:
target_category_columns = kwargs.get('target_category_columns', None)
mask = False
for col in target_category_columns:
mask = mask | data[col].isin(subset)
data = data[mask]
return data


Expand Down
56 changes: 53 additions & 3 deletions ads/opctl/operator/lowcode/forecast/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
# Copyright (c) 2023, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import copy
import json
import os
import sys
from typing import Dict, List

import pandas as pd
import yaml

from ads.opctl import logger
from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
from ads.opctl.operator.common.utils import _parse_input_args

from .const import AUTO_SELECT_SERIES
from .model.forecast_datasets import ForecastDatasets, ForecastResults
from .operator_config import ForecastOperatorConfig
from .whatifserve import ModelDeploymentManager
Expand All @@ -24,9 +27,56 @@ def operate(operator_config: ForecastOperatorConfig) -> ForecastResults:
from .model.factory import ForecastOperatorModelFactory

datasets = ForecastDatasets(operator_config)
results = ForecastOperatorModelFactory.get_model(
operator_config, datasets
).generate_report()
model = ForecastOperatorModelFactory.get_model(operator_config, datasets)

if operator_config.spec.model == AUTO_SELECT_SERIES and hasattr(
operator_config.spec, "meta_features"
):
# For AUTO_SELECT_SERIES, handle each series with its specific model
meta_features = operator_config.spec.meta_features
results = ForecastResults()
sub_results_list = []

# Group the data by selected model
for model_name in meta_features["selected_model"].unique():
# Get series that use this model
series_groups = meta_features[meta_features["selected_model"] == model_name]

# Create a sub-config for this model
sub_config = copy.deepcopy(operator_config)
sub_config.spec.model = model_name

# Create sub-datasets for these series
sub_datasets = ForecastDatasets(
operator_config,
subset=series_groups[operator_config.spec.target_category_columns]
.values.flatten()
.tolist(),
)

# Get and run the appropriate model
sub_model = ForecastOperatorModelFactory.get_model(sub_config, sub_datasets)
sub_result_df, sub_elapsed_time = sub_model.build_model()
sub_results = sub_model.generate_report(
result_df=sub_result_df,
elapsed_time=sub_elapsed_time,
save_sub_reports=True,
)
sub_results_list.append(sub_results)

# results_df = pd.concat([results_df, sub_result_df], ignore_index=True, axis=0)
# elapsed_time += sub_elapsed_time
# Merge all sub_results into a single ForecastResults object
if sub_results_list:
results = sub_results_list[0]
for sub_result in sub_results_list[1:]:
results.merge(sub_result)
else:
results = None

else:
# For other cases, use the single selected model
results = model.generate_report()
# saving to model catalog
spec = operator_config.spec
if spec.what_if_analysis and datasets.additional_data:
Expand Down
1 change: 1 addition & 0 deletions ads/opctl/operator/lowcode/forecast/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,5 @@ class ForecastOutputColumns(ExtendedEnum):
PROPHET_INTERNAL_DATE_COL = "ds"
RENDER_LIMIT = 5000
AUTO_SELECT = "auto-select"
AUTO_SELECT_SERIES = "auto-select-series"
BACKTEST_REPORT_NAME = "back_test.csv"
Loading