"""
Performs model training, testing, evaluations and deployment
"""
# Author: Shashank Agrawal
# License: MIT
# Version: 0.1.2
# Email: dew@bluemist-ai.one
# Created: Jun 22, 2022
# Last modified: June 19, 2023
import importlib
import logging
import os
import time
from logging import config
import pandas as pd
from pandas.core.dtypes.common import is_numeric_dtype
from sklearn.compose import TransformedTargetRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.pipeline import Pipeline
from tqdm import tqdm
import bluemist
from bluemist import environment
from bluemist.pipeline.bluemist_pipeline import add_pipeline_step, save_model_pipeline, clear_all_model_pipelines
from bluemist.preprocessing import preprocessor
from bluemist.regression.constant import multi_output_regressors, multi_task_regressors, unsupported_regressors, \
base_estimator_regressors
from bluemist.utils.constants import GPU_BRAND_INTEL, GPU_BRAND_NVIDIA, GPU_ACCELERATION_NVIDIA, CPU_ACCELERATION_INTEL, \
CPU_BRAND_INTEL
from bluemist.utils.metrics import metric_scorer
from sklearn.utils import all_estimators
from bluemist.utils.scaler import get_scaler
from bluemist.utils import generate_api as generate_api
from bluemist.artifacts.api import predict
from IPython.display import display, HTML
# Root directory of the Bluemist installation. It is read eagerly, so importing this
# module fails with KeyError when the BLUEMIST_PATH environment variable is not set.
BLUEMIST_PATH = os.environ["BLUEMIST_PATH"]
# Configure logging from the logging.config file located directly under BLUEMIST_PATH.
config.fileConfig('/'.join([BLUEMIST_PATH, 'logging.config']))
logger = logging.getLogger("bluemist")
def import_mlflow():
    """Import MLflow on demand and return the module object.

    Deferring the import means MLflow is only needed by the code paths that call this
    function (experiment initialisation and run logging).
    """
    return importlib.import_module('mlflow')
def measure_execution_time(start_time):
    """Return the wall-clock time elapsed since ``start_time`` formatted as ``'HH:MM:SS,mmm'``.

    ``start_time`` is a ``time.time()`` timestamp. The hour field is not wrapped, so
    runs longer than 99 hours simply produce a wider hour field.
    """
    elapsed = time.time() - start_time
    # divmod yields exactly (elapsed // 60, elapsed % 60)
    total_minutes, secs = divmod(elapsed, 60)
    minutes = total_minutes % 60
    hours = elapsed // 3600
    millis = int(elapsed * 1000) % 1000
    return f"{int(hours):02d}:{int(minutes):02d}:{int(secs):02d},{int(millis):03d}"
def initialize_mlflow(mlflow_experiment_name):
    """Make ``mlflow_experiment_name`` the active MLflow experiment.

    If no experiment with that name exists, it is created with its artifact location set to
    ``BLUEMIST_PATH + '/artifacts/experiments/mlflow'``. If it exists but its lifecycle
    stage is ``'deleted'``, it is restored first. When ``mlflow_experiment_name`` is None
    only the initialisation message is logged.
    """
    mlflow = import_mlflow()
    logger.info('Initializing MLFlow...')
    if mlflow_experiment_name is None:
        return

    existing = mlflow.get_experiment_by_name(mlflow_experiment_name)
    if not existing:
        artifact_root = BLUEMIST_PATH + '/' + 'artifacts/experiments/mlflow'
        mlflow.create_experiment(name=mlflow_experiment_name, artifact_location=artifact_root)

    # Bring a previously deleted experiment back before activating it
    was_deleted = existing is not None and existing.lifecycle_stage == 'deleted'
    if was_deleted:
        logger.info('Restoring MLFlow experiment :: {}'.format(mlflow_experiment_name))
        tracking_client = mlflow.tracking.MlflowClient()
        tracking_client.restore_experiment(existing.experiment_id)

    mlflow.set_experiment(mlflow_experiment_name)
def get_estimators(multi_output=False, multi_task=False, names_only=True):
    """
    **Returns the list of available regression estimators**

    multi_output : bool, default=False
        Future use
    multi_task : bool, default=False
        Future use
    names_only : bool, default=True
        Returns only the estimator name without metadata. When False, the
        ``(name, class)`` entries from ``sklearn.utils.all_estimators`` are returned.
    """
    estimators = all_estimators(type_filter='regressor')
    logger.debug('All available estimators :: {}'.format(estimators))

    def is_excluded(name):
        # Each estimator is checked once, so one listed in several exclusion lists is
        # dropped exactly once (removing it per list used to raise ValueError).
        if not multi_output and name in multi_output_regressors:
            return True
        if not multi_task and name in multi_task_regressors:
            return True
        if unsupported_regressors and name in unsupported_regressors:
            return True
        if base_estimator_regressors and name in base_estimator_regressors:
            return True
        return False

    supported = []
    not_supported = []
    for estimator in estimators:
        if is_excluded(estimator[0]):
            not_supported.append(estimator)
        else:
            supported.append(estimator)

    logger.debug('Estimators not supported by Bluemist AI :: {}'.format(not_supported))
    logger.info('Estimators available for modelling :: {}'.format(supported))
    if names_only:
        return [estimator[0] for estimator in supported]
    return supported
def deploy_model(estimator_name, host='localhost', port=8000):
    """
    **Generates the prediction API code for a trained estimator and starts the API server**

    estimator_name : str
        Name of the estimator to be deployed
    host : {str, IPv4 or IPv6}, default='localhost'
        Hostname or IP address of the machine where the API will be deployed
    port : int, default=8000
        API listening port
    """
    logger.info('Generating API code to deploy the model :: {}'.format(estimator_name))
    # Column/target metadata captured by the preprocessor is handed to the code generator
    api_arguments = {
        'estimator_name': estimator_name,
        'initial_column_metadata': preprocessor.initial_column_metadata_for_deployment,
        'encoded_column_metadata': preprocessor.encoded_columns_for_deployment,
        'target_variable': preprocessor.target_for_deployment,
    }
    generate_api.generate_api_code(**api_arguments)

    # Reload the predict module so the freshly generated API code is picked up
    # (presumably generate_api_code rewrites that module on disk — confirm).
    importlib.reload(predict)

    logger.info('Starting API server on host {} and port {}'.format(host, port))
    predict.start_api_server(host=host, port=port)
def _create_regressor(estimator_name, estimator_class, sklearnex_algorithms):
    """Instantiate ``estimator_name``, preferring a hardware-accelerated implementation.

    Order of preference:
      1. The cuML class of the same name when the GPU is NVIDIA (``GPU_ACCELERATION_NVIDIA``).
      2. ``estimator_class`` reported as ``CPU_ACCELERATION_INTEL`` when the CPU is Intel and
         sklearnex reports the matching algorithm as patched.
      3. ``estimator_class`` on plain ``'CPU'``.

    Returns a ``(regressor, execution_device)`` tuple.
    """
    if environment.available_gpu == GPU_BRAND_NVIDIA:
        import cuml
        cuml.set_global_output_type('array')  # cuML will return predictions of type cupy.ndarray
        if hasattr(cuml, estimator_name):
            regressor = getattr(cuml, estimator_name)()
            logger.info('Regressor class from cuML :: {}'.format(
                regressor.__class__.__module__ + '.' + regressor.__class__.__name__))
            return regressor, GPU_ACCELERATION_NVIDIA

    if environment.available_cpu == CPU_BRAND_INTEL and sklearnex_algorithms:
        from sklearnex import sklearn_is_patched
        for sklearnex_algorithm in sklearnex_algorithms:
            # The 'linear' patch covers LinearRegression; others match the lower-cased class name
            is_match = ((estimator_name == 'LinearRegression' and sklearnex_algorithm == 'linear')
                        or estimator_name.lower() == sklearnex_algorithm)
            if not is_match:
                continue
            logger.info('\nSklearn Algorithm :: {}, Sklearn Intel(R) Ex Algorithm :: {}'.format(
                estimator_name, sklearnex_algorithm))
            if sklearn_is_patched(name=sklearnex_algorithm):
                return estimator_class(), CPU_ACCELERATION_INTEL

    # Normal CPU processing if acceleration extensions are not applied
    return estimator_class(), 'CPU'


def _as_search_candidates(value):
    """Return ``value`` in a form scikit-learn accepts inside a parameter search space.

    Distributions (objects with ``rvs``) and non-string iterables are passed through;
    single values, including strings and None, are wrapped in a one-element list because
    scikit-learn rejects bare scalars/strings as parameter candidates.
    """
    if hasattr(value, 'rvs'):
        return value
    if isinstance(value, str) or not hasattr(value, '__iter__'):
        return [value]
    return value


def _get_tuning_hyperparameters(estimator_name, regressor, execution_device, param_prefix):
    """Build the ``RandomizedSearchCV`` search space for one regressor.

    Every parameter from ``regressor.get_params()`` is looked up first in the
    estimator-specific entry of the tuning module, then in its ``default_params``; a
    parameter with no configuration is pinned to its current value. Parameters whose
    value is the string ``'deprecated'`` or that are listed in the estimator's
    ``remove_params`` entry are dropped.

    Parameters
    ----------
    estimator_name : str
        Estimator name; also the attribute looked up in the tuning module.
    regressor : estimator instance
        Regressor whose current parameters seed the search space.
    execution_device : str
        Selects the cuML tuning module for ``GPU_ACCELERATION_NVIDIA``, sklearn otherwise.
    param_prefix : str
        Prefix addressing the regressor inside the pipeline (``'<name>__'`` or
        ``'<name>__regressor__'`` when wrapped in ``TransformedTargetRegressor``).

    Returns
    -------
    dict
        Prefixed parameter name -> list of candidate values or a distribution.
    """
    estimator_params = regressor.get_params()
    logger.info('Available hyperparameters to be tuned :: {}'.format(estimator_params))
    logger.debug('Python type() for hyperparameters :: {}'.format(type(estimator_params)))

    if execution_device == GPU_ACCELERATION_NVIDIA:
        tuning_module = bluemist.regression.tuning.cuml
    else:
        tuning_module = bluemist.regression.tuning.sklearn
    default_hyperparameters = getattr(tuning_module, 'default_params', None) or {}
    model_params_for_tuning = getattr(tuning_module, estimator_name, None)
    # Not every estimator has its own tuning entry, so guard before reading remove_params
    remove_params = model_params_for_tuning.get('remove_params') if model_params_for_tuning is not None else None

    search_space = {}
    for hyperparameter, current_value in estimator_params.items():
        if str(current_value) == 'deprecated':
            logger.debug('Deprecated hyperparameter identified :: {}'.format(hyperparameter))
            continue
        if remove_params is not None and hyperparameter in remove_params:
            logger.debug('Unsupported hyperparameter identified :: {}'.format(hyperparameter))
            continue
        if model_params_for_tuning is not None and hyperparameter in model_params_for_tuning:
            candidates = model_params_for_tuning[hyperparameter]
            logger.debug('Hyperparameter in model configuration :: {} :: {}'.format(hyperparameter, candidates))
        elif hyperparameter in default_hyperparameters:
            candidates = default_hyperparameters[hyperparameter]
            logger.debug('Hyperparameter in default configuration :: {} :: {}'.format(hyperparameter, candidates))
        else:
            # No tuning configuration: keep the current value fixed
            candidates = [current_value]
        search_space[param_prefix + hyperparameter] = _as_search_candidates(candidates)
    return search_space


def _build_model_pipeline_steps(estimator_name, regressor, target_scaler, wrap_target):
    """Append the estimator step (named ``estimator_name``) to the pipeline steps.

    When ``wrap_target`` is True the regressor is wrapped in a ``TransformedTargetRegressor``
    using ``target_scaler`` as the target transformer.

    Raises
    ------
    RuntimeError
        If ``add_pipeline_step`` returns no steps (previously the tuning branch then
        silently reused the previous estimator's fitted pipeline).
    """
    if wrap_target:
        estimator = TransformedTargetRegressor(regressor=regressor, transformer=target_scaler)
    else:
        estimator = regressor
    steps = add_pipeline_step(estimator_name, (estimator_name, estimator))
    if steps is None:
        raise RuntimeError('Unable to build the model pipeline for estimator :: {}'.format(estimator_name))
    return steps


def train_test_evaluate(
        X_train,
        X_test,
        y_train,
        y_test,
        tune_models=None,
        metrics='default',
        multi_output=False,
        multi_task=False,
        target_scaling_strategy=None,
        save_pipeline_to_disk=True,
        experiment_name=None,
        run_name=None):
    """
    **Trains models on the given dataset, evaluates them and displays comparison metrics**

    The comparison table is rendered with ``IPython.display`` and logged; nothing is returned.
    Failures of individual estimators are recorded in the table's ``Exception`` column.

    X_train : pandas dataframe
        Training data
    X_test : pandas dataframe
        Test data
    y_train : array of shape (X_train.shape[0],)
        Target values of training dataset
    y_test : array of shape (X_test.shape[0],)
        Target values of test dataset
    tune_models : {'all', None}, str, or list of models to be trained, default=None
        - all: tune all regression models
        - str: name of a single model to be trained and tuned
        - list (or tuple/set): models to be trained and tuned
        - None: hyperparameter tuning will not be performed; all models are trained
    metrics : {'all', 'default'}, default='default'
        - all:
            mean_absolute_error, mean_squared_error, r2_score, explained_variance_score, max_error,
            mean_squared_log_error, median_absolute_error, mean_absolute_percentage_error, mean_poisson_deviance,
            mean_gamma_deviance, mean_tweedie_deviance, d2_tweedie_score, mean_pinball_loss
        - default:
            mean_absolute_error, mean_squared_error, r2_score
    multi_output : bool, default=False
        Future use
    multi_task : bool, default=False
        Future use
    target_scaling_strategy : {'StandardScaler', 'MinMaxScaler', 'MaxAbsScaler', 'RobustScaler', None}, default=None
        Scales the target variable before training the model. Not applied when an NVIDIA GPU is available.
    save_pipeline_to_disk : bool, default=True
        Save preprocessor and model training pipeline to the disk. Should be set to True if the model needs
        to be deployed as an API
    experiment_name : str, default=None
        Name of the MLflow experiment
    run_name : str, default=None
        Name of the run within the experiment

    Examples
    ---------
    *Regression*

    .. raw:: html
       :file: ../../code_samples/quickstarts/regression/regression_hyperparameter_tuning.html
    """
    target_scaler = get_scaler(target_scaling_strategy) if target_scaling_strategy is not None else None

    tune_all_models = False
    models_to_tune = []
    if isinstance(tune_models, str):
        if tune_models == 'all':
            tune_all_models = True
        else:
            models_to_tune = [tune_models]  # a single estimator name
    elif isinstance(tune_models, (list, tuple, set)):
        models_to_tune = list(tune_models)

    if tune_models is not None and not tune_all_models and not models_to_tune:
        logger.warning("No estimators selected for training. tune_models must be None, 'all', an estimator "
                       "name or a non-empty list of estimator names; got :: {}".format(tune_models))
        return

    # TODO: Remove the usage of TransformedTargetRegressor so special handling is not required for cuML.
    # On NVIDIA machines the regressor is not wrapped, and the hyperparameter prefix must match that choice.
    wrap_target = target_scaling_strategy is not None and environment.available_gpu != GPU_BRAND_NVIDIA
    if target_scaling_strategy is not None and not wrap_target:
        logger.warning('Target scaling ({}) is not applied on NVIDIA GPU environments'.format(target_scaling_strategy))
    param_suffix = '__regressor__' if wrap_target else '__'

    if experiment_name is not None:
        initialize_mlflow(experiment_name)

    # Get patch names from sklearnex
    sklearnex_algorithms = None
    if environment.available_cpu == CPU_BRAND_INTEL:
        from sklearnex import get_patch_names
        sklearnex_algorithms = get_patch_names()

    estimators = get_estimators(multi_output, multi_task, names_only=False)
    clear_all_model_pipelines()

    # If hyperparameter tuning is requested for specific models, limit the overall training to those models to save time
    if models_to_tune:
        available_names = [estimator[0] for estimator in estimators]
        unknown = [name for name in models_to_tune if name not in available_names]
        if unknown:
            logger.warning('Requested estimators are not available and will be skipped :: {}'.format(unknown))
        estimators = [estimator for estimator in estimators if estimator[0] in models_to_tune]

    results = []
    for estimator_name, estimator_class in (pbar := tqdm(estimators, colour='blue')):
        start_time = time.time()
        pbar.set_description(f"Training {estimator_name}")
        tune_this_model = tune_all_models or estimator_name in models_to_tune
        logger.info('############ Regressor in progress :: {} ############'.format(estimator_name))
        try:
            regressor, estimator_execution_device = _create_regressor(
                estimator_name, estimator_class, sklearnex_algorithms)
            # TODO: Revisit this code. sklearn throws error without n_components=1
            if estimator_name in ['CCA', 'PLSCanonical']:
                regressor.set_params(n_components=1)

            if tune_this_model:
                # Hyperparameter tuning is requested
                hyperparameters = _get_tuning_hyperparameters(
                    estimator_name, regressor, estimator_execution_device, estimator_name + param_suffix)
                logger.info('Hyperparameters to be used for model tuning :: {}'.format(hyperparameters))
                model_pipeline = Pipeline(
                    steps=_build_model_pipeline_steps(estimator_name, regressor, target_scaler, wrap_target))
                randomized_search = RandomizedSearchCV(model_pipeline, param_distributions=hyperparameters,
                                                       n_iter=100, error_score='raise')
                optimized_estimator = randomized_search.fit(X_train, y_train)
                best_estimator_pipeline = optimized_estimator.best_estimator_
                logger.debug('Model pipeline parameters :: {}'.format(model_pipeline.get_params().keys()))
                logger.info('Fitted estimator with all parameters :: {}'.format(optimized_estimator))
                logger.debug('Model pipeline :: {}'.format(model_pipeline))
                logger.info('Model pipeline with best estimator :: {}'.format(best_estimator_pipeline))
                if save_pipeline_to_disk:
                    logger.info('Saving model pipeline to disk')
                    save_model_pipeline(estimator_name, best_estimator_pipeline)
                logger.info('Best score :: {}'.format(optimized_estimator.best_score_))
                logger.info('Best hyperparameters :: {}'.format(optimized_estimator.best_params_))
            else:
                model_pipeline = Pipeline(
                    steps=_build_model_pipeline_steps(estimator_name, regressor, target_scaler, wrap_target))
                best_estimator_pipeline = model_pipeline.fit(X_train, y_train)
                if save_pipeline_to_disk:
                    save_model_pipeline(estimator_name, best_estimator_pipeline)
                logger.info('Model pipeline with best estimator (no hyperparameter tuning) :: {}'.format(
                    best_estimator_pipeline))
                logger.debug('Model pipeline (no hyperparameter tuning) :: {}'.format(model_pipeline))

            y_pred = best_estimator_pipeline.predict(X_test)
            # Convert cupy.ndarray to numpy.ndarray as sklearn returns numpy.ndarray but cuML will return cupy.ndarray
            y_pred_class_name = y_pred.__class__.__module__ + '.' + y_pred.__class__.__name__
            logger.info('y_pred_class_name :: {}'.format(y_pred_class_name))
            if y_pred_class_name == 'cupy.ndarray':
                import cupy as cp
                y_pred = cp.asnumpy(y_pred)

            scorer = metric_scorer(y_test, y_pred, metrics)
            estimator_stats_df = scorer.calculate_metrics()
            final_stats_df = estimator_stats_df.copy()
            execution_time = measure_execution_time(start_time)
            final_stats_df.insert(0, 'Estimator', estimator_name)  # Estimator name as the first column
            final_stats_df.insert(1, 'Execution Device', estimator_execution_device)  # second column
            final_stats_df.insert(2, 'Execution Time', execution_time)  # third column
            logger.info('Current estimator Stats :: \n{}'.format(final_stats_df.to_string()))
            logger.debug('Current estimator stats as dictionary :: {}'.format(final_stats_df.to_dict('records')))
            results.append(final_stats_df)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug('Estimator stats so far :: \n{}'.format(
                    pd.concat(results, ignore_index=True).to_string()))

            if experiment_name is not None:
                mlflow = import_mlflow()
                with mlflow.start_run(run_name=run_name):
                    logger.info('Capturing stats in MLFlow...')
                    mlflow.log_param('model', estimator_name)
                    mlflow.log_metrics(estimator_stats_df.to_dict('records')[0])
                    mlflow.sklearn.log_model(best_estimator_pipeline, 'model_' + estimator_name)
                    run_id = mlflow.active_run().info.run_id
                    logger.info('Model saved in run :: {}'.format(run_id))
        except Exception as e:
            # Per-estimator boundary: record the failure in the comparison table and move on
            exception_df = pd.DataFrame({'Estimator': [estimator_name], 'Exception': str(e)})
            logger.info('Exception occurred :: \n{}'.format(exception_df))
            results.append(exception_df)
            logger.error('Exception occurred while training the model :: {}'.format(str(e)), exc_info=True)

    if not results:
        logger.warning('No estimators were trained; nothing to display')
        return

    df = pd.concat(results, ignore_index=True)
    df.set_index('Estimator', inplace=True)

    # Higher is better for '*score' metrics, lower is better for the other numeric metrics
    numeric_columns = [col for col in df.columns if is_numeric_dtype(df[col])]
    score_columns = [col for col in numeric_columns if col.endswith('score')]
    error_columns = [col for col in numeric_columns if not col.endswith('score')]
    display(HTML(df.style
                 .highlight_max(subset=score_columns, color='green')
                 .highlight_min(subset=score_columns, color='yellow')
                 .highlight_max(subset=error_columns, color='yellow')
                 .highlight_min(subset=error_columns, color='green')
                 .to_html()))
    logger.info('Estimator stats across all trained models : \n{}'.format(df.to_string()))