diff --git a/.flake8 b/.flake8 index ef1201e41..79215002d 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,4 @@ [flake8] max-line-length = 120 -ignore = E275,E402,F821,W503,W504,C408,W391 +ignore = E275,E402,F821,W503,W504,C408,W391,E721 exclude = .git,__pycache__,docs,docssrc diff --git a/.github/workflows/ligthtwood.yml b/.github/workflows/ligthtwood.yml index dfc58bd62..9cea8d930 100644 --- a/.github/workflows/ligthtwood.yml +++ b/.github/workflows/ligthtwood.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - python-version: ['3.8','3.9'] + python-version: ["3.8","3.9","3.10","3.11"] steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/lightwood/__about__.py b/lightwood/__about__.py index 7fccf38f3..91a35f04a 100644 --- a/lightwood/__about__.py +++ b/lightwood/__about__.py @@ -1,6 +1,6 @@ __title__ = 'lightwood' __package_name__ = 'lightwood' -__version__ = '23.7.1.0' +__version__ = '23.8.1.0' __description__ = "Lightwood is a toolkit for automatic machine learning model building" __email__ = "community@mindsdb.com" __author__ = 'MindsDB Inc' diff --git a/lightwood/analysis/explain.py b/lightwood/analysis/explain.py index e9b91968b..b6d6dd297 100644 --- a/lightwood/analysis/explain.py +++ b/lightwood/analysis/explain.py @@ -4,12 +4,13 @@ from dataprep_ml import StatisticalAnalysis -from lightwood.helpers.log import log +from lightwood.helpers.log import log, timed from lightwood.api.types import ProblemDefinition, PredictionArguments from lightwood.helpers.ts import get_inferred_timestamps from lightwood.analysis.base import BaseAnalysisBlock +@timed def explain(data: pd.DataFrame, encoded_data: torch.Tensor, predictions: pd.DataFrame, diff --git a/lightwood/analysis/nc/calibrate.py b/lightwood/analysis/nc/calibrate.py index e58ec5d82..88d5c93ae 100644 --- a/lightwood/analysis/nc/calibrate.py +++ b/lightwood/analysis/nc/calibrate.py @@ -200,10 +200,11 @@ def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]: # save relevant predictions in the caches, then calibrate the ICP pred_cache = icp_df.pop(f'__predicted_{ns.target}').values if ns.is_multi_ts and ns.is_classification: - # output['label_encoders'].transform(preds.reshape(-1, 1)) pred_cache = output['label_encoders'].transform([[p[0] for p in pred_cache]]) elif ns.is_multi_ts: pred_cache = np.array([np.array(p) for p in pred_cache]) + elif ns.is_classification: + pred_cache = output['label_encoders'].transform(pred_cache.reshape(-1, 1)) icps[tuple(group)].nc_function.model.prediction_cache = pred_cache icp_df, y = clean_df(icp_df, ns, output.get('label_encoders', None)) diff --git a/lightwood/api/high_level.py b/lightwood/api/high_level.py index cd2de1a33..36d2dd9b3 100644 --- a/lightwood/api/high_level.py +++ b/lightwood/api/high_level.py @@ -1,5 +1,4 @@ import os -from types import ModuleType from typing import Union import dill import pandas as pd @@ -8,12 +7,8 @@ from type_infer.infer import infer_types from lightwood.api.predictor import PredictorInterface from lightwood.api.json_ai import generate_json_ai -import tempfile -from lightwood.api.json_ai import code_from_json_ai as _code_from_json_ai -import importlib.util +from lightwood.helpers.codegen import code_from_json_ai as _code_from_json_ai, _module_from_code, _predictor_from_code import sys -import random -import string import gc import time from lightwood.helpers.log import log @@ -107,10 +102,7 @@ def predictor_from_code(code: str) -> PredictorInterface: :returns: A lightwood ``Predictor`` object """ - module_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12)) - module_name += str(time.time()).replace('.', '') - predictor = _module_from_code(code, module_name).Predictor() - return predictor + return _predictor_from_code(code) def code_from_problem(df: pd.DataFrame, problem_definition: Union[ProblemDefinition, dict]) -> str: @@ -162,31 +154,6 @@ def predictor_from_state(state_file: str, code: str = None) -> PredictorInterfac return predictor -def _module_from_code(code: str, module_name: str) -> ModuleType: - """ - Create a python module (containing the generated ``Predictor`` class) from the code. This is both a python object and an associated temporary file on your filesystem - - :param code: The ``Predictor``'s code in text form - :param module_name: The name of the newly created module - - :returns: A python module object - """ # noqa - dirname = tempfile.gettempdir() - filename = os.urandom(24).hex() + str(time.time()).replace('.', '') + '.py' - path = os.path.join(dirname, filename) - if 'LIGHTWOOD_DEV_SAVE_TO' in os.environ: - path = os.environ['LIGHTWOOD_DEV_SAVE_TO'] - - with open(path, 'wb') as fp: - fp.write(code.encode('utf-8')) - spec = importlib.util.spec_from_file_location(module_name, fp.name) - temp_module = importlib.util.module_from_spec(spec) - sys.modules[module_name] = temp_module - spec.loader.exec_module(temp_module) - - return temp_module - - def predictor_from_json_ai(json_ai: JsonAI) -> PredictorInterface: """ Creates a ready-to-train ``Predictor`` object based on the details you specified inside your JsonAI. diff --git a/lightwood/api/json_ai.py b/lightwood/api/json_ai.py index b77d1f143..fbc775d16 100644 --- a/lightwood/api/json_ai.py +++ b/lightwood/api/json_ai.py @@ -1,69 +1,18 @@ -# TODO: _add_implicit_values unit test ensures NO changes for a fully specified file. +# TODO: add_implicit_values unit test ensures NO changes for a fully specified file. import inspect -from copy import deepcopy from type_infer.dtype import dtype from type_infer.base import TypeInformation from dataprep_ml import StatisticalAnalysis -from lightwood.helpers.log import log -from lightwood.helpers.templating import call, inline_dict, align from lightwood.helpers.templating import _consolidate_analysis_blocks, _add_cls_kwarg +from lightwood.helpers.constants import IMPORTS, IMPORT_EXTERNAL_DIRS from lightwood.api.types import ( JsonAI, ProblemDefinition, ) -from lightwood.__about__ import __version__ as lightwood_version import lightwood.ensemble - -# For custom modules, we create a module loader with necessary imports below -IMPORT_EXTERNAL_DIRS = """ -for import_dir in [os.path.join(os.path.expanduser('~/lightwood_modules'), lightwood_version.replace('.', '_')), os.path.join('/etc/lightwood_modules', lightwood_version.replace('.', '_'))]: - if os.path.exists(import_dir) and os.access(import_dir, os.R_OK): - for file_name in list(os.walk(import_dir))[0][2]: - if file_name[-3:] != '.py': - continue - mod_name = file_name[:-3] - loader = importlib.machinery.SourceFileLoader(mod_name, - os.path.join(import_dir, file_name)) - module = ModuleType(loader.name) - loader.exec_module(module) - sys.modules[mod_name] = module - exec(f'import {mod_name}') -""" # noqa - -IMPORTS = """ -import lightwood -from lightwood import __version__ as lightwood_version -from lightwood.analysis import * -from lightwood.api import * -from lightwood.data import * -from lightwood.encoder import * -from lightwood.ensemble import * -from lightwood.helpers.device import * -from lightwood.helpers.general import * -from lightwood.helpers.ts import * -from lightwood.helpers.log import * -from lightwood.helpers.numeric import * -from lightwood.helpers.parallelism import * -from lightwood.helpers.seed import * -from lightwood.helpers.text import * -from lightwood.helpers.torch import * -from lightwood.mixer import * - -from dataprep_ml.insights import statistical_analysis -from dataprep_ml.cleaners import cleaner -from dataprep_ml.splitters import splitter -from dataprep_ml.imputers import * - -import pandas as pd -from typing import Dict, List, Union, Optional -import os -from types import ModuleType -import importlib.machinery -import sys -import time -""" +import lightwood.encoder def lookup_encoder( @@ -145,6 +94,10 @@ def lookup_encoder( # Time-series representations require more advanced flags if tss.is_timeseries: gby = tss.group_by if tss.group_by is not None else [] + + if tss.order_by in gby: + raise Exception('The `order_by` column cannot be used to `group_by` simultaneously!') + if col_name == tss.order_by: encoder_dict["module"] = "ArrayEncoder" encoder_dict["args"]["original_type"] = f'"{tss.target_type}"' @@ -525,7 +478,7 @@ def _populate_implicit_field( json_ai.__setattr__(field_name, field) -def _add_implicit_values(json_ai: JsonAI) -> JsonAI: +def add_implicit_values(json_ai: JsonAI) -> JsonAI: """ To enable brevity in writing, auto-generate the "unspecified/missing" details required in the ML pipeline. @@ -536,7 +489,6 @@ def _add_implicit_values(json_ai: JsonAI) -> JsonAI: problem_definition = json_ai.problem_definition tss = problem_definition.timeseries_settings is_ts = tss.is_timeseries - # tsa_val = "self.ts_analysis" if is_ts else None # TODO: remove mixers = json_ai.model['args']['submodels'] # Add implicit ensemble arguments @@ -649,10 +601,20 @@ def _add_implicit_values(json_ai: JsonAI) -> JsonAI: # enforce fit_on_all if this mixer is specified problem_definition.fit_on_all = True + # encoder checks for name in json_ai.encoders: if name not in json_ai.dependency_dict: json_ai.dependency_dict[name] = [] + # filter arguments for included encoders (custom encoders will skip the check) + for col, enc_dict in json_ai.encoders.items(): + filtered_kwargs = {} + if hasattr(lightwood.encoder, enc_dict['module']): + encoder_cls = getattr(lightwood.encoder, enc_dict['module']) + for k, v in enc_dict['args'].items(): + _add_cls_kwarg(encoder_cls, filtered_kwargs, k, v) + json_ai.encoders[col]['args'] = filtered_kwargs + # Add "hidden" fields hidden_fields = { "cleaner": { @@ -771,600 +733,6 @@ def _add_implicit_values(json_ai: JsonAI) -> JsonAI: return json_ai -def code_from_json_ai(json_ai: JsonAI) -> str: - """ - Generates a custom ``PredictorInterface`` given the specifications from ``JsonAI`` object. - - :param json_ai: ``JsonAI`` object with fully specified parameters - - :returns: Automated syntax of the ``PredictorInterface`` object. - """ - json_ai = deepcopy(json_ai) - # ----------------- # - # Fill in any missing values - json_ai = _add_implicit_values(json_ai) - - # ----------------- # - - # Instantiate data types - dtype_dict = {} - - for k in json_ai.dtype_dict: - if json_ai.dtype_dict[k] not in (dtype.invalid, dtype.empty): - dtype_dict[k] = json_ai.dtype_dict[k] - - # Populate imputers - imputer_dict = {} - if json_ai.imputers: - for imputer in json_ai.imputers: - imputer_dict[imputer['args']['target'].replace('\'', '').replace('\"', '')] = call(imputer) - json_ai.imputers = imputer_dict - imputers = inline_dict(json_ai.imputers) - - # Populate encoders - encoder_dict = {} - for col_name, encoder in json_ai.encoders.items(): - encoder_dict[col_name] = call(encoder) - - # Populate time-series specific details - # TODO: consider moving this to a `JsonAI override` phase - tss = json_ai.problem_definition.timeseries_settings - if tss.is_timeseries: - if tss.use_previous_target: - col_name = f"__mdb_ts_previous_{json_ai.problem_definition.target}" - target_type = json_ai.dtype_dict[json_ai.problem_definition.target] - json_ai.problem_definition.timeseries_settings.target_type = target_type - encoder_dict[col_name] = call( - lookup_encoder( - target_type, - col_name, - False, - json_ai.problem_definition, - False, - None, - ) - ) - - dtype_dict[col_name] = target_type - # @TODO: Is populating the json_ai at this stage even necessary? - json_ai.encoders[col_name] = encoder_dict[col_name] - json_ai.dtype_dict[col_name] = target_type - json_ai.dependency_dict[col_name] = [] - - # ----------------- # - - input_cols = [x.replace("'", "\\'").replace('"', '\\"') for x in json_ai.encoders - if x != json_ai.problem_definition.target] - input_cols = ",".join([f"""'{name}'""" for name in input_cols]) - - # ----------------- # - # Time-series specific code blocks - # ----------------- # - - ts_transform_code = "" - ts_analyze_code = None - ts_encoder_code = "" - if json_ai.timeseries_transformer is not None: - ts_transform_code = f""" -log.info('Transforming timeseries data') -data = {call(json_ai.timeseries_transformer)} -""" - ts_analyze_code = f""" -self.ts_analysis = {call(json_ai.timeseries_analyzer)} -""" - # @TODO: set these kwargs/properties in the json ai construction (if possible) - if json_ai.timeseries_analyzer is not None: - ts_encoder_code = """ -if encoder.is_timeseries_encoder: - kwargs['ts_analysis'] = self.ts_analysis -""" - - if json_ai.problem_definition.timeseries_settings.is_timeseries: - ts_target_code = """ -if encoder.is_target: - encoder.normalizers = self.ts_analysis['target_normalizers'] - encoder.group_combinations = self.ts_analysis['group_combinations'] -""" - else: - ts_target_code = "" - - # ----------------- # - # Statistical Analysis Body - # ----------------- # - - analyze_data_body = f""" -self.statistical_analysis = statistical_analysis(data, - self.dtype_dict, - self.problem_definition.to_dict(), - {json_ai.identifiers}) - -# Instantiate post-training evaluation -self.analysis_blocks = [{', '.join([call(block) for block in json_ai.analysis_blocks])}] - """ - - analyze_data_body = align(analyze_data_body, 2) - - # ----------------- # - # Pre-processing Body - # ----------------- # - - clean_body = f""" -log.info('Cleaning the data') -self.imputers = {imputers} -data = {call(json_ai.cleaner)} - -# Time-series blocks -{ts_transform_code} -""" - - clean_body += '\nreturn data' - - clean_body = align(clean_body, 2) - - # ----------------- # - # Train-Test Splitter Body - # ----------------- # - - split_body = f""" -log.info("Splitting the data into train/test") -train_test_data = {call(json_ai.splitter)} - -return train_test_data - """ - - split_body = align(split_body, 2) - - # ----------------- # - # Prepare features Body - # ----------------- # - - prepare_body = """ -self.mode = 'train' - -if self.statistical_analysis is None: - raise Exception("Please run analyze_data first") -""" - if ts_analyze_code is not None: - prepare_body += f""" -if self.mode != 'predict': - {align(ts_analyze_code, 1)} -""" - - prepare_body += f""" -# Column to encoder mapping -self.encoders = {inline_dict(encoder_dict)} - -# Prepare the training + dev data -concatenated_train_dev = pd.concat([data['train'], data['dev']]) - -prepped_encoders = {{}} - -# Prepare input encoders -parallel_encoding = parallel_encoding_check(data['train'], self.encoders) - -if parallel_encoding: - log.debug('Preparing in parallel...') - for col_name, encoder in self.encoders.items(): - if col_name != self.target and not encoder.is_trainable_encoder: - prepped_encoders[col_name] = (encoder, concatenated_train_dev[col_name], 'prepare') - prepped_encoders = mut_method_call(prepped_encoders) - -else: - log.debug('Preparing sequentially...') - for col_name, encoder in self.encoders.items(): - if col_name != self.target and not encoder.is_trainable_encoder: - log.debug(f'Preparing encoder for {{col_name}}...') - encoder.prepare(concatenated_train_dev[col_name]) - prepped_encoders[col_name] = encoder - -# Store encoders -for col_name, encoder in prepped_encoders.items(): - self.encoders[col_name] = encoder - -# Prepare the target -if self.target not in prepped_encoders: - if self.encoders[self.target].is_trainable_encoder: - self.encoders[self.target].prepare(data['train'][self.target], data['dev'][self.target]) - else: - self.encoders[self.target].prepare(pd.concat([data['train'], data['dev']])[self.target]) - -# Prepare any non-target encoders that are learned -for col_name, encoder in self.encoders.items(): - if col_name != self.target and encoder.is_trainable_encoder: - priming_data = pd.concat([data['train'], data['dev']]) - kwargs = {{}} - if self.dependencies[col_name]: - kwargs['dependency_data'] = {{}} - for col in self.dependencies[col_name]: - kwargs['dependency_data'][col] = {{ - 'original_type': self.dtype_dict[col], - 'data': priming_data[col] - }} - {align(ts_encoder_code, 3)} - - # If an encoder representation requires the target, provide priming data - if hasattr(encoder, 'uses_target'): - kwargs['encoded_target_values'] = self.encoders[self.target].encode(priming_data[self.target]) - - encoder.prepare(data['train'][col_name], data['dev'][col_name], **kwargs) - - {align(ts_target_code, 1)} -""" - prepare_body = align(prepare_body, 2) - - # ----------------- # - # Featurize Data Body - # ----------------- # - - feature_body = f""" -log.info('Featurizing the data') - -tss = self.problem_definition.timeseries_settings - -feature_data = dict() -for key, data in split_data.items(): - if key != 'stratified_on': - - # compute and store two splits - full and filtered (useful for time series post-train analysis) - if key not in self.feature_cache: - featurized_split = EncodedDs(self.encoders, data, self.target) - filtered_subset = EncodedDs(self.encoders, filter_ts(data, tss), self.target) - - for k, s in zip((key, f'{{key}}_filtered'), (featurized_split, filtered_subset)): - self.feature_cache[k] = s - - for k in (key, f'{{key}}_filtered'): - feature_data[k] = self.feature_cache[k] - -return feature_data - -""" # noqa - - feature_body = align(feature_body, 2) - - # ----------------- # - # Fit Mixer Body - # ----------------- # - - fit_body = f""" -self.mode = 'train' - -# --------------- # -# Extract data -# --------------- # -# Extract the featurized data into train/dev/test -encoded_train_data = enc_data['train'] -encoded_dev_data = enc_data['dev'] -encoded_test_data = enc_data['test_filtered'] - -log.info('Training the mixers') - -# --------------- # -# Fit Models -# --------------- # -# Assign list of mixers -self.mixers = [{', '.join([call(x) for x in json_ai.model["args"]["submodels"]])}] - -# Train mixers -trained_mixers = [] -for mixer in self.mixers: - try: - if mixer.trains_once: - self.fit_mixer(mixer, - ConcatedEncodedDs([encoded_train_data, encoded_dev_data]), - encoded_test_data) - else: - self.fit_mixer(mixer, encoded_train_data, encoded_dev_data) - trained_mixers.append(mixer) - except Exception as e: - log.warning(f'Exception: {{e}} when training mixer: {{mixer}}') - if {json_ai.problem_definition.strict_mode} and mixer.stable: - raise e - -# Update mixers to trained versions -if not trained_mixers: - raise Exception('No mixers could be trained! Please verify your problem definition or JsonAI model representation.') -self.mixers = trained_mixers - -# --------------- # -# Create Ensembles -# --------------- # -log.info('Ensembling the mixer') -# Create an ensemble of mixers to identify best performing model -# Dirty hack -self.ensemble = {call(json_ai.model)} -self.supports_proba = self.ensemble.supports_proba -""" - fit_body = align(fit_body, 2) - - # ----------------- # - # Analyze Ensemble Body - # ----------------- # - - analyze_ensemble = f""" - -# --------------- # -# Extract data -# --------------- # -# Extract the featurized data into train/dev/test -encoded_train_data = enc_data['train'] -encoded_dev_data = enc_data['dev'] -encoded_test_data = enc_data['test'] - -# --------------- # -# Analyze Ensembles -# --------------- # -log.info('Analyzing the ensemble of mixers') -self.model_analysis, self.runtime_analyzer = {call(json_ai.analyzer)} -""" - analyze_ensemble = align(analyze_ensemble, 2) - - # ----------------- # - # Adjust Ensemble Body - # ----------------- # - - adjust_body = f""" -self.mode = 'train' - -# --------------- # -# Prepare data -# --------------- # -if dev_data is None: - data = train_data - split = splitter( - data=data, - pct_train=0.8, - pct_dev=0.2, - pct_test=0, - tss=self.problem_definition.timeseries_settings.to_dict(), - seed=self.problem_definition.seed_nr, - target=self.target, - dtype_dict=self.dtype_dict) - train_data = split['train'] - dev_data = split['dev'] - -if adjust_args is None or not adjust_args.get('learn_call'): - train_data = self.preprocess(train_data) - dev_data = self.preprocess(dev_data) - -dev_data = EncodedDs(self.encoders, dev_data, self.target) -train_data = EncodedDs(self.encoders, train_data, self.target) - -# --------------- # -# Update/Adjust Mixers -# --------------- # -log.info('Updating the mixers') - -for mixer in self.mixers: - mixer.partial_fit(train_data, dev_data, adjust_args) -""" # noqa - - adjust_body = align(adjust_body, 2) - - # ----------------- # - # Learn Body - # ----------------- # - - learn_body = """ -self.mode = 'train' -n_phases = 8 if self.problem_definition.fit_on_all else 7 - -# Perform stats analysis -log.info(f'[Learn phase 1/{n_phases}] - Statistical analysis') -self.analyze_data(data) - -# Pre-process the data -log.info(f'[Learn phase 2/{n_phases}] - Data preprocessing') -data = self.preprocess(data) - -# Create train/test (dev) split -log.info(f'[Learn phase 3/{n_phases}] - Data splitting') -train_dev_test = self.split(data) - -# Prepare encoders -log.info(f'[Learn phase 4/{n_phases}] - Preparing encoders') -self.prepare(train_dev_test) - -# Create feature vectors from data -log.info(f'[Learn phase 5/{n_phases}] - Feature generation') -enc_train_test = self.featurize(train_dev_test) - -# Prepare mixers -log.info(f'[Learn phase 6/{n_phases}] - Mixer training') -if not self.problem_definition.embedding_only: - self.fit(enc_train_test) -else: - self.mixers = [] - self.ensemble = Embedder(self.target, mixers=list(), data=enc_train_test['train']) - self.supports_proba = self.ensemble.supports_proba - -# Analyze the ensemble -log.info(f'[Learn phase 7/{n_phases}] - Ensemble analysis') -self.analyze_ensemble(enc_train_test) - -# ------------------------ # -# Enable model partial fit AFTER it is trained and evaluated for performance with the appropriate train/dev/test splits. -# This assumes the predictor could continuously evolve, hence including reserved testing data may improve predictions. -# SET `json_ai.problem_definition.fit_on_all=False` TO TURN THIS BLOCK OFF. - -# Update the mixers with partial fit -if self.problem_definition.fit_on_all and all([not m.trains_once for m in self.mixers]): - log.info(f'[Learn phase 8/{n_phases}] - Adjustment on validation requested') - self.adjust(enc_train_test["test"].data_frame, ConcatedEncodedDs([enc_train_test["train"], - enc_train_test["dev"]]).data_frame, - adjust_args={'learn_call': True}) - -self.feature_cache = dict() # empty feature cache to avoid large predictor objects -""" - learn_body = align(learn_body, 2) - # ----------------- # - # Predict Body - # ----------------- # - - predict_body = f""" -self.mode = 'predict' -n_phases = 3 if self.pred_args.all_mixers else 4 - -if len(data) == 0: - raise Exception("Empty input, aborting prediction. Please try again with some input data.") - -self.pred_args = PredictionArguments.from_dict(args) - -log.info(f'[Predict phase 1/{{n_phases}}] - Data preprocessing') -if self.problem_definition.ignore_features: - log.info(f'Dropping features: {{self.problem_definition.ignore_features}}') - data = data.drop(columns=self.problem_definition.ignore_features, errors='ignore') -for col in self.input_cols: - if col not in data.columns: - data[col] = [None] * len(data) - -# Pre-process the data -data = self.preprocess(data) - -# Featurize the data -log.info(f'[Predict phase 2/{{n_phases}}] - Feature generation') -encoded_ds = self.featurize({{"predict_data": data}})["predict_data"] -encoded_data = encoded_ds.get_encoded_data(include_target=False) - -log.info(f'[Predict phase 3/{{n_phases}}] - Calling ensemble') -if self.pred_args.return_embedding: - embedder = Embedder(self.target, mixers=list(), data=encoded_ds) - df = embedder(encoded_ds, args=self.pred_args) -else: - df = self.ensemble(encoded_ds, args=self.pred_args) - -if not(any( - [self.pred_args.all_mixers, - self.pred_args.return_embedding, - self.problem_definition.embedding_only] - )): - log.info(f'[Predict phase 4/{{n_phases}}] - Analyzing output') - df, global_insights = {call(json_ai.explainer)} - self.global_insights = {{**self.global_insights, **global_insights}} - -self.feature_cache = dict() # empty feature cache to avoid large predictor objects - -return df -""" - - predict_body = align(predict_body, 2) - - predictor_code = f""" -{IMPORTS} -{IMPORT_EXTERNAL_DIRS} - -class Predictor(PredictorInterface): - target: str - mixers: List[BaseMixer] - encoders: Dict[str, BaseEncoder] - ensemble: BaseEnsemble - mode: str - - def __init__(self): - seed({json_ai.problem_definition.seed_nr}) - self.target = '{json_ai.problem_definition.target}' - self.mode = 'inactive' - self.problem_definition = ProblemDefinition.from_dict({json_ai.problem_definition.to_dict()}) - self.accuracy_functions = {json_ai.accuracy_functions} - self.identifiers = {json_ai.identifiers} - self.dtype_dict = {inline_dict(dtype_dict)} - self.lightwood_version = '{lightwood_version}' - self.pred_args = PredictionArguments() - - # Any feature-column dependencies - self.dependencies = {inline_dict(json_ai.dependency_dict)} - - self.input_cols = [{input_cols}] - - # Initial stats analysis - self.statistical_analysis = None - self.ts_analysis = None - self.runtime_log = dict() - self.global_insights = dict() - - # Feature cache - self.feature_cache = dict() - - @timed - def analyze_data(self, data: pd.DataFrame) -> None: - # Perform a statistical analysis on the unprocessed data -{analyze_data_body} - - @timed - def preprocess(self, data: pd.DataFrame) -> pd.DataFrame: - # Preprocess and clean data -{clean_body} - - @timed - def split(self, data: pd.DataFrame) -> Dict[str, pd.DataFrame]: - # Split the data into training/testing splits -{split_body} - - @timed - def prepare(self, data: Dict[str, pd.DataFrame]) -> None: - # Prepare encoders to featurize data -{prepare_body} - - @timed - def featurize(self, split_data: Dict[str, pd.DataFrame]): - # Featurize data into numerical representations for models -{feature_body} - - @timed - def fit(self, enc_data: Dict[str, pd.DataFrame]) -> None: - # Fit predictors to estimate target -{fit_body} - - @timed - def fit_mixer(self, mixer, encoded_train_data, encoded_dev_data) -> None: - mixer.fit(encoded_train_data, encoded_dev_data) - - @timed - def analyze_ensemble(self, enc_data: Dict[str, pd.DataFrame]) -> None: - # Evaluate quality of fit for the ensemble of mixers -{analyze_ensemble} - - @timed - def learn(self, data: pd.DataFrame) -> None: - if self.problem_definition.ignore_features: - log.info(f'Dropping features: {{self.problem_definition.ignore_features}}') - data = data.drop(columns=self.problem_definition.ignore_features, errors='ignore') -{learn_body} - - @timed - def adjust(self, train_data: Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame], - dev_data: Optional[Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame]] = None, - adjust_args: Optional[dict] = None) -> None: - # Update mixers with new information -{adjust_body} - - @timed - def predict(self, data: pd.DataFrame, args: Dict = {{}}) -> pd.DataFrame: -{predict_body} -""" - - try: - import black - except Exception: - black = None - - if black is not None: - try: - formatted_predictor_code = black.format_str(predictor_code, mode=black.FileMode()) - - if type(predictor_from_code(formatted_predictor_code)).__name__ == 'Predictor': - predictor_code = formatted_predictor_code - else: - log.info('Black formatter output is invalid, predictor code might be a bit ugly') - - except Exception: - log.info('Black formatter failed to run, predictor code might be a bit ugly') - else: - log.info('Unable to import black formatter, predictor code might be a bit ugly.') - - return predictor_code - - def validate_json_ai(json_ai: JsonAI) -> bool: """ Checks the validity of a ``JsonAI`` object diff --git a/lightwood/data/timeseries_transform.py b/lightwood/data/timeseries_transform.py index 135447210..ea6f31c81 100644 --- a/lightwood/data/timeseries_transform.py +++ b/lightwood/data/timeseries_transform.py @@ -213,7 +213,7 @@ def _ts_infer_next_row(df: pd.DataFrame, ob: str) -> pd.DataFrame: delta = 1 last_row[ob] += delta - new_df = df.append(last_row) + new_df = pd.concat([df, last_row], ignore_index=True) new_df.index = pd.DatetimeIndex(new_index) return new_df diff --git a/lightwood/helpers/codegen.py b/lightwood/helpers/codegen.py new file mode 100644 index 000000000..6e2070fec --- /dev/null +++ b/lightwood/helpers/codegen.py @@ -0,0 +1,658 @@ +import os +import sys +import time +import string +import random +import tempfile +import importlib + +from copy import deepcopy +from types import ModuleType + +from type_infer.dtype import dtype + +from lightwood.helpers.log import log +from lightwood.api.types import JsonAI +from lightwood.api.json_ai import add_implicit_values, lookup_encoder +from lightwood.helpers.constants import IMPORTS, IMPORT_EXTERNAL_DIRS +from lightwood.helpers.templating import call, inline_dict, align +from lightwood.__about__ import __version__ as lightwood_version + + +def code_from_json_ai(json_ai: JsonAI) -> str: + """ + Generates a custom ``PredictorInterface`` given the specifications from ``JsonAI`` object. + + :param json_ai: ``JsonAI`` object with fully specified parameters + + :returns: Automated syntax of the ``PredictorInterface`` object. + """ + json_ai = deepcopy(json_ai) + # ----------------- # + # Fill in any missing values + json_ai = add_implicit_values(json_ai) + + # ----------------- # + + # Instantiate data types + dtype_dict = {} + + for k in json_ai.dtype_dict: + if json_ai.dtype_dict[k] not in (dtype.invalid, dtype.empty): + dtype_dict[k] = json_ai.dtype_dict[k] + + # Populate imputers + imputer_dict = {} + if json_ai.imputers: + for imputer in json_ai.imputers: + imputer_dict[imputer['args']['target'].replace('\'', '').replace('\"', '')] = call(imputer) + json_ai.imputers = imputer_dict + imputers = inline_dict(json_ai.imputers) + + # Populate encoders + encoder_dict = {} + for col_name, encoder in json_ai.encoders.items(): + encoder_dict[col_name] = call(encoder) + + # Populate time-series specific details + # TODO: consider moving this to a `JsonAI override` phase + tss = json_ai.problem_definition.timeseries_settings + if tss.is_timeseries: + if tss.use_previous_target: + col_name = f"__mdb_ts_previous_{json_ai.problem_definition.target}" + target_type = json_ai.dtype_dict[json_ai.problem_definition.target] + json_ai.problem_definition.timeseries_settings.target_type = target_type + encoder_dict[col_name] = call( + lookup_encoder( + target_type, + col_name, + False, + json_ai.problem_definition, + False, + None, + ) + ) + + dtype_dict[col_name] = target_type + # @TODO: Is populating the json_ai at this stage even necessary? + json_ai.encoders[col_name] = encoder_dict[col_name] + json_ai.dtype_dict[col_name] = target_type + json_ai.dependency_dict[col_name] = [] + + # ----------------- # + + input_cols = [x.replace("'", "\\'").replace('"', '\\"') for x in json_ai.encoders + if x != json_ai.problem_definition.target] + if len(input_cols) < 1: + raise Exception('There are no valid input features. Please check your data before trying again.') + input_cols = ",".join([f"""'{name}'""" for name in input_cols]) + + # ----------------- # + # Time-series specific code blocks + # ----------------- # + + ts_transform_code = "" + ts_analyze_code = None + ts_encoder_code = "" + if json_ai.timeseries_transformer is not None: + ts_transform_code = f""" +log.info('Transforming timeseries data') +data = {call(json_ai.timeseries_transformer)} +""" + ts_analyze_code = f""" +self.ts_analysis = {call(json_ai.timeseries_analyzer)} +""" + # @TODO: set these kwargs/properties in the json ai construction (if possible) + if json_ai.timeseries_analyzer is not None: + ts_encoder_code = """ +if encoder.is_timeseries_encoder: + kwargs['ts_analysis'] = self.ts_analysis +""" + + if json_ai.problem_definition.timeseries_settings.is_timeseries: + ts_target_code = """ +if encoder.is_target: + encoder.normalizers = self.ts_analysis['target_normalizers'] + encoder.group_combinations = self.ts_analysis['group_combinations'] +""" + else: + ts_target_code = "" + + # ----------------- # + # Statistical Analysis Body + # ----------------- # + + analyze_data_body = f""" +self.statistical_analysis = statistical_analysis(data, + self.dtype_dict, + self.problem_definition.to_dict(), + {json_ai.identifiers}) + +# Instantiate post-training evaluation +self.analysis_blocks = [{', '.join([call(block) for block in json_ai.analysis_blocks])}] + """ + + analyze_data_body = align(analyze_data_body, 2) + + # ----------------- # + # Pre-processing Body + # ----------------- # + + clean_body = f""" +log.info('Cleaning the data') +self.imputers = {imputers} +data = {call(json_ai.cleaner)} + +# Time-series blocks +{ts_transform_code} +""" + + clean_body += '\nreturn data' + + clean_body = align(clean_body, 2) + + # ----------------- # + # Train-Test Splitter Body + # ----------------- # + + split_body = f""" +log.info("Splitting the data into train/test") +train_test_data = {call(json_ai.splitter)} + +return train_test_data + """ + + split_body = align(split_body, 2) + + # ----------------- # + # Prepare features Body + # ----------------- # + + prepare_body = """ +self.mode = 'train' + +if self.statistical_analysis is None: + raise Exception("Please run analyze_data first") +""" + if ts_analyze_code is not None: + prepare_body += f""" +if self.mode != 'predict': + {align(ts_analyze_code, 1)} +""" + + prepare_body += f""" +# Column to encoder mapping +self.encoders = {inline_dict(encoder_dict)} + +# Prepare the training + dev data +concatenated_train_dev = pd.concat([data['train'], data['dev']]) + +prepped_encoders = {{}} + +# Prepare input encoders +parallel_encoding = parallel_encoding_check(data['train'], self.encoders) + +if parallel_encoding: + log.debug('Preparing in parallel...') + for col_name, encoder in self.encoders.items(): + if col_name != self.target and not encoder.is_trainable_encoder: + prepped_encoders[col_name] = (encoder, concatenated_train_dev[col_name], 'prepare') + prepped_encoders = mut_method_call(prepped_encoders) + +else: + log.debug('Preparing sequentially...') + for col_name, encoder in self.encoders.items(): + if col_name != self.target and not encoder.is_trainable_encoder: + log.debug(f'Preparing encoder for {{col_name}}...') + encoder.prepare(concatenated_train_dev[col_name]) + prepped_encoders[col_name] = encoder + +# Store encoders +for col_name, encoder in prepped_encoders.items(): + self.encoders[col_name] = encoder + +# Prepare the target +if self.target not in prepped_encoders: + if self.encoders[self.target].is_trainable_encoder: + self.encoders[self.target].prepare(data['train'][self.target], data['dev'][self.target]) + else: + self.encoders[self.target].prepare(pd.concat([data['train'], data['dev']])[self.target]) + +# Prepare any non-target encoders that are learned +for col_name, encoder in self.encoders.items(): + if col_name != self.target and encoder.is_trainable_encoder: + priming_data = pd.concat([data['train'], data['dev']]) + kwargs = {{}} + if self.dependencies[col_name]: + kwargs['dependency_data'] = {{}} + for col in self.dependencies[col_name]: + kwargs['dependency_data'][col] = {{ + 'original_type': self.dtype_dict[col], + 'data': priming_data[col] + }} + {align(ts_encoder_code, 3)} + + # If an encoder representation requires the target, provide priming data + if hasattr(encoder, 'uses_target'): + kwargs['encoded_target_values'] = self.encoders[self.target].encode(priming_data[self.target]) + + encoder.prepare(data['train'][col_name], data['dev'][col_name], **kwargs) + + {align(ts_target_code, 1)} +""" + prepare_body = align(prepare_body, 2) + + # ----------------- # + # Featurize Data Body + # ----------------- # + + feature_body = f""" +log.info('Featurizing the data') + +tss = self.problem_definition.timeseries_settings + +feature_data = dict() +for key, data in split_data.items(): + if key != 'stratified_on': + + # compute and store two splits - full and filtered (useful for time series post-train analysis) + if key not in self.feature_cache: + featurized_split = EncodedDs(self.encoders, data, self.target) + filtered_subset = EncodedDs(self.encoders, filter_ts(data, tss), self.target) + + for k, s in zip((key, f'{{key}}_filtered'), (featurized_split, filtered_subset)): + self.feature_cache[k] = s + + for k in (key, f'{{key}}_filtered'): + feature_data[k] = self.feature_cache[k] + +return feature_data + +""" # noqa + + feature_body = align(feature_body, 2) + + # ----------------- # + # Fit Mixer Body + # ----------------- # + + fit_body = f""" +self.mode = 'train' + +# --------------- # +# Extract data +# --------------- # +# Extract the featurized data into train/dev/test +encoded_train_data = enc_data['train'] +encoded_dev_data = enc_data['dev'] +encoded_test_data = enc_data['test_filtered'] + +log.info('Training the mixers') + +# --------------- # +# Fit Models +# --------------- # +# Assign list of mixers +self.mixers = [{', '.join([call(x) for x in json_ai.model["args"]["submodels"]])}] + +# Train mixers +trained_mixers = [] +for mixer in self.mixers: + try: + if mixer.trains_once: + self.fit_mixer(mixer, + ConcatedEncodedDs([encoded_train_data, encoded_dev_data]), + encoded_test_data) + else: + self.fit_mixer(mixer, encoded_train_data, encoded_dev_data) + trained_mixers.append(mixer) + except Exception as e: + log.warning(f'Exception: {{e}} when training mixer: {{mixer}}') + if {json_ai.problem_definition.strict_mode} and mixer.stable: + raise e + +# Update mixers to trained versions +if not trained_mixers: + raise Exception('No mixers could be trained! Please verify your problem definition or JsonAI model representation.') +self.mixers = trained_mixers + +# --------------- # +# Create Ensembles +# --------------- # +log.info('Ensembling the mixer') +# Create an ensemble of mixers to identify best performing model +# Dirty hack +self.ensemble = {call(json_ai.model)} +self.supports_proba = self.ensemble.supports_proba +""" + fit_body = align(fit_body, 2) + + # ----------------- # + # Analyze Ensemble Body + # ----------------- # + + analyze_ensemble = f""" + +# --------------- # +# Extract data +# --------------- # +# Extract the featurized data into train/dev/test +encoded_train_data = enc_data['train'] +encoded_dev_data = enc_data['dev'] +encoded_test_data = enc_data['test'] + +# --------------- # +# Analyze Ensembles +# --------------- # +log.info('Analyzing the ensemble of mixers') +self.model_analysis, self.runtime_analyzer = {call(json_ai.analyzer)} +""" + analyze_ensemble = align(analyze_ensemble, 2) + + # ----------------- # + # Adjust Ensemble Body + # ----------------- # + + adjust_body = f""" +self.mode = 'train' + +# --------------- # +# Prepare data +# --------------- # +if dev_data is None: + data = train_data + split = splitter( + data=data, + pct_train=0.8, + pct_dev=0.2, + pct_test=0, + tss=self.problem_definition.timeseries_settings.to_dict(), + seed=self.problem_definition.seed_nr, + target=self.target, + dtype_dict=self.dtype_dict) + train_data = split['train'] + dev_data = split['dev'] + +if adjust_args is None or not adjust_args.get('learn_call'): + train_data = self.preprocess(train_data) + dev_data = self.preprocess(dev_data) + +dev_data = EncodedDs(self.encoders, dev_data, self.target) +train_data = EncodedDs(self.encoders, train_data, self.target) + +# --------------- # +# Update/Adjust Mixers +# --------------- # +log.info('Updating the mixers') + +for mixer in self.mixers: + mixer.partial_fit(train_data, dev_data, adjust_args) +""" # noqa + + adjust_body = align(adjust_body, 2) + + # ----------------- # + # Learn Body + # ----------------- # + + learn_body = """ +self.mode = 'train' +n_phases = 8 if self.problem_definition.fit_on_all else 7 + +# Perform stats analysis +log.info(f'[Learn phase 1/{n_phases}] - Statistical analysis') +self.analyze_data(data) + +# Pre-process the data +log.info(f'[Learn phase 2/{n_phases}] - Data preprocessing') +data = self.preprocess(data) + +# Create train/test (dev) split +log.info(f'[Learn phase 3/{n_phases}] - Data splitting') +train_dev_test = self.split(data) + +# Prepare encoders +log.info(f'[Learn phase 4/{n_phases}] - Preparing encoders') +self.prepare(train_dev_test) + +# Create feature vectors from data +log.info(f'[Learn phase 5/{n_phases}] - Feature generation') +enc_train_test = self.featurize(train_dev_test) + +# Prepare mixers +log.info(f'[Learn phase 6/{n_phases}] - Mixer training') +if not self.problem_definition.embedding_only: + self.fit(enc_train_test) +else: + self.mixers = [] + self.ensemble = Embedder(self.target, mixers=list(), data=enc_train_test['train']) + self.supports_proba = self.ensemble.supports_proba + +# Analyze the ensemble +log.info(f'[Learn phase 7/{n_phases}] - Ensemble analysis') +self.analyze_ensemble(enc_train_test) + +# ------------------------ # +# Enable model partial fit AFTER it is trained and evaluated for performance with the appropriate train/dev/test splits. +# This assumes the predictor could continuously evolve, hence including reserved testing data may improve predictions. +# SET `json_ai.problem_definition.fit_on_all=False` TO TURN THIS BLOCK OFF. + +# Update the mixers with partial fit +if self.problem_definition.fit_on_all and all([not m.trains_once for m in self.mixers]): + log.info(f'[Learn phase 8/{n_phases}] - Adjustment on validation requested') + self.adjust(enc_train_test["test"].data_frame, ConcatedEncodedDs([enc_train_test["train"], + enc_train_test["dev"]]).data_frame, + adjust_args={'learn_call': True}) + +self.feature_cache = dict() # empty feature cache to avoid large predictor objects +""" + learn_body = align(learn_body, 2) + # ----------------- # + # Predict Body + # ----------------- # + + predict_body = f""" +self.mode = 'predict' +n_phases = 3 if self.pred_args.all_mixers else 4 + +if len(data) == 0: + raise Exception("Empty input, aborting prediction. Please try again with some input data.") + +self.pred_args = PredictionArguments.from_dict(args) + +log.info(f'[Predict phase 1/{{n_phases}}] - Data preprocessing') +if self.problem_definition.ignore_features: + log.info(f'Dropping features: {{self.problem_definition.ignore_features}}') + data = data.drop(columns=self.problem_definition.ignore_features, errors='ignore') +for col in self.input_cols: + if col not in data.columns: + data[col] = [None] * len(data) + +# Pre-process the data +data = self.preprocess(data) + +# Featurize the data +log.info(f'[Predict phase 2/{{n_phases}}] - Feature generation') +encoded_ds = self.featurize({{"predict_data": data}})["predict_data"] +encoded_data = encoded_ds.get_encoded_data(include_target=False) + +log.info(f'[Predict phase 3/{{n_phases}}] - Calling ensemble') + +@timed +def _timed_call(encoded_ds): + if self.pred_args.return_embedding: + embedder = Embedder(self.target, mixers=list(), data=encoded_ds) + df = embedder(encoded_ds, args=self.pred_args) + else: + df = self.ensemble(encoded_ds, args=self.pred_args) + return df + +df = _timed_call(encoded_ds) + +if not(any( + [self.pred_args.all_mixers, + self.pred_args.return_embedding, + self.problem_definition.embedding_only] + )): + log.info(f'[Predict phase 4/{{n_phases}}] - Analyzing output') + df, global_insights = {call(json_ai.explainer)} + self.global_insights = {{**self.global_insights, **global_insights}} + +self.feature_cache = dict() # empty feature cache to avoid large predictor objects + +return df +""" + + predict_body = align(predict_body, 2) + + predictor_code = f""" +{IMPORTS} +{IMPORT_EXTERNAL_DIRS} + +class Predictor(PredictorInterface): + target: str + mixers: List[BaseMixer] + encoders: Dict[str, BaseEncoder] + ensemble: BaseEnsemble + mode: str + + def __init__(self): + seed({json_ai.problem_definition.seed_nr}) + self.target = '{json_ai.problem_definition.target}' + self.mode = 'inactive' + self.problem_definition = ProblemDefinition.from_dict({json_ai.problem_definition.to_dict()}) + self.accuracy_functions = {json_ai.accuracy_functions} + self.identifiers = {json_ai.identifiers} + self.dtype_dict = {inline_dict(dtype_dict)} + self.lightwood_version = '{lightwood_version}' + self.pred_args = PredictionArguments() + + # Any feature-column dependencies + self.dependencies = {inline_dict(json_ai.dependency_dict)} + + self.input_cols = [{input_cols}] + + # Initial stats analysis + self.statistical_analysis = None + self.ts_analysis = None + self.runtime_log = dict() + self.global_insights = dict() + + # Feature cache + self.feature_cache = dict() + + @timed_predictor + def analyze_data(self, data: pd.DataFrame) -> None: + # Perform a statistical analysis on the unprocessed data +{analyze_data_body} + + @timed_predictor + def preprocess(self, data: pd.DataFrame) -> pd.DataFrame: + # Preprocess and clean data +{clean_body} + + @timed_predictor + def split(self, data: pd.DataFrame) -> Dict[str, pd.DataFrame]: + # Split the data into training/testing splits +{split_body} + + @timed_predictor + def prepare(self, data: Dict[str, pd.DataFrame]) -> None: + # Prepare encoders to featurize data +{prepare_body} + + @timed_predictor + def featurize(self, split_data: Dict[str, pd.DataFrame]): + # Featurize data into numerical representations for models +{feature_body} + + @timed_predictor + def fit(self, enc_data: Dict[str, pd.DataFrame]) -> None: + # Fit predictors to estimate target +{fit_body} + + @timed_predictor + def fit_mixer(self, mixer, encoded_train_data, encoded_dev_data) -> None: + mixer.fit(encoded_train_data, encoded_dev_data) + + @timed_predictor + def analyze_ensemble(self, enc_data: Dict[str, pd.DataFrame]) -> None: + # Evaluate quality of fit for the ensemble of mixers +{analyze_ensemble} + + @timed_predictor + def learn(self, data: pd.DataFrame) -> None: + if self.problem_definition.ignore_features: + log.info(f'Dropping features: {{self.problem_definition.ignore_features}}') + data = data.drop(columns=self.problem_definition.ignore_features, errors='ignore') +{learn_body} + + @timed_predictor + def adjust(self, train_data: Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame], + dev_data: Optional[Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame]] = None, + adjust_args: Optional[dict] = None) -> None: + # Update mixers with new information +{adjust_body} + + @timed_predictor + def predict(self, data: pd.DataFrame, args: Dict = {{}}) -> pd.DataFrame: +{predict_body} +""" + + try: + import black + except Exception: + black = None + + if black is not None: + try: + formatted_predictor_code = black.format_str(predictor_code, mode=black.FileMode()) + + if type(_predictor_from_code(formatted_predictor_code)).__name__ == 'Predictor': + predictor_code = formatted_predictor_code + else: + log.info('Black formatter output is invalid, predictor code might be a bit ugly') + + except Exception: + log.info('Black formatter failed to run, predictor code might be a bit ugly') + else: + log.info('Unable to import black formatter, predictor code might be a bit ugly.') + + return predictor_code + + +def _module_from_code(code: str, module_name: str) -> ModuleType: + """ + Create a python module (containing the generated ``Predictor`` class) from the code. This is both a python object and an associated temporary file on your filesystem + + :param code: The ``Predictor``'s code in text form + :param module_name: The name of the newly created module + + :returns: A python module object + """ # noqa + dirname = tempfile.gettempdir() + filename = os.urandom(24).hex() + str(time.time()).replace('.', '') + '.py' + path = os.path.join(dirname, filename) + if 'LIGHTWOOD_DEV_SAVE_TO' in os.environ: + path = os.environ['LIGHTWOOD_DEV_SAVE_TO'] + + with open(path, 'wb') as fp: + fp.write(code.encode('utf-8')) + spec = importlib.util.spec_from_file_location(module_name, fp.name) + temp_module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = temp_module + spec.loader.exec_module(temp_module) + + return temp_module + + +def _predictor_from_code(code: str): + """ + :param code: The ``Predictor``'s code in text form + + :returns: A lightwood ``Predictor`` object + """ + module_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12)) + module_name += str(time.time()).replace('.', '') + predictor = _module_from_code(code, module_name).Predictor() + return predictor diff --git a/lightwood/helpers/constants.py b/lightwood/helpers/constants.py index 181373801..30d61b5ea 100644 --- a/lightwood/helpers/constants.py +++ b/lightwood/helpers/constants.py @@ -4,3 +4,52 @@ _UNCOMMON_WORD = '__mdb_unknown_cat' _UNCOMMON_TOKEN = 0 + +# For custom modules, we create a module loader with necessary imports below +IMPORT_EXTERNAL_DIRS = """ +for import_dir in [os.path.join(os.path.expanduser('~/lightwood_modules'), lightwood_version.replace('.', '_')), os.path.join('/etc/lightwood_modules', lightwood_version.replace('.', '_'))]: + if os.path.exists(import_dir) and os.access(import_dir, os.R_OK): + for file_name in list(os.walk(import_dir))[0][2]: + if file_name[-3:] != '.py': + continue + mod_name = file_name[:-3] + loader = importlib.machinery.SourceFileLoader(mod_name, + os.path.join(import_dir, file_name)) + module = ModuleType(loader.name) + loader.exec_module(module) + sys.modules[mod_name] = module + exec(f'import {mod_name}') +""" # noqa + +IMPORTS = """ +import lightwood +from lightwood import __version__ as lightwood_version +from lightwood.analysis import * +from lightwood.api import * +from lightwood.data import * +from lightwood.encoder import * +from lightwood.ensemble import * +from lightwood.helpers.device import * +from lightwood.helpers.general import * +from lightwood.helpers.ts import * +from lightwood.helpers.log import * +from lightwood.helpers.numeric import * +from lightwood.helpers.parallelism import * +from lightwood.helpers.seed import * +from lightwood.helpers.text import * +from lightwood.helpers.torch import * +from lightwood.mixer import * + +from dataprep_ml.insights import statistical_analysis +from dataprep_ml.cleaners import cleaner +from dataprep_ml.splitters import splitter +from dataprep_ml.imputers import * + +import pandas as pd +from typing import Dict, List, Union, Optional +import os +from types import ModuleType +import importlib.machinery +import sys +import time +""" diff --git a/lightwood/helpers/general.py b/lightwood/helpers/general.py index d735dc806..3f9827d70 100644 --- a/lightwood/helpers/general.py +++ b/lightwood/helpers/general.py @@ -20,7 +20,7 @@ def is_none(value): if type(value) in (np.ndarray,) and value.size == 0: return True - if type(value) != str and isinstance(value, Iterable) and value == []: + if type(value) != str and isinstance(value, Iterable) and len(value) == 0: return True elif type(value) != str and isinstance(value, Iterable): return False diff --git a/lightwood/helpers/log.py b/lightwood/helpers/log.py index b85c26dd0..7f2890632 100644 --- a/lightwood/helpers/log.py +++ b/lightwood/helpers/log.py @@ -19,7 +19,7 @@ def initialize_log(): return log -def timed(f): +def timed_predictor(f): """ Intended to be called from within lightwood predictor methods. We use `wraps` to pass metadata into debuggers (as in stackoverflow.com/a/27737385) @@ -30,7 +30,22 @@ def wrap(predictor, *args, **kw): result = f(predictor, *args, **kw) te = time() log.debug(f' `{f.__name__}` runtime: {round(te - ts, 2)} seconds') - predictor.runtime_log[(f.__name__, datetime.fromtimestamp(ts))] = round(te - ts, 2) + if hasattr(predictor, 'runtime_log'): + predictor.runtime_log[(f.__name__, datetime.fromtimestamp(ts))] = round(te - ts, 2) + return result + return wrap + + +def timed(f): + """ + Intended to be called from within any lightwood method to log the runtime. + """ + @wraps(f) + def wrap(*args, **kw): + ts = time() + result = f(*args, **kw) + te = time() + log.debug(f' `{f.__name__}` runtime: {round(te - ts, 2)} seconds') return result return wrap diff --git a/lightwood/helpers/ts.py b/lightwood/helpers/ts.py index 49abd134a..cc649436f 100644 --- a/lightwood/helpers/ts.py +++ b/lightwood/helpers/ts.py @@ -51,35 +51,41 @@ def get_delta(df: pd.DataFrame, tss) -> Tuple[Dict, Dict, Dict]: def get_inferred_timestamps(df: pd.DataFrame, col: str, deltas: dict, tss, stat_analysis, time_format='') -> pd.DataFrame: horizon = tss.horizon - if tss.group_by: - gby = [f'group_{g}' for g in tss.group_by] - for (idx, row) in df.iterrows(): - last = [r for r in row[f'order_{col}'] if r == r][-1] # filter out nans (safeguard; it shouldn't happen anyway) + last = np.vstack(df[f'order_{col}'].dropna().values)[:, -1] - if tss.group_by: - try: - series_delta = deltas[tuple(row[gby].tolist())] - except KeyError: - series_delta = deltas['__default'] + if tss.group_by: + gby = [f'group_{g}' for g in tss.group_by] + series_delta = df[gby].apply(lambda x: deltas.get(tuple(x.values.tolist()), + deltas['__default']), axis=1).values + series_delta = series_delta.reshape(-1, 1) + else: + series_delta = np.full_like(df.values[:, 0:1], deltas['__default']) + + last = np.repeat(np.expand_dims(last, axis=1), horizon, axis=1) + lins = np.linspace(0, horizon - 1, num=horizon) + series_delta = np.repeat(series_delta, horizon, axis=1) + timestamps = last + series_delta * lins + + if time_format: + if time_format.lower() == 'infer': + tformat = stat_analysis.ts_stats['order_format'] else: - series_delta = deltas['__default'] - timestamps = [last + t * series_delta for t in range(horizon)] + tformat = time_format - if tss.horizon == 1: - timestamps = timestamps[0] # preserves original input format if horizon == 1 + if tformat: + def _strfts(elt): + return datetime.utcfromtimestamp(elt).strftime(tformat) + timestamps = np.vectorize(_strfts)(timestamps) - if time_format: - if time_format.lower() == 'infer': - tformat = stat_analysis.ts_stats['order_format'] - else: - tformat = time_format + # truncate to horizon + timestamps = timestamps[:, :horizon] - if tformat: - for i, ts in enumerate(timestamps): - timestamps[i] = datetime.utcfromtimestamp(ts).strftime(tformat) + # preserves original input format if horizon == 1 + if tss.horizon == 1: + timestamps = timestamps.squeeze() - df[f'order_{col}'].iloc[idx] = timestamps + df[f'order_{col}'] = timestamps.tolist() return df[f'order_{col}'] diff --git a/lightwood/mixer/gluonts.py b/lightwood/mixer/gluonts.py index 4bb5d2448..37925c7c0 100644 --- a/lightwood/mixer/gluonts.py +++ b/lightwood/mixer/gluonts.py @@ -234,14 +234,20 @@ def _make_initial_ds(self, df=None, phase='predict', groups=None): for col in self.static_features_cat: df[col] = self.static_features_cat_encoders[col].transform(df[col].values.reshape(-1, 1)) + static_features = [] + if self.static_features_real: + static_features.extend(self.static_features_real) + if self.static_features_cat: + static_features.extend(self.static_features_cat) + static_features_df = df[static_features] + ds = PandasDataset.from_long_dataframe( df, target=self.target, - item_id=gby, + item_id=gby[0], freq=freq, timestamp=oby_col_name, - feat_static_real=self.static_features_real if self.static_features_real else [], - feat_static_cat=self.static_features_cat if self.static_features_cat else [], + static_features=static_features_df ) return ds diff --git a/lightwood/mixer/nhits.py b/lightwood/mixer/nhits.py index a443705a7..b88528c16 100644 --- a/lightwood/mixer/nhits.py +++ b/lightwood/mixer/nhits.py @@ -94,7 +94,7 @@ def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None: oby_col = self.ts_analysis["tss"].order_by gby = self.ts_analysis["tss"].group_by if self.ts_analysis["tss"].group_by else [] df = deepcopy(cat_ds.data_frame) - Y_df = self._make_initial_df(df, mode='train') + Y_df, _ = self._make_initial_df(df, mode='train') self.group_boundaries = self._set_boundary(Y_df, gby) if gby: n_time = df[gby].value_counts().min() @@ -133,7 +133,8 @@ def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None: def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs, args: Optional[dict] = None) -> None: self.hyperparam_search = False - self.fit(train_data, dev_data) # TODO: add support for passing args (e.g. n_epochs) + self.train_args = args.get('trainer_args', {}) if args else {} # NOTE: this replaces the original config + self.fit(train_data, dev_data) self.prepared = True def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs], @@ -148,38 +149,40 @@ def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs], if args.predict_proba: log.warning('This mixer does not output probability estimates') - length = sum(ds.encoded_ds_lengths) if isinstance(ds, ConcatedEncodedDs) else len(ds) - ydf = pd.DataFrame(0, # zero-filled - index=np.arange(length), - columns=['prediction', 'lower', 'upper'], - dtype=object) - - input_df = self._make_initial_df(deepcopy(ds.data_frame)) - ydf['index'] = input_df['index'] - - pred_cols = ['NHITS-median'] - # provided quantile must match one of the training levels, else we default to the largest one of these if args.fixed_confidence is not None and int(args.fixed_confidence * 100) in self.conf_level: level = int(args.fixed_confidence * 100) else: level = max(self.conf_level) - pred_cols.extend([f'NHITS-lo-{level}', f'NHITS-hi-{level}']) target_cols = ['prediction', 'lower', 'upper'] + pred_cols = ['NHITS-median', f'NHITS-lo-{level}', f'NHITS-hi-{level}'] + + input_df, idxs = self._make_initial_df(deepcopy(ds.data_frame)) + length = sum(ds.encoded_ds_lengths) if isinstance(ds, ConcatedEncodedDs) else len(ds) + ydf = pd.DataFrame(0, index=range(length), columns=target_cols, dtype=object) + + # fill with zeroed arrays + zero_array = [0 for _ in range(self.horizon)] for target_col in target_cols: - ydf[target_col] = [[0 for _ in range(self.horizon)] for _ in range(len(ydf))] # zero-filled arrays + ydf[target_col] = [zero_array] * len(ydf) - group_ends = [] - for group in input_df['unique_id'].unique(): - group_ends.append(input_df[input_df['unique_id'] == group]['index'].iloc[-1]) + grouper = input_df.groupby('unique_id') + group_ends = grouper.last()['index'].values fcst = self.model.predict(input_df).reset_index() + fcst['ds'] = fcst.groupby('unique_id').cumcount() + horizons = pd.pivot_table(fcst, values=pred_cols, index='unique_id', columns='ds') + + temp_df = pd.DataFrame(0, # zero-filled + index=range(len(horizons)), + columns=target_cols, + dtype=object) + + for pcol, tcol in zip(pred_cols, target_cols): + temp_df[tcol] = horizons[pcol].values.tolist() - for gidx, group in zip(group_ends, input_df['unique_id'].unique()): - for pred_col, target_col in zip(pred_cols, target_cols): - group_preds = fcst[fcst['unique_id'] == group][pred_col].tolist()[:self.horizon] - idx = ydf[ydf['index'] == gidx].index[0] - ydf.at[idx, target_col] = group_preds + for tcol in target_cols: + ydf[tcol].iloc[group_ends] = temp_df[tcol] ydf['confidence'] = level / 100 return ydf @@ -192,11 +195,12 @@ def _make_initial_df(self, df, mode='inference'): """ # noqa oby_col = self.ts_analysis["tss"].order_by - df = df.sort_values(by=f'__mdb_original_{oby_col}') + # df = df.sort_values(by=f'__mdb_original_{oby_col}') # TODO rm df[f'__mdb_parsed_{oby_col}'] = df.index df = df.reset_index(drop=True) Y_df = pd.DataFrame() + Y_df['_index'] = np.arange(len(df)) Y_df['y'] = df[self.target] Y_df['ds'] = df[f'__mdb_parsed_{oby_col}'] @@ -219,7 +223,8 @@ def _make_initial_df(self, df, mode='inference'): if filtered: Y_df = pd.concat(filtered) - return Y_df + filtered_idxs = Y_df.pop('_index').values + return Y_df, filtered_idxs @staticmethod def _set_boundary(df: pd.DataFrame, gby: list) -> Dict[str, object]: diff --git a/requirements.txt b/requirements.txt index ad9864184..6b81917b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,23 +1,23 @@ -type_infer >=0.0.13 -dataprep_ml >=0.0.13 -mindsdb-evaluator >=0.0.9 +type_infer >=0.0.15 +dataprep_ml >=0.0.18 +mindsdb-evaluator >=0.0.11 numpy -nltk >=3,<3.6 -python-dateutil >=2.8.1 -pandas >=1.1.5, <1.5.0 +nltk >=3.8, <3.9 +python-dateutil >=2.8.2 +pandas >=2.0.0, <2.1.0 schema >=0.6.8 torch >=2.0.0, <2.1 requests >=2.0.0 transformers -optuna >=2.8.0,<2.10.0 +optuna >=3.1.0,<4.0.0 scipy >=1.5.4 psutil >=5.7.0 setuptools >=21.2.1 wheel >=0.32.2 -scikit-learn >=1.0.0, <=1.0.2 +scikit-learn >=1.0.0 dataclasses_json >=0.5.4 dill ==0.3.6 -sktime >=0.14.0,<0.15.0 +sktime >=0.21.0,<0.22.0 statsforecast ==1.4.0 torch_optimizer ==0.1.0 black ==23.3.0 diff --git a/requirements_extra_ts.txt b/requirements_extra_ts.txt index 78cc94804..e37416201 100644 --- a/requirements_extra_ts.txt +++ b/requirements_extra_ts.txt @@ -2,4 +2,4 @@ pystan==2.19.1.1 prophet==1.1 neuralforecast ==1.5.0 mxnet >=1.6.0, <2.0.0 -gluonts >= 0.11.0, <0.12.0 +gluonts >= 0.13.2, <0.14.0 diff --git a/setup.py b/setup.py index d7dec7d84..c4a75b0b3 100644 --- a/setup.py +++ b/setup.py @@ -62,5 +62,5 @@ def remove_requirements(requirements, name, replace=''): "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], - python_requires=">=3.8" + python_requires=">=3.8,<3.12" ) diff --git a/tests/integration/advanced/test_timeseries.py b/tests/integration/advanced/test_timeseries.py index f7ed259e3..412f36732 100644 --- a/tests/integration/advanced/test_timeseries.py +++ b/tests/integration/advanced/test_timeseries.py @@ -46,8 +46,8 @@ def split_arrivals(self, data: pd.DataFrame, grouped: bool) -> (pd.DataFrame, pd for g in data[group].unique(): subframe = data[data[group] == g] length = subframe.shape[0] - train = train.append(subframe[:int(length * train_ratio)]) - test = test.append(subframe[int(length * train_ratio):]) + train = pd.concat([train, subframe[:int(length * train_ratio)]], ignore_index=True) + test = pd.concat([test, subframe[int(length * train_ratio):]], ignore_index=True) else: train = data[:int(data.shape[0] * train_ratio)] test = data[int(data.shape[0] * train_ratio):] @@ -506,7 +506,7 @@ def test_9_ts_dedupe(self): data = pd.read_csv('tests/data/arrivals.csv') data = data[data['Country'].isin(['US', 'Japan'])] target_len = len(data) - data = data.append(data[data['Country'] == 'Japan']).reset_index(drop=True) # force duplication of one series + data = pd.concat([data, data[data['Country'] == 'Japan']], ignore_index=True) # force duplication of one series jai = json_ai_from_problem(data, ProblemDefinition.from_dict({'target': 'Traffic', 'time_aim': 30, 'timeseries_settings': { diff --git a/tests/unit_tests/encoder/audio/test_mfcc.py b/tests/unit_tests/encoder/audio/test_mfcc.py index 401c18a10..370fadaf8 100644 --- a/tests/unit_tests/encoder/audio/test_mfcc.py +++ b/tests/unit_tests/encoder/audio/test_mfcc.py @@ -9,7 +9,7 @@ class TestMFCCEncoder(unittest.TestCase): def test_encode(self): if MFCCEncoder is None: - print('Skipping this test since the system for the encoder work are not installed') + print('Skipping this test since MFCCEncoder cannot be imported') return dir_path = os.path.dirname(os.path.realpath(__file__))