# Source code for bluemist.regression.regressor

"""
Performs model training, testing, evaluations and deployment
"""

__author__ = "Shashank Agrawal"
__license__ = "MIT"
__version__ = "0.1.1"
__email__ = "dew@bluemist-ai.one"

import importlib
import logging
import os
from logging import config

import pandas as pd

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from pandas.core.dtypes.common import is_numeric_dtype
from sklearn.compose import TransformedTargetRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.pipeline import Pipeline
from tqdm import tqdm

import bluemist
from bluemist.pipeline.bluemist_pipeline import add_pipeline_step, save_model_pipeline, clear_all_model_pipelines
from bluemist.preprocessing import preprocessor
from bluemist.regression.constant import multi_output_regressors, multi_task_regressors, unsupported_regressors, \
    base_estimator_regressors

from bluemist.regression.tuning.constant import default_hyperparameters
from bluemist.utils.metrics import scoringStrategy
from sklearn.utils import all_estimators

from bluemist.utils.scaler import getScaler
from bluemist.utils import generate_api as generate_api
from bluemist.artifacts.api import predict

from IPython.display import display, HTML

# Base directory used in this module to locate the logging configuration and the MLflow artifact folder.
# NOTE(review): os.environ[...] raises KeyError at import time when BLUEMIST_PATH is unset -
# presumably the package sets it before this module is imported; confirm.
BLUEMIST_PATH = os.environ["BLUEMIST_PATH"]

# Configure the logging framework from the 'logging.config' file found under BLUEMIST_PATH
config.fileConfig(BLUEMIST_PATH + '/' + 'logging.config')
# Module-wide logger used by the functions defined below
logger = logging.getLogger("bluemist")


def initialize_mlflow(mlflow_experiment_name):
    """
    **Makes the given MLflow experiment the active one, creating or restoring it when required**

    mlflow_experiment_name : str or None
        Name of the MLflow experiment. When None, nothing is done beyond logging the call.
    """
    logger.info('Initializing MLFlow...')

    # Guard clause: without an experiment name there is nothing to set up
    if mlflow_experiment_name is None:
        return

    experiment = mlflow.get_experiment_by_name(mlflow_experiment_name)

    # No experiment found under this name: create it with the Bluemist artifact folder
    if not experiment:
        artifact_location = BLUEMIST_PATH + '/' + 'artifacts/experiments/mlflow'
        mlflow.create_experiment(name=mlflow_experiment_name, artifact_location=artifact_location)

    # An experiment in the 'deleted' lifecycle stage is restored before it is activated
    marked_as_deleted = experiment is not None and experiment.lifecycle_stage == 'deleted'
    if marked_as_deleted:
        logger.info('Restoring MLFlow experiment :: {}'.format(mlflow_experiment_name))
        MlflowClient().restore_experiment(experiment.experiment_id)

    mlflow.set_experiment(mlflow_experiment_name)


def get_estimators(multi_output=False, multi_task=False, names_only=True):
    """
    **Returns the list of available regression estimators**

    multi_output : bool, default=False
        Future use. When False, estimators listed in ``multi_output_regressors`` are excluded.
    multi_task : bool, default=False
        Future use. When False, estimators listed in ``multi_task_regressors`` are excluded.
    names_only : bool, default=True
        Return only the estimator names without metadata. When False, the ``(name, class)``
        tuples produced by ``sklearn.utils.all_estimators`` are returned.

    Estimators listed in ``unsupported_regressors`` or ``base_estimator_regressors`` are always excluded.
    """
    estimators = all_estimators(type_filter='regressor')
    logger.debug('All available estimators :: {}'.format(estimators))

    # Partition in a single pass so that an estimator matching several exclusion lists is
    # dropped exactly once (removing the same entry twice from a list raises ValueError).
    supported_estimators = []
    estimators_to_remove = []
    for estimator in estimators:
        name = estimator[0]
        excluded = (
            (not multi_output and name in multi_output_regressors)
            or (not multi_task and name in multi_task_regressors)
            or (unsupported_regressors and name in unsupported_regressors)
            or (base_estimator_regressors and name in base_estimator_regressors)
        )
        if excluded:
            estimators_to_remove.append(estimator)
        else:
            supported_estimators.append(estimator)

    logger.debug('Estimators not supported by Bluemist AI :: {}'.format(estimators_to_remove))
    # Logged before branching on names_only so the message is emitted for both return shapes
    logger.info('Estimators available for modelling :: {}'.format(supported_estimators))

    if names_only:
        return [estimator[0] for estimator in supported_estimators]
    return supported_estimators
def deploy_model(estimator_name, host='localhost', port=8000):
    """
    **Generates the API code for the given estimator and starts the API server**

    estimator_name : str
        Name of the estimator to be deployed
    host : {str, IPv4 or IPv6}, default='localhost'
        Hostname or IP address of the machine where the API is to be deployed
    port : int, default=8000
        API listening port
    """
    logger.info('Generating API code to deploy the model :: {}'.format(estimator_name))

    # Column and target metadata recorded by the preprocessor module for deployment
    deployment_metadata = {
        'initial_column_metadata': preprocessor.initial_column_metadata_for_deployment,
        'encoded_column_metadata': preprocessor.encoded_columns_for_deployment,
        'target_variable': preprocessor.target_for_deployment,
    }
    generate_api.generate_api_code(estimator_name=estimator_name, **deployment_metadata)

    # NOTE(review): presumably reloaded so the code generated above is what gets served - confirm
    importlib.reload(predict)

    logger.info('Starting API server on host {} and port {}'.format(host, port))
    predict.start_api_server(host=host, port=port)
def _build_search_space(estimator_name, regressor, scale_target):
    """Builds the ``param_distributions`` dictionary used to tune ``regressor`` inside a pipeline."""
    estimator_parameters = regressor.get_params()
    logger.info('Available hyperparameters to be tuned :: {}'.format(estimator_parameters))
    logger.debug('Python type() for hyperparameters :: {}'.format(type(estimator_parameters)))

    # Model specific configuration, looked up by estimator name; None when the module has no such attribute
    model_hyperparameters_for_tuning = getattr(bluemist.regression.tuning.constant, estimator_name, None)

    # Pipeline parameters are addressed as '<step name>__<parameter>'; when the regressor is wrapped in a
    # TransformedTargetRegressor the wrapped estimator sits one level deeper under 'regressor'
    if scale_target:
        key_prefix = estimator_name + '__regressor__'
    else:
        key_prefix = estimator_name + '__'

    hyperparameters = {}
    for hyperparameter, default_value in estimator_parameters.items():
        # The isinstance check keeps the comparison unambiguous for array-like default values
        if isinstance(default_value, str) and default_value == 'deprecated':
            logger.debug('Deprecated hyperparameter identified :: {}'.format(hyperparameter))
            continue

        if model_hyperparameters_for_tuning is not None and hyperparameter in model_hyperparameters_for_tuning:
            candidates = model_hyperparameters_for_tuning[hyperparameter]
            logger.debug('Hyperparameter in model configuration :: {} :: {}'.format(hyperparameter, candidates))
        elif hyperparameter in default_hyperparameters:
            candidates = default_hyperparameters[hyperparameter]
            logger.debug('Hyperparameter in default configuration :: {} :: {}'.format(hyperparameter, candidates))
        else:
            # No tuning configuration for this hyperparameter: the estimator's own value is kept
            candidates = default_value

        hyperparameters[key_prefix + hyperparameter] = candidates

    return hyperparameters


def _build_model_steps(estimator_name, regressor, target_scaler, scale_target):
    """Registers the estimator step (optionally wrapped for target scaling) and returns the pipeline steps."""
    if scale_target:
        model = TransformedTargetRegressor(regressor=regressor, transformer=target_scaler)
    else:
        model = regressor

    steps = add_pipeline_step(estimator_name, (estimator_name, model))
    if steps is None:
        # Fail inside the caller's try block instead of continuing with results left over
        # from a previously trained estimator
        raise ValueError('No pipeline steps available for estimator :: {}'.format(estimator_name))
    return steps


def _display_comparison(df):
    """Renders the comparison table, highlighting the best (green) and worst (yellow) value per numeric column."""
    numeric_columns = [col for col in df.columns if is_numeric_dtype(df[col])]
    # Columns ending with 'score' are treated as higher-is-better, all other numeric columns as lower-is-better
    score_columns = [col for col in numeric_columns if col.endswith('score')]
    other_columns = [col for col in numeric_columns if not col.endswith('score')]

    styled = (df.style
              .highlight_max(subset=score_columns, color='green')
              .highlight_min(subset=score_columns, color='yellow')
              .highlight_max(subset=other_columns, color='yellow')
              .highlight_min(subset=other_columns, color='green'))
    display(HTML(styled.to_html()))


def train_test_evaluate(
        X_train,
        X_test,
        y_train,
        y_test,
        tune_models=None,
        metrics='default',
        multi_output=False,
        multi_task=False,
        target_scaling_strategy=None,
        save_pipeline_to_disk=True,
        experiment_name=None,
        run_name=None):
    """
    **Trains the models on the given dataset, evaluates them and displays the comparison metrics**

    The comparison table is rendered with IPython ``display`` and written to the log; nothing is returned.
    An exception raised while training a single estimator is recorded in the table and logged, and
    training continues with the next estimator.

    X_train : pandas dataframe
        Training data
    X_test : pandas dataframe
        Test data
    y_train : array of shape (X_train.shape[0],)
        Target values of training dataset
    y_test : array of shape (X_test.shape[0],)
        Target values of test dataset
    tune_models : {'all', None} or list of models to be trained, default=None
        - all: tune all regression models
        - list: list of models to be trained and tuned, training is limited to these models
        - None: hyperparameter tuning will not be performed
    metrics : {'all', 'default'}, default='default'
        - all: mean_absolute_error, mean_squared_error, r2_score, explained_variance_score, max_error,
          mean_squared_log_error, median_absolute_error, mean_absolute_percentage_error, mean_poisson_deviance,
          mean_gamma_deviance, mean_tweedie_deviance, d2_tweedie_score, mean_pinball_loss
        - default: mean_absolute_error, mean_squared_error, r2_score
    multi_output : bool, default=False
        Future use
    multi_task : bool, default=False
        Future use
    target_scaling_strategy : {'StandardScaler', 'MinMaxScaler', 'MaxAbsScaler', 'RobustScaler', None}, default=None
        Scales the target variable before training the model
    save_pipeline_to_disk : bool, default=True
        Save preprocessor and model training pipeline to the disk. Should be set to True if the model needs
        to be deployed as an API
    experiment_name : str, default=None
        Name of the experiment. When given, metrics and models are also captured in MLflow
    run_name : str, default=None
        Name of the run within the experiment

    Examples
    ---------
    *Regression*

    .. raw:: html
       :file: ../../code_samples/quickstarts/regression/regression_hyperparameter_tuning.html
    """
    scale_target = target_scaling_strategy is not None
    target_scaler = getScaler(target_scaling_strategy) if scale_target else None

    tune_all_models = isinstance(tune_models, str) and tune_models == 'all'
    tune_model_list = tune_models if isinstance(tune_models, list) else []

    capture_stats = experiment_name is not None
    if capture_stats:
        initialize_mlflow(experiment_name)

    df = pd.DataFrame()
    estimators = get_estimators(multi_output, multi_task, names_only=False)
    clear_all_model_pipelines()

    # If hyperparameter tuning is requested for specific models, limit the overall training to those models
    # to save time
    if tune_model_list:
        estimators = [estimator for estimator in estimators if estimator[0] in tune_model_list]

    for estimator_name, estimator_class in (pbar := tqdm(estimators, colour='blue')):
        pbar.set_description(f"Training {estimator_name}")

        tune_this_model = tune_all_models or estimator_name in tune_model_list
        if tune_models is not None and not tune_this_model:
            # tune_models was given in a form that selects no estimator: nothing to train here
            continue

        try:
            logger.info(
                '################### Regressor in progress :: {} ###################'.format(estimator_name))
            regressor = estimator_class()

            if estimator_name in ['CCA', 'PLSCanonical']:
                regressor.set_params(n_components=1)

            search_result = None
            if tune_this_model:
                # Hyperparameter tuning is requested
                hyperparameters = _build_search_space(estimator_name, regressor, scale_target)
                logger.info('Hyperparameters to be used for model tuning :: {}'.format(hyperparameters))

                steps = _build_model_steps(estimator_name, regressor, target_scaler, scale_target)
                model_pipeline = Pipeline(steps=steps)
                search = RandomizedSearchCV(model_pipeline, param_distributions=hyperparameters, n_iter=100)
                search_result = search.fit(X_train, y_train)
                pipeline_with_best_estimator = search_result.best_estimator_

                logger.debug('Model pipeline parameters :: {}'.format(model_pipeline.get_params().keys()))
                logger.info('Fitted estimator with all parameters :: {}'.format(search_result))
                logger.debug('Model pipeline :: {}'.format(model_pipeline))
                logger.info('Model pipeline with best estimator :: {}'.format(pipeline_with_best_estimator))

                if save_pipeline_to_disk:
                    logger.info('Saving model pipeline to disk')
                    save_model_pipeline(estimator_name, pipeline_with_best_estimator)
            else:
                steps = _build_model_steps(estimator_name, regressor, target_scaler, scale_target)
                model_pipeline = Pipeline(steps=steps)
                pipeline_with_best_estimator = model_pipeline.fit(X_train, y_train)

                if save_pipeline_to_disk:
                    save_model_pipeline(estimator_name, pipeline_with_best_estimator)

                logger.info('Model pipeline with best estimator (no hyperparameter tuning) :: {}'.format(
                    pipeline_with_best_estimator))
                logger.debug('Model pipeline (no hyperparameter tuning) :: {}'.format(model_pipeline))

            if search_result is not None:
                logger.info('Best score :: {}'.format(search_result.best_score_))
                logger.info('Best hyperparameters :: {}'.format(search_result.best_params_))

            y_pred = pipeline_with_best_estimator.predict(X_test)
            estimator_stats_df = scoringStrategy(y_test, y_pred, metrics).getStats()

            # Insert Estimator name as the first column in the dataframe
            final_stats_df = estimator_stats_df.copy()
            final_stats_df.insert(0, 'Estimator', estimator_name)

            logger.info('Current estimator Stats :: \n{}'.format(final_stats_df.to_string()))
            logger.debug(
                'Current estimator stats as dictionary :: {}'.format(final_stats_df.to_dict('records')))

            df = pd.concat([df, final_stats_df], ignore_index=True)
            logger.debug('Estimator stats so far :: \n{}'.format(df.to_string()))

            if capture_stats:
                with mlflow.start_run(run_name=run_name):
                    logger.info('Capturing stats in MLFlow...')
                    mlflow.log_param('model', estimator_name)
                    mlflow.log_metrics(estimator_stats_df.to_dict('records')[0])
                    mlflow.sklearn.log_model(pipeline_with_best_estimator, 'model_' + estimator_name)
                    run_id = mlflow.active_run().info.run_id
                    logger.info('Model saved in run :: {}'.format(run_id))
        except Exception as e:
            # Deliberate best effort: one failing estimator must not abort the whole comparison,
            # so the failure is recorded in the result table and logged with its traceback
            exception_df = pd.DataFrame({'Estimator': [estimator_name], 'Exception': str(e)})
            logger.info('Exception occurred :: \n{}'.format(exception_df))
            df = pd.concat([df, exception_df])
            logger.error('Exception occurred while training the model :: {}'.format(str(e)), exc_info=True)

    if 'Estimator' not in df.columns:
        # Nothing was trained (for example no estimator matched tune_models); set_index would raise KeyError
        logger.warning('No estimators were trained, there are no stats to display')
        return

    df.set_index('Estimator', inplace=True)
    _display_comparison(df)

    logger.info('Estimator stats across all trained models : \n{}'.format(df.to_string()))