Commit e77a859

Merge pull request #1109 from mindsdb/staging
Release 23.2.1.0
paxcema authored Feb 9, 2023
2 parents d93f8b5 + 1062ee8
Showing 21 changed files with 455 additions and 146 deletions.
2 changes: 1 addition & 1 deletion lightwood/__about__.py
@@ -1,6 +1,6 @@
__title__ = 'lightwood'
__package_name__ = 'lightwood'
-__version__ = '23.1.2.1'
+__version__ = '23.2.1.0'
__description__ = "Lightwood is a toolkit for automatic machine learning model building"
__email__ = "[email protected]"
__author__ = 'MindsDB Inc'
12 changes: 8 additions & 4 deletions lightwood/analysis/analyze.py
@@ -56,10 +56,14 @@ def model_analyzer(

# raw predictions for validation dataset
args = {} if not is_classification else {"predict_proba": True}
-filtered_df = filter_ds(encoded_val_data, tss)
-encoded_val_data = EncodedDs(encoded_val_data.encoders, filtered_df, encoded_val_data.target)
-normal_predictions = predictor(encoded_val_data, args=PredictionArguments.from_dict(args))
-normal_predictions = normal_predictions.set_index(encoded_val_data.data_frame.index)
+filtered_df = encoded_val_data.data_frame
+normal_predictions = None
+
+if len(analysis_blocks) > 0:
+    filtered_df = filter_ds(encoded_val_data, tss)
+    encoded_val_data = EncodedDs(encoded_val_data.encoders, filtered_df, encoded_val_data.target)
+    normal_predictions = predictor(encoded_val_data, args=PredictionArguments.from_dict(args))
+    normal_predictions = normal_predictions.set_index(encoded_val_data.data_frame.index)

# ------------------------- #
# Run analysis blocks, both core and user-defined
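Note on the change above: when a predictor has no analysis blocks configured, the full inference pass over the validation set is now skipped entirely and `normal_predictions` stays `None`. A minimal sketch of the new control flow (simplified names, not the verbatim implementation):

    # Sketch: validation predictions are computed only when at least one
    # analysis block will actually consume them.
    def run_model_analysis(encoded_val_data, predictor, analysis_blocks, args):
        filtered_df = encoded_val_data.data_frame
        normal_predictions = None  # stays None on the fast path

        if len(analysis_blocks) > 0:
            # Only now do we pay for a full inference run over validation data.
            normal_predictions = predictor(encoded_val_data, args=args)
            normal_predictions = normal_predictions.set_index(filtered_df.index)

        return filtered_df, normal_predictions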
35 changes: 22 additions & 13 deletions lightwood/analysis/explain.py
@@ -37,11 +37,30 @@ def explain(data: pd.DataFrame,
# ------------------------- #
# Setup base insights
# ------------------------- #
-data = data.reset_index(drop=True)
predictions = predictions.reset_index(drop=True)

+data = data.reset_index(drop=True)
+tss = problem_definition.timeseries_settings
row_insights = pd.DataFrame()
global_insights = {}

+def _reformat_ts_columns(tss, out_df, in_df):
+    if tss.is_timeseries:
+        if tss.group_by:
+            for col in tss.group_by:
+                out_df[f'group_{col}'] = in_df[col]
+
+        out_df[f'order_{tss.order_by}'] = in_df[tss.order_by]
+        out_df[f'order_{tss.order_by}'] = get_inferred_timestamps(
+            out_df, tss.order_by, ts_analysis['deltas'], tss, stat_analysis,
+            time_format=pred_args.time_format
+        )
+    return out_df
+
+if not explainer_blocks:
+    predictions.rename(columns={'__mdb_original_index': 'original_index'}, inplace=True)
+    predictions = _reformat_ts_columns(tss, predictions, data)
+    return predictions, global_insights

row_insights['original_index'] = data['__mdb_original_index']
row_insights['prediction'] = predictions['prediction']

@@ -50,17 +69,7 @@ def explain(data: pd.DataFrame,
if '__mdb_proba' in col:
row_insights[col] = predictions[col]

-tss = problem_definition.timeseries_settings
-if tss.is_timeseries:
-    if tss.group_by:
-        for col in tss.group_by:
-            row_insights[f'group_{col}'] = data[col]
-
-    row_insights[f'order_{tss.order_by}'] = data[tss.order_by]
-    row_insights[f'order_{tss.order_by}'] = get_inferred_timestamps(
-        row_insights, tss.order_by, ts_analysis['deltas'], tss, stat_analysis,
-        time_format=pred_args.time_format
-    )
+row_insights = _reformat_ts_columns(tss, row_insights, data)

kwargs = {
'data': data,
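Both call sites now share a single helper: `_reformat_ts_columns` adds the `group_*` and `order_*` columns for timeseries tasks, whether the target frame is the early-returned `predictions` or the full `row_insights`. A hedged illustration with hypothetical settings:

    # Hypothetical: tss.group_by == ['store'], tss.order_by == 'date'.
    # The input frame has columns ['store', 'date', 'sales'].
    # The output frame gains 'group_store' (copied from 'store') and
    # 'order_date' (timestamps inferred via get_inferred_timestamps).
    row_insights = _reformat_ts_columns(tss, row_insights, data)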
29 changes: 20 additions & 9 deletions lightwood/api/json_ai.py
@@ -929,20 +929,29 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
# Prepare the training + dev data
concatenated_train_dev = pd.concat([data['train'], data['dev']])
-encoder_prepping_dict = {{}}
+prepped_encoders = {{}}
-# Prepare encoders that do not require learned strategies
-for col_name, encoder in self.encoders.items():
-    if col_name != self.target and not encoder.is_trainable_encoder:
-        encoder_prepping_dict[col_name] = [encoder, concatenated_train_dev[col_name], 'prepare']
+# Prepare input encoders
+parallel_encoding = parallel_encoding_check(data['train'], self.encoders)
+if parallel_encoding:
+    for col_name, encoder in self.encoders.items():
+        if col_name != self.target and not encoder.is_trainable_encoder:
+            prepped_encoders[col_name] = (encoder, concatenated_train_dev[col_name], 'prepare')
+    prepped_encoders = mut_method_call(prepped_encoders)
+else:
+    for col_name, encoder in self.encoders.items():
+        if col_name != self.target and not encoder.is_trainable_encoder:
+            encoder.prepare(concatenated_train_dev[col_name])
+            prepped_encoders[col_name] = encoder
-# Setup parallelization
-parallel_prepped_encoders = mut_method_call(encoder_prepping_dict)
-for col_name, encoder in parallel_prepped_encoders.items():
+# Store encoders
+for col_name, encoder in prepped_encoders.items():
self.encoders[col_name] = encoder
# Prepare the target
-if self.target not in parallel_prepped_encoders:
+if self.target not in prepped_encoders:
if self.encoders[self.target].is_trainable_encoder:
self.encoders[self.target].prepare(data['train'][self.target], data['dev'][self.target])
else:
@@ -1024,6 +1033,8 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
raise e
# Update mixers to trained versions
+if not trained_mixers:
+    raise Exception('No mixers could be trained! Please verify your problem definition or JsonAI model representation.')
self.mixers = trained_mixers
# --------------- #
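For context, `mut_method_call` (see `lightwood/helpers/parallelism.py` below) takes a dict mapping names to `(object, argument, method_name)` tuples, invokes each method in a worker process, and returns the mutated objects under the same keys. A hedged, self-contained sketch of the payload shape the generated code builds in the parallel branch; `ToyEncoder` and the column values are hypothetical, and on some platforms this needs an `if __name__ == '__main__':` guard:

    from lightwood.helpers.parallelism import mut_method_call

    class ToyEncoder:  # hypothetical stand-in for a lightwood encoder
        def __init__(self):
            self.prepared = False

        def prepare(self, column_data):
            self.prepared = True

    # Each entry asks a worker to call `encoder.prepare(column_data)` and
    # ship the mutated encoder back under the same key.
    payload = {'age': (ToyEncoder(), [18, 35, 71], 'prepare')}
    prepped = mut_method_call(payload)
    assert prepped['age'].prepared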
6 changes: 5 additions & 1 deletion lightwood/api/types.py
@@ -433,7 +433,8 @@ class PredictionArguments:
:param predict_proba: triggers (where supported) predictions in raw probability output form. I.e. for classifiers,
instead of returning only the predicted class, the output additionally includes the assigned probability for
each class.
:param all_mixers: forces an ensemble to return predictions emitted by all its internal mixers.
+:param mixer_weights: a list with coefficients that are normalized into 0-1 bounded scores to mix the output of all mixers available to a compatible ensemble (e.g. [0.5, 0.5] for an ensemble with two mixers would yield the mean prediction). Can be used with WeightedMeanEnsemble, StackedEnsemble or TsStackedEnsemble.
:param fixed_confidence: Used in the ICP analyzer module, specifies an `alpha` fixed confidence so that predictions, in average, are correct `alpha` percent of the time. For unsupervised anomaly detection, this also translates into the expected error rate. Bounded between 0.01 and 0.99 (respectively implies wider and tighter bounds, all other parameters being equal).
:param anomaly_cooldown: Sets the minimum amount of timesteps between consecutive firings of the anomaly detector.
:param simple_ts_bounds: in forecasting contexts, enabling this parameter disables the usual conformal-based bounds (with Bonferroni correction) and resorts to a simpler way of scaling bounds through the horizon based on the uncertainty estimation for the first value in the forecast (see helpers.ts.add_tn_num_conf_bounds for the implementation).
@@ -445,6 +446,7 @@ class PredictionArguments:

predict_proba: bool = True
all_mixers: bool = False
+mixer_weights: list = None
fixed_confidence: Union[int, float, None] = None
anomaly_cooldown: int = 1
forecast_offset: int = 0
@@ -465,6 +467,7 @@ def from_dict(obj: Dict):
# maybe this should be stateful instead, and save the latest used value for each field?
predict_proba = obj.get('predict_proba', PredictionArguments.predict_proba)
all_mixers = obj.get('all_mixers', PredictionArguments.all_mixers)
+mixer_weights = obj.get('mixer_weights', PredictionArguments.mixer_weights)
fixed_confidence = obj.get('fixed_confidence', PredictionArguments.fixed_confidence)
anomaly_cooldown = obj.get('anomaly_cooldown', PredictionArguments.anomaly_cooldown)
forecast_offset = obj.get('forecast_offset', PredictionArguments.forecast_offset)
@@ -475,6 +478,7 @@ def from_dict(obj: Dict):
pred_args = PredictionArguments(
predict_proba=predict_proba,
all_mixers=all_mixers,
+mixer_weights=mixer_weights,
fixed_confidence=fixed_confidence,
anomaly_cooldown=anomaly_cooldown,
forecast_offset=forecast_offset,
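A hedged usage example for the new `mixer_weights` argument, assuming a predictor whose ensemble supports it (WeightedMeanEnsemble, StackedEnsemble or TsStackedEnsemble, per the docstring):

    from lightwood.api.types import PredictionArguments

    # Equal blend of two mixers; coefficients are normalized into
    # 0-1 bounded scores per the docstring above.
    args = PredictionArguments.from_dict({'mixer_weights': [0.5, 0.5]})
    assert args.mixer_weights == [0.5, 0.5]

    # In a trained predictor this would typically be forwarded as, e.g.
    # (hypothetical call site):
    #     predictions = predictor.predict(df, args={'mixer_weights': [0.5, 0.5]})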
30 changes: 21 additions & 9 deletions lightwood/ensemble/best_of.py
@@ -2,14 +2,17 @@

import numpy as np
import pandas as pd
+from mindsdb_evaluator import evaluate_accuracies

from lightwood.helpers.log import log
from type_infer.helpers import is_nan_numeric
from lightwood.mixer.base import BaseMixer
from lightwood.ensemble.base import BaseEnsemble
from lightwood.api.types import PredictionArguments, SubmodelData
from lightwood.data.encoded_ds import EncodedDs
-from mindsdb_evaluator import evaluate_accuracies

+# special dispatches
+from lightwood.mixer import GluonTSMixer  # imported from base mixer folder as it needs optional dependencies


class BestOf(BaseEnsemble):
@@ -22,19 +25,24 @@ class BestOf(BaseEnsemble):
def __init__(self, target, mixers: List[BaseMixer], data: EncodedDs, accuracy_functions,
args: PredictionArguments, ts_analysis: Optional[dict] = None, fit: bool = True) -> None:
super().__init__(target, mixers, data, fit=False)
+self.special_dispatch_list = [GluonTSMixer]

if fit:
score_list = []
for _, mixer in enumerate(self.mixers):
-score_dict = evaluate_accuracies(
-    data.data_frame,
-    mixer(data, args)['prediction'],
-    target,
-    accuracy_functions,
-    ts_analysis=ts_analysis
-)
-
-avg_score = np.mean(list(score_dict.values()))
+if type(mixer) in self.special_dispatch_list:
+    avg_score = self.special_dispatch(mixer, data, args, target, ts_analysis)
+else:
+    score_dict = evaluate_accuracies(
+        data.data_frame,
+        mixer(data, args)['prediction'],
+        target,
+        accuracy_functions,
+        ts_analysis=ts_analysis
+    )
+
+    avg_score = np.mean(list(score_dict.values()))
log.info(f'Mixer: {type(mixer).__name__} got accuracy: {avg_score}')

if is_nan_numeric(avg_score):
@@ -73,3 +81,7 @@ def __call__(self, ds: EncodedDs, args: PredictionArguments) -> pd.DataFrame:
else:
log.warning(f'Unstable mixer {type(mixer).__name__} failed with exception: {e}.\
Trying next best')

+def special_dispatch(self, mixer, data, args, target, ts_analysis):
+    if isinstance(mixer, GluonTSMixer):
+        return mixer.model_train_stats.loss_history[-1]
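The dispatch exists because `BestOf` normally scores each mixer by running a full prediction pass through `evaluate_accuracies`; for `GluonTSMixer` the ensemble instead reuses the last recorded training loss as a proxy score. A hedged one-line illustration, with a hypothetical trained mixer instance:

    # Hypothetical: `gluon_mixer` is a trained GluonTSMixer; its proxy score
    # is the final entry of its training loss history, as in special_dispatch.
    proxy_score = gluon_mixer.model_train_stats.loss_history[-1]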
7 changes: 4 additions & 3 deletions lightwood/ensemble/stacked_ensemble.py
@@ -38,7 +38,7 @@ def __init__(self, target, mixers: List[BaseMixer], data: EncodedDs, dtype_dict:
self.optimizer = SGD([self.mixer_weights], lr=0.01)
self.agg_dim = 1

-if fit:
+if fit and len(mixers) > 1:
all_preds = torch.tensor(self.predict(data, args)).squeeze().reshape(-1, len(mixers))
actual = torch.tensor(data.data_frame[self.target_cols].values)

@@ -52,7 +52,7 @@ def _eval_loss():
self.optimizer.step(_eval_loss)
self.mixer_weights = self.softmax(self.mixer_weights)
log.info(f'Optimal stacking weights: {self.mixer_weights.detach().tolist()}')
-        self.prepared = True
+    self.prepared = True

def predict(self, ds: EncodedDs, args: PredictionArguments) -> List:
outputs = []
@@ -64,9 +64,10 @@ def predict(self, ds: EncodedDs, args: PredictionArguments) -> List:

def __call__(self, ds: EncodedDs, args: PredictionArguments) -> pd.DataFrame:
assert self.prepared
+mixer_weights = torch.tensor(args.mixer_weights) if args.mixer_weights else self.mixer_weights
output = pd.DataFrame()
predictions = torch.tensor(self.predict(ds, args)).squeeze().reshape(-1, len(self.mixers))
-predictions = (predictions * self.mixer_weights).sum(axis=1)
+predictions = (predictions * mixer_weights).sum(axis=1)
output['prediction'] = predictions.detach().numpy().tolist()
return output

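A small worked example of the weighting arithmetic in `__call__`, assuming two mixers and a caller-supplied override via `args.mixer_weights`:

    import torch

    # Two mixers, three rows: column i holds mixer i's predictions.
    predictions = torch.tensor([[1.0, 3.0],
                                [2.0, 4.0],
                                [0.0, 2.0]])
    mixer_weights = torch.tensor([0.25, 0.75])  # caller override

    blended = (predictions * mixer_weights).sum(axis=1)
    print(blended)  # tensor([2.5000, 3.5000, 1.5000])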
7 changes: 4 additions & 3 deletions lightwood/ensemble/ts_stacked_ensemble.py
@@ -33,7 +33,7 @@ def __init__(self, target, mixers: List[BaseMixer], data: EncodedDs, dtype_dict:
self.agg_dim = 2
self.opt_max_iter = 1000

-if fit:
+if fit and len(mixers) > 1:
all_preds = torch.tensor(self.predict(data, args)).squeeze().reshape(-1, self.horizon, len(mixers))
actual = torch.tensor(data.data_frame[self.target_cols].values)
nan_mask = actual != actual
@@ -54,12 +54,13 @@ def _eval_loss():
optimizer.step(_eval_loss)
self.mixer_weights = self.softmax(self.mixer_weights)
log.info(f'Optimal stacking weights: {self.mixer_weights.detach().tolist()}')
-        self.prepared = True
+    self.prepared = True

def __call__(self, ds: EncodedDs, args: PredictionArguments) -> pd.DataFrame:
assert self.prepared
+mixer_weights = torch.tensor(args.mixer_weights) if args.mixer_weights else self.mixer_weights
output = pd.DataFrame()
predictions = torch.tensor(np.concatenate(self.predict(ds, args), axis=2))
-predictions = (predictions * self.mixer_weights).sum(axis=self.agg_dim)
+predictions = (predictions * mixer_weights).sum(axis=self.agg_dim)
output['prediction'] = predictions.detach().numpy().tolist()
return output
3 changes: 2 additions & 1 deletion lightwood/ensemble/weighted_mean_ensemble.py
@@ -61,7 +61,8 @@ def __call__(self, ds: EncodedDs, args: PredictionArguments) -> pd.DataFrame:
for mixer in self.mixers:
df[f'__mdb_mixer_{type(mixer).__name__}'] = mixer(ds, args=args)['prediction']

-avg_predictions_df = df.apply(lambda x: np.average(x, weights=self.weights), axis='columns')
+mixer_weights = args.mixer_weights if args.mixer_weights else self.weights
+avg_predictions_df = df.apply(lambda x: np.average(x, weights=mixer_weights), axis='columns')
return pd.DataFrame(avg_predictions_df, columns=['prediction'])

@staticmethod
19 changes: 19 additions & 0 deletions lightwood/helpers/parallelism.py
@@ -4,6 +4,9 @@
import multiprocessing as mp
from lightwood.helpers.log import log

+MAX_SEQ_ENCODERS = 20
+MAX_SEQ_LEN = 100_000


def get_nr_procs(df=None):
if 'LIGHTWOOD_N_WORKERS' in os.environ:
@@ -54,3 +57,19 @@ def mut_method_call(object_dict: Dict[str, tuple]) -> Dict[str, object]:
pool.join()

return dict(return_dict)


+def parallel_encoding_check(df, encoders):
+    """
+    Given a dataframe and some encoders, this rule-based method determines whether to train these encoders in parallel.
+    This has runtime implications, as instancing a new Lightwood process has noticeable overhead.
+    """  # noqa
+    trainable_encoders = [enc for col, enc in encoders.items() if enc.is_trainable_encoder]
+
+    if len(trainable_encoders) > MAX_SEQ_ENCODERS:
+        return True
+
+    if len(df) > MAX_SEQ_LEN:
+        return True
+
+    return False
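A hedged usage sketch of the new heuristic, with a hypothetical encoder class:

    import pandas as pd
    from lightwood.helpers.parallelism import parallel_encoding_check

    class ToyEncoder:  # hypothetical stand-in for a lightwood encoder
        is_trainable_encoder = True

    df = pd.DataFrame({'a': range(10)})

    # Small data and few trainable encoders: sequential preparation wins,
    # since spawning worker processes has noticeable overhead.
    assert parallel_encoding_check(df, {'a': ToyEncoder()}) is False

    # More than MAX_SEQ_ENCODERS trainable encoders tips the scale.
    many = {f'col_{i}': ToyEncoder() for i in range(25)}
    assert parallel_encoding_check(df, many) is True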
10 changes: 8 additions & 2 deletions lightwood/helpers/seed.py
@@ -1,7 +1,11 @@
import random
import torch
import numpy as np
-import mxnet as mx
+
+try:
+    import mxnet as mx
+except Exception:
+    mx = None


def seed(seed_nr: int) -> None:
@@ -10,4 +14,6 @@ def seed(seed_nr: int) -> None:
torch.backends.cudnn.benchmark = False
np.random.seed(seed_nr)
random.seed(seed_nr)
-mx.random.seed(seed_nr)
+
+if mx is not None:
+    mx.random.seed(seed_nr)
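With the guarded import, mxnet becomes an optional dependency of the seeding helper. A sketch of the resulting behavior, assuming lightwood is installed:

    from lightwood.helpers.seed import seed

    # Seeds random, numpy and torch; mxnet is seeded only if its
    # guarded import succeeded (i.e. mx is not None).
    seed(42)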
23 changes: 18 additions & 5 deletions lightwood/helpers/text.py
@@ -8,16 +8,29 @@
* permission of MindsDB Inc
*******************************************************
"""
+import os

import nltk
+from contextlib import redirect_stdout

+from lightwood.helpers.log import log


try:
-nltk.data.find('tokenizers/punkt')
+with redirect_stdout(open(os.devnull, "w")):
+    nltk.data.find('tokenizers/punkt')
except LookupError:
-nltk.download('punkt')
+try:
+    nltk.download('punkt', quiet=True)
+except Exception:
+    log.error("NLTK was unable to download the 'punkt' package. Please check your connection and try again!")

try:
-from nltk.corpus import stopwords
-stopwords.words('english')
+with redirect_stdout(open(os.devnull, "w")):
+    from nltk.corpus import stopwords
+    stopwords.words('english')
except LookupError:
-nltk.download('stopwords', quiet=True)
+try:
+    nltk.download('stopwords', quiet=True)
+except Exception:
+    log.error("NLTK was unable to download the 'stopwords' package. Please check your connection and try again!")
6 changes: 5 additions & 1 deletion lightwood/mixer/__init__.py
@@ -7,7 +7,6 @@
from lightwood.mixer.sktime import SkTime
from lightwood.mixer.arima import ARIMAMixer
from lightwood.mixer.ets import ETSMixer
-from lightwood.mixer.gluonts import GluonTSMixer
from lightwood.mixer.regression import Regression

try:
@@ -25,6 +24,11 @@
except Exception:
ProphetMixer = None

+try:
+    from lightwood.mixer.gluonts import GluonTSMixer
+except Exception:
+    GluonTSMixer = None

try:
from lightwood.mixer.lightgbm import LightGBM
from lightwood.mixer.lightgbm_array import LightGBMArray
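With `GluonTSMixer` now behind a guarded import, downstream code (such as `BestOf` above, which places it in its special dispatch list) should tolerate the name being `None`. A hedged sketch of the availability check a consumer might perform:

    from lightwood.mixer import GluonTSMixer

    if GluonTSMixer is None:
        # Optional GluonTS dependencies are not installed; skip related features.
        print('GluonTSMixer unavailable')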