Source code for evalml.pipelines.pipeline_base

import copy
import re
from abc import ABC, abstractmethod
from collections import OrderedDict

import cloudpickle
import pandas as pd
from sklearn.model_selection import train_test_split

from .components import Estimator, handle_component
from .graphs import make_feature_importance_graph, make_pipeline_graph

from evalml.exceptions import IllFormattedClassNameError
from evalml.objectives import get_objective
from evalml.problem_types import handle_problem_types
from evalml.utils import Logger, classproperty, get_random_state

logger = Logger()


[docs]class PipelineBase(ABC): """Base class for all pipelines.""" @property @classmethod @abstractmethod def component_graph(cls): """Returns list of components representing pipeline graph structure Returns: list(str/ComponentBase): list of ComponentBase objects or strings denotes graph structure of this pipeline """ return NotImplementedError("This pipeline must have `component_graph` as a class variable.") @property @classmethod @abstractmethod def supported_problem_types(cls): """Returns a list of ProblemTypes that this pipeline supports Returns: list(str/ProblemType): list of ProblemType objects or strings that this pipeline supports """ return NotImplementedError("This pipeline must have `supported_problem_types` as a class variable.") custom_hyperparameters = None
[docs] def __init__(self, parameters, objective, random_state=0): """Machine learning pipeline made out of transformers and a estimator. Required Class Variables: component_graph (list): List of components in order. Accepts strings or ComponentBase objects in the list supported_problem_types (list): List of problem types for this pipeline. Accepts strings or ProbemType enum in the list. Arguments: objective (ObjectiveBase): the objective to optimize parameters (dict): dictionary with component names as keys and dictionary of that component's parameters as values. An empty dictionary {} implies using all default values for component parameters. random_state (int, np.random.RandomState): The random seed/state. Defaults to 0. """ self.random_state = get_random_state(random_state) self.component_graph = [self._instantiate_component(c, parameters) for c in self.component_graph] self.supported_problem_types = [handle_problem_types(problem_type) for problem_type in self.supported_problem_types] self.objective = get_objective(objective) self.input_feature_names = {} self.results = {} self.estimator = self.component_graph[-1] if isinstance(self.component_graph[-1], Estimator) else None if self.estimator is None: raise ValueError("A pipeline must have an Estimator as the last component in component_graph.") self._validate_problem_types(self.supported_problem_types)
@classproperty def name(cls): """Returns a name describing the pipeline. By default, this will take the class name and add a space between each capitalized word. If the pipeline has a _name attribute, this will be returned instead. """ try: name = cls._name except AttributeError: rex = re.compile(r'(?<=[a-z])(?=[A-Z])') name = rex.sub(' ', cls.__name__) if name == cls.__name__: raise IllFormattedClassNameError("Pipeline Class {} needs to follow pascall case standards or `_name` must be defined.".format(cls.__name__)) return name @classproperty def summary(cls): """Returns a short summary of the pipeline structure, describing the list of components used. Example: Logistic Regression Classifier w/ Simple Imputer + One Hot Encoder """ def _generate_summary(component_graph): component_graph[-1] = handle_component(component_graph[-1]) estimator = component_graph[-1] if isinstance(component_graph[-1], Estimator) else None if estimator is not None: summary = "{}".format(estimator.name) else: summary = "Pipeline" for index, component in enumerate(component_graph[:-1]): component = handle_component(component) if index == 0: summary += " w/ {}".format(component.name) else: summary += " + {}".format(component.name) return summary return _generate_summary(cls.component_graph) def _validate_problem_types(self, problem_types): """Validates provided `problem_types` against the estimator in `self.component_graph` Arguments: problem_types (list): list of ProblemTypes """ estimator_problem_types = self.estimator.supported_problem_types for problem_type in self.supported_problem_types: if problem_type not in estimator_problem_types: raise ValueError("Problem type {} not valid for this component graph. Valid problem types include {}.".format(problem_type, estimator_problem_types)) def _instantiate_component(self, component, parameters): """Instantiates components with parameters in `parameters`""" component = handle_component(component) component_class = component.__class__ component_name = component.name try: component_parameters = parameters.get(component_name, {}) new_component = component_class(**component_parameters, random_state=self.random_state) except (ValueError, TypeError) as e: err = "Error received when instantiating component {} with the following arguments {}".format(component_name, component_parameters) raise ValueError(err) from e return new_component def __getitem__(self, index): if isinstance(index, slice): raise NotImplementedError('Slicing pipelines is currently not supported.') elif isinstance(index, int): return self.component_graph[index] else: return self.get_component(index) def __setitem__(self, index, value): raise NotImplementedError('Setting pipeline components is not supported.')
[docs] def get_component(self, name): """Returns component by name Arguments: name (str): name of component Returns: Component: component to return """ return next((component for component in self.component_graph if component.name == name), None)
[docs] def describe(self): """Outputs pipeline details including component parameters Arguments: return_dict (bool): If True, return dictionary of information about pipeline. Defaults to false Returns: dict: dictionary of all component parameters if return_dict is True, else None """ logger.log_title(self.name) logger.log("Supported Problem Types: {}".format(', '.join([str(problem_type) for problem_type in self.supported_problem_types]))) logger.log("Model Family: {}".format(str(self.model_family))) better_string = "lower is better" if self.objective.greater_is_better: better_string = "greater is better" objective_string = "Objective to Optimize: {} ({})".format(self.objective.name, better_string) logger.log(objective_string) if self.estimator.name in self.input_feature_names: logger.log("Number of features: {}".format(len(self.input_feature_names[self.estimator.name]))) # Summary of steps logger.log_subtitle("Pipeline Steps") for number, component in enumerate(self.component_graph, 1): component_string = str(number) + ". " + component.name logger.log(component_string) component.describe(print_name=False)
def _transform(self, X): X_t = X for component in self.component_graph[:-1]: X_t = component.transform(X_t) return X_t def _fit(self, X, y): X_t = X y_t = y for component in self.component_graph[:-1]: self.input_feature_names.update({component.name: list(pd.DataFrame(X_t))}) X_t = component.fit_transform(X_t, y_t) self.input_feature_names.update({self.estimator.name: list(pd.DataFrame(X_t))}) self.estimator.fit(X_t, y_t)
[docs] def fit(self, X, y, objective_fit_size=.2): """Build a model Arguments: X (pd.DataFrame or np.array): the input training data of shape [n_samples, n_features] y (pd.Series): the target training labels of length [n_samples] feature_types (list, optional): list of feature types. either numeric of categorical. categorical features will automatically be encoded Returns: self """ if not isinstance(X, pd.DataFrame): X = pd.DataFrame(X) if not isinstance(y, pd.Series): y = pd.Series(y) if self.objective.needs_fitting: X, X_objective, y, y_objective = train_test_split(X, y, test_size=objective_fit_size, random_state=self.estimator.random_state) self._fit(X, y) if self.objective.needs_fitting: y_predicted = self.predict_proba(X_objective) if self.objective.uses_extra_columns: self.objective.fit(y_predicted, y_objective, X_objective) else: self.objective.fit(y_predicted, y_objective) return self
[docs] def predict(self, X): """Make predictions using selected features. Args: X (pd.DataFrame or np.array) : data of shape [n_samples, n_features] Returns: pd.Series : estimated labels """ if not isinstance(X, pd.DataFrame): X = pd.DataFrame(X) X_t = self._transform(X) if self.objective and self.objective.needs_fitting: y_predicted = self.predict_proba(X) if self.objective.uses_extra_columns: return self.objective.predict(y_predicted, X) return self.objective.predict(y_predicted) return self.estimator.predict(X_t)
[docs] def predict_proba(self, X): """Make probability estimates for labels. Args: X (pd.DataFrame or np.array) : data of shape [n_samples, n_features] Returns: pd.DataFrame : probability estimates """ if not isinstance(X, pd.DataFrame): X = pd.DataFrame(X) X = self._transform(X) proba = self.estimator.predict_proba(X) if proba.shape[1] <= 2: return proba[:, 1] else: return proba
[docs] def score(self, X, y, other_objectives=None): """Evaluate model performance on current and additional objectives Args: X (pd.DataFrame or np.array) : data of shape [n_samples, n_features] y (pd.Series) : true labels of length [n_samples] other_objectives (list): list of other objectives to score Returns: float, dict: score, ordered dictionary of other objective scores """ if not isinstance(X, pd.DataFrame): X = pd.DataFrame(X) if not isinstance(y, pd.Series): y = pd.Series(y) other_objectives = other_objectives or [] other_objectives = [get_objective(o) for o in other_objectives] y_predicted = None y_predicted_proba = None scores = [] for objective in [self.objective] + other_objectives: if objective.score_needs_proba: if y_predicted_proba is None: y_predicted_proba = self.predict_proba(X) y_predictions = y_predicted_proba else: if y_predicted is None: y_predicted = self.predict(X) y_predictions = y_predicted if objective.uses_extra_columns: scores.append(objective.score(y_predictions, y, X)) else: scores.append(objective.score(y_predictions, y)) if not other_objectives: return scores[0], {} other_scores = OrderedDict(zip([n.name for n in other_objectives], scores[1:])) return scores[0], other_scores
[docs] def graph(self, filepath=None): """Generate an image representing the pipeline graph Arguments: filepath (str, optional) : Path to where the graph should be saved. If set to None (as by default), the graph will not be saved. Returns: graphviz.Digraph: Graph object that can be directly displayed in Jupyter notebooks. """ return make_pipeline_graph(self.component_graph, self.name, filepath=filepath)
@classproperty def model_family(cls): "Returns model family of this pipeline template""" return handle_component(cls.component_graph[-1]).model_family @classproperty def hyperparameters(cls): "Returns hyperparameter ranges as a flat dictionary from all components " hyperparameter_ranges = dict() for component in cls.component_graph: component = handle_component(component) hyperparameter_ranges.update(component.hyperparameter_ranges) if cls.custom_hyperparameters: hyperparameter_ranges.update(cls.custom_hyperparameters) return hyperparameter_ranges @property def parameters(self): """Returns parameter dictionary for this pipeline Returns: dict: dictionary of all component parameters """ return {c.name: copy.copy(c.parameters) for c in self.component_graph if c.parameters} @property def feature_importances(self): """Return feature importances. Features dropped by feature selection are excluded""" feature_names = self.input_feature_names[self.estimator.name] importances = list(zip(feature_names, self.estimator.feature_importances)) # note: this only works for binary importances.sort(key=lambda x: -abs(x[1])) df = pd.DataFrame(importances, columns=["feature", "importance"]) return df
[docs] def feature_importance_graph(self, show_all_features=False): """Generate a bar graph of the pipeline's feature importances Arguments: show_all_features (bool, optional) : If true, graph features with an importance value of zero. Defaults to false. Returns: plotly.Figure, a bar graph showing features and their importances """ return make_feature_importance_graph(self.feature_importances, show_all_features=show_all_features)
[docs] def save(self, file_path): """Saves pipeline at file path Args: file_path (str) : location to save file Returns: None """ with open(file_path, 'wb') as f: cloudpickle.dump(self, f)
[docs] @staticmethod def load(file_path): """Loads pipeline at file path Args: file_path (str) : location to load file Returns: PipelineBase obj """ with open(file_path, 'rb') as f: return cloudpickle.load(f)