import copy
import inspect
import os
import re
from abc import ABC, abstractmethod
import cloudpickle
import numpy as np
import pandas as pd
from .components import Estimator, handle_component_class
from evalml.exceptions import IllFormattedClassNameError, MissingComponentError
from evalml.utils import (
classproperty,
get_logger,
get_random_state,
import_or_raise,
log_subtitle,
log_title
)
logger = get_logger(__file__)
class PipelineBase(ABC):
"""Base class for all pipelines."""
@property
@classmethod
@abstractmethod
def component_graph(cls):
"""Returns list of components representing pipeline graph structure
Returns:
list(str / ComponentBase subclass): list of ComponentBase subclasses or strings denotes graph structure of this pipeline
"""
custom_hyperparameters = None
custom_name = None
problem_type = None
    def __init__(self, parameters, random_state=0):
        """Machine learning pipeline made out of transformers and an estimator.
Required Class Variables:
component_graph (list): List of components in order. Accepts strings or ComponentBase subclasses in the list
Arguments:
parameters (dict): dictionary with component names as keys and dictionary of that component's parameters as values.
An empty dictionary {} implies using all default values for component parameters.
random_state (int, np.random.RandomState): The random seed/state. Defaults to 0.
"""
self.random_state = get_random_state(random_state)
self.component_graph = [self._instantiate_component(component_class, parameters) for component_class in self.component_graph]
self.input_feature_names = {}
self.results = {}
self.estimator = self.component_graph[-1] if isinstance(self.component_graph[-1], Estimator) else None
if self.estimator is None:
raise ValueError("A pipeline must have an Estimator as the last component in component_graph.")
self._validate_estimator_problem_type()
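    # Example parameters dict (hypothetical values): keys are component names and
    # values are keyword arguments forwarded to that component's constructor, e.g.
    #     {'Simple Imputer': {'impute_strategy': 'median'},
    #      'Logistic Regression Classifier': {'C': 5.0}}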
@classproperty
def name(cls):
"""Returns a name describing the pipeline.
By default, this will take the class name and add a space between each capitalized word (class name should be in Pascal Case). If the pipeline has a custom_name attribute, this will be returned instead.
"""
if cls.custom_name:
name = cls.custom_name
else:
rex = re.compile(r'(?<=[a-z])(?=[A-Z])')
name = rex.sub(' ', cls.__name__)
if name == cls.__name__:
raise IllFormattedClassNameError("Pipeline Class {} needs to follow Pascal Case standards or `custom_name` must be defined.".format(cls.__name__))
return name
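    # Example (hypothetical subclass names): a subclass called
    # `LogisticRegressionBinaryPipeline` reports the name
    # "Logistic Regression Binary Pipeline", while a non-Pascal-Case class name
    # such as `my_pipeline` raises IllFormattedClassNameError unless
    # `custom_name` is set on the subclass.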
@classproperty
def summary(cls):
"""Returns a short summary of the pipeline structure, describing the list of components used.
Example: Logistic Regression Classifier w/ Simple Imputer + One Hot Encoder
"""
component_graph = [handle_component_class(component_class) for component_class in copy.copy(cls.component_graph)]
if len(component_graph) == 0:
return "Empty Pipeline"
summary = "Pipeline"
if inspect.isclass(component_graph[-1]) and issubclass(component_graph[-1], Estimator):
estimator_class = component_graph.pop(-1)
summary = estimator_class.name
if len(component_graph) == 0:
return summary
component_names = [component_class.name for component_class in component_graph]
return '{} w/ {}'.format(summary, ' + '.join(component_names))
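    # Example: for `component_graph = ['Simple Imputer', 'One Hot Encoder',
    # 'Logistic Regression Classifier']`, `summary` reads
    # "Logistic Regression Classifier w/ Simple Imputer + One Hot Encoder".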
def _validate_estimator_problem_type(self):
"""Validates this pipeline's problem_type against that of the estimator from `self.component_graph`"""
estimator_problem_types = self.estimator.supported_problem_types
if self.problem_type not in estimator_problem_types:
raise ValueError("Problem type {} not valid for this component graph. Valid problem types include {}."
.format(self.problem_type, estimator_problem_types))
def _instantiate_component(self, component_class, parameters):
"""Instantiates components with parameters in `parameters`"""
try:
component_class = handle_component_class(component_class)
except MissingComponentError as e:
err = "Error recieved when retrieving class for component {}".format(component_class)
raise MissingComponentError(err) from e
component_name = component_class.name
try:
component_parameters = parameters.get(component_name, {})
new_component = component_class(**component_parameters, random_state=self.random_state)
except (ValueError, TypeError) as e:
err = "Error received when instantiating component {} with the following arguments {}".format(component_name, component_parameters)
raise ValueError(err) from e
return new_component
def __getitem__(self, index):
if isinstance(index, slice):
raise NotImplementedError('Slicing pipelines is currently not supported.')
elif isinstance(index, int):
return self.component_graph[index]
else:
return self.get_component(index)
def __setitem__(self, index, value):
raise NotImplementedError('Setting pipeline components is not supported.')
    def get_component(self, name):
"""Returns component by name
Arguments:
name (str): name of component
Returns:
Component: component to return
"""
return next((component for component in self.component_graph if component.name == name), None)
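    # Example: components can be looked up by position or by name; for a pipeline
    # whose first component is a 'Simple Imputer', `pipeline[0]` and
    # `pipeline.get_component('Simple Imputer')` return the same component
    # instance. Slicing (e.g. `pipeline[:2]`) is not supported.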
    def describe(self):
        """Outputs pipeline details, including component parameters, to the logger.

        Returns:
            None
        """
log_title(logger, self.name)
logger.info("Problem Type: {}".format(self.problem_type))
logger.info("Model Family: {}".format(str(self.model_family)))
if self.estimator.name in self.input_feature_names:
logger.info("Number of features: {}".format(len(self.input_feature_names[self.estimator.name])))
# Summary of steps
log_subtitle(logger, "Pipeline Steps")
for number, component in enumerate(self.component_graph, 1):
component_string = str(number) + ". " + component.name
logger.info(component_string)
component.describe(print_name=False)
def _transform(self, X):
X_t = X
for component in self.component_graph[:-1]:
X_t = component.transform(X_t)
return X_t
def _fit(self, X, y):
X_t = X
y_t = y
for component in self.component_graph[:-1]:
self.input_feature_names.update({component.name: list(pd.DataFrame(X_t))})
X_t = component.fit_transform(X_t, y_t)
self.input_feature_names.update({self.estimator.name: list(pd.DataFrame(X_t))})
self.estimator.fit(X_t, y_t)
    def fit(self, X, y):
"""Build a model
Arguments:
X (pd.DataFrame or np.array): the input training data of shape [n_samples, n_features]
y (pd.Series): the target training labels of length [n_samples]
Returns:
self
"""
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
if not isinstance(y, pd.Series):
y = pd.Series(y)
self._fit(X, y)
return self
    def predict(self, X, objective=None):
"""Make predictions using selected features.
Arguments:
X (pd.DataFrame or np.array) : data of shape [n_samples, n_features]
objective (Object or string): the objective to use to make predictions
Returns:
pd.Series : estimated labels
"""
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
X_t = self._transform(X)
return self.estimator.predict(X_t)
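    # Typical usage (a sketch; `MyBinaryPipeline` is a hypothetical subclass that
    # defines `component_graph` and `problem_type`, and `X_train`, `y_train`,
    # `X_holdout` are placeholder datasets):
    #
    #     pipeline = MyBinaryPipeline(parameters={}, random_state=0)
    #     pipeline.fit(X_train, y_train)
    #     preds = pipeline.predict(X_holdout)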
    @abstractmethod
def score(self, X, y, objectives):
"""Evaluate model performance on current and additional objectives
Arguments:
X (pd.DataFrame or np.array) : data of shape [n_samples, n_features]
y (pd.Series) : true labels of length [n_samples]
objectives (list): Non-empty list of objectives to score on
Returns:
dict: ordered dictionary of objective scores
"""
@staticmethod
def _score(X, y, predictions, objective):
"""Given data, model predictions or predicted probabilities computed on the data, and an objective, evaluate and return the objective score.
Will return `np.nan` if the objective errors.
"""
score = np.nan
try:
score = objective.score(y, predictions, X)
except Exception as e:
logger.error('Error in PipelineBase.score while scoring objective {}: {}'.format(objective.name, str(e)))
return score
@classproperty
def model_family(cls):
"Returns model family of this pipeline template"""
component_graph = copy.copy(cls.component_graph)
return handle_component_class(component_graph[-1]).model_family
@classproperty
def hyperparameters(cls):
"Returns hyperparameter ranges from all components as a dictionary"
hyperparameter_ranges = dict()
component_graph = copy.copy(cls.component_graph)
for component_class in component_graph:
component_class = handle_component_class(component_class)
component_hyperparameters = copy.copy(component_class.hyperparameter_ranges)
if cls.custom_hyperparameters and component_class.name in cls.custom_hyperparameters:
component_hyperparameters.update(cls.custom_hyperparameters.get(component_class.name, {}))
hyperparameter_ranges[component_class.name] = component_hyperparameters
return hyperparameter_ranges
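    # Example (hypothetical override): a subclass can narrow a component's search
    # space by declaring, e.g.,
    #     custom_hyperparameters = {'Simple Imputer': {'impute_strategy': ['median']}}
    # in which case `hyperparameters` reports the overridden range for
    # 'Simple Imputer' and the default ranges for every other component.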
@property
def parameters(self):
"""Returns parameter dictionary for this pipeline
Returns:
dict: dictionary of all component parameters
"""
return {c.name: copy.copy(c.parameters) for c in self.component_graph if c.parameters}
@classproperty
def default_parameters(cls):
"""Returns the default parameter dictionary for this pipeline.
Returns:
dict: dictionary of all component default parameters.
"""
defaults = {}
for c in cls.component_graph:
component = handle_component_class(c)
if component.default_parameters:
defaults[component.name] = component.default_parameters
return defaults
@property
def feature_importance(self):
"""Return importance associated with each feature. Features dropped by feature selection are excluded"""
feature_names = self.input_feature_names[self.estimator.name]
importance = list(zip(feature_names, self.estimator.feature_importance)) # note: this only works for binary
importance.sort(key=lambda x: -abs(x[1]))
df = pd.DataFrame(importance, columns=["feature", "importance"])
return df
    def graph(self, filepath=None):
"""Generate an image representing the pipeline graph
Arguments:
filepath (str, optional) : Path to where the graph should be saved. If set to None (as by default), the graph will not be saved.
Returns:
graphviz.Digraph: Graph object that can be directly displayed in Jupyter notebooks.
"""
graphviz = import_or_raise('graphviz', error_msg='Please install graphviz to visualize pipelines.')
# Try rendering a dummy graph to see if a working backend is installed
try:
graphviz.Digraph().pipe()
except graphviz.backend.ExecutableNotFound:
raise RuntimeError(
"To graph entity sets, a graphviz backend is required.\n" +
"Install the backend using one of the following commands:\n" +
" Mac OS: brew install graphviz\n" +
" Linux (Ubuntu): sudo apt-get install graphviz\n" +
" Windows: conda install python-graphviz\n"
)
graph_format = None
path_and_name = None
if filepath:
# Explicitly cast to str in case a Path object was passed in
filepath = str(filepath)
try:
f = open(filepath, 'w')
f.close()
except (IOError, FileNotFoundError):
raise ValueError(('Specified filepath is not writeable: {}'.format(filepath)))
path_and_name, graph_format = os.path.splitext(filepath)
graph_format = graph_format[1:].lower() # ignore the dot
supported_filetypes = graphviz.backend.FORMATS
if graph_format not in supported_filetypes:
raise ValueError(("Unknown format '{}'. Make sure your format is one of the " +
"following: {}").format(graph_format, supported_filetypes))
# Initialize a new directed graph
graph = graphviz.Digraph(name=self.name, format=graph_format,
graph_attr={'splines': 'ortho'})
graph.attr(rankdir='LR')
# Draw components
for component in self.component_graph:
            label = r'%s\l' % (component.name)
            if len(component.parameters) > 0:
                parameters = r'\l'.join([key + ' : ' + "{:0.2f}".format(val) if (isinstance(val, float))
                                         else key + ' : ' + str(val)
                                         for key, val in component.parameters.items()])
                label = r'%s |%s\l' % (component.name, parameters)
graph.node(component.name, shape='record', label=label)
# Draw edges
for i in range(len(self.component_graph[:-1])):
graph.edge(self.component_graph[i].name, self.component_graph[i + 1].name)
if filepath:
graph.render(path_and_name, cleanup=True)
return graph
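    # Usage note (hypothetical path): `pipeline.graph()` returns a graphviz.Digraph
    # that renders inline in Jupyter notebooks; passing `filepath='pipeline.png'`
    # also writes the image, with the output format inferred from the extension.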
    def graph_feature_importance(self, show_all_features=False):
"""Generate a bar graph of the pipeline's feature importance
Arguments:
show_all_features (bool, optional) : If true, graph features with an importance value of zero. Defaults to false.
Returns:
plotly.Figure, a bar graph showing features and their corresponding importance
"""
go = import_or_raise("plotly.graph_objects", error_msg="Cannot find dependency plotly.graph_objects")
feat_imp = self.feature_importance
feat_imp['importance'] = abs(feat_imp['importance'])
if not show_all_features:
# Remove features with zero importance
feat_imp = feat_imp[feat_imp['importance'] != 0]
# List is reversed to go from ascending order to descending order
feat_imp = feat_imp.iloc[::-1]
title = 'Feature Importance'
subtitle = 'May display fewer features due to feature selection'
data = [go.Bar(
x=feat_imp['importance'],
y=feat_imp['feature'],
orientation='h'
)]
layout = {
'title': '{0}<br><sub>{1}</sub>'.format(title, subtitle),
'height': 800,
'xaxis_title': 'Feature Importance',
'yaxis_title': 'Feature',
'yaxis': {
'type': 'category'
}
}
fig = go.Figure(data=data, layout=layout)
return fig
    def save(self, file_path):
"""Saves pipeline at file path
Arguments:
file_path (str) : location to save file
Returns:
None
"""
with open(file_path, 'wb') as f:
cloudpickle.dump(self, f)
    @staticmethod
def load(file_path):
"""Loads pipeline at file path
Arguments:
file_path (str) : location to load file
Returns:
PipelineBase object
"""
with open(file_path, 'rb') as f:
return cloudpickle.load(f)
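    # Example round trip (hypothetical path and pipeline instance):
    #     pipeline.save('pipeline.pkl')
    #     loaded = PipelineBase.load('pipeline.pkl')
    # `load` is a staticmethod, so it can be called on the base class or any
    # subclass; the unpickled object keeps its fitted components.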
    def clone(self, random_state=0):
"""Constructs a new pipeline with the same parameters and components.
Arguments:
random_state (int): the value to seed the random state with. Can also be a RandomState instance. Defaults to 0.
Returns:
A new instance of this pipeline with identical parameters and components
"""
return self.__class__(self.parameters, random_state=random_state)
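    # Example: `pipeline.clone()` returns an unfitted copy built from
    # `self.parameters`, so components keep any non-default parameters while
    # fitted state (input feature names, estimator coefficients) is not carried
    # over; pass `random_state` to reseed the copy.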