"""Component that imputes missing data according to a specified timeseries-specific imputation strategy."""
import pandas as pd
import woodwork as ww
from woodwork.logical_types import (
    BooleanNullable,
    Double,
)
from evalml.pipelines.components.transformers import Transformer
from evalml.utils import infer_feature_types
from evalml.utils.nullable_type_utils import (
    _determine_fractional_type,
    _determine_non_nullable_equivalent,
)
class TimeSeriesImputer(Transformer):
    """Imputes missing data according to a specified timeseries-specific imputation strategy.

    This Transformer should be used after the `TimeSeriesRegularizer` in order to impute the missing values that were
    added to X and y (if passed).

    Args:
        categorical_impute_strategy (string): Impute strategy to use for string, object, boolean, categorical dtypes.
            Valid values include "backwards_fill" and "forwards_fill". Defaults to "forwards_fill".
        numeric_impute_strategy (string): Impute strategy to use for numeric columns. Valid values include
            "backwards_fill", "forwards_fill", and "interpolate". Defaults to "interpolate".
        target_impute_strategy (string): Impute strategy to use for the target column. Valid values include
            "backwards_fill", "forwards_fill", and "interpolate". Defaults to "forwards_fill".
        random_seed (int): Seed for the random number generator. Defaults to 0.

    Raises:
        ValueError: If categorical_impute_strategy, numeric_impute_strategy, or target_impute_strategy is not one of the valid values.
    """

    modifies_features = True
    modifies_target = True
    training_only = True

    name = "Time Series Imputer"
    hyperparameter_ranges = {
        "categorical_impute_strategy": ["backwards_fill", "forwards_fill"],
        "numeric_impute_strategy": ["backwards_fill", "forwards_fill", "interpolate"],
        "target_impute_strategy": ["backwards_fill", "forwards_fill", "interpolate"],
    }
    """{
        "categorical_impute_strategy": ["backwards_fill", "forwards_fill"],
        "numeric_impute_strategy": ["backwards_fill", "forwards_fill", "interpolate"],
        "target_impute_strategy": ["backwards_fill", "forwards_fill", "interpolate"],
    }"""

    # Accepted values for each strategy parameter; checked in __init__.
    _valid_categorical_impute_strategies = {"backwards_fill", "forwards_fill"}
    _valid_numeric_impute_strategies = {
        "backwards_fill",
        "forwards_fill",
        "interpolate",
    }
    _valid_target_impute_strategies = {
        "backwards_fill",
        "forwards_fill",
        "interpolate",
    }

    # Incompatibility: https://github.com/alteryx/evalml/issues/4001
    # TODO: Remove when support is added https://github.com/alteryx/evalml/issues/4014
    _integer_nullable_incompatibilities = ["X", "y"]
    _boolean_nullable_incompatibilities = ["y"]

    def __init__(
        self,
        categorical_impute_strategy="forwards_fill",
        numeric_impute_strategy="interpolate",
        target_impute_strategy="forwards_fill",
        random_seed=0,
        **kwargs,
    ):
        # Each message lists the valid values for the parameter that was
        # actually rejected; sorting keeps the text stable, since set
        # iteration order is not guaranteed.
        if categorical_impute_strategy not in self._valid_categorical_impute_strategies:
            raise ValueError(
                f"{categorical_impute_strategy} is an invalid parameter. Valid categorical impute strategies are {', '.join(sorted(self._valid_categorical_impute_strategies))}",
            )
        if numeric_impute_strategy not in self._valid_numeric_impute_strategies:
            raise ValueError(
                f"{numeric_impute_strategy} is an invalid parameter. Valid numeric impute strategies are {', '.join(sorted(self._valid_numeric_impute_strategies))}",
            )
        if target_impute_strategy not in self._valid_target_impute_strategies:
            raise ValueError(
                f"{target_impute_strategy} is an invalid parameter. Valid target column impute strategies are {', '.join(sorted(self._valid_target_impute_strategies))}",
            )

        parameters = {
            "categorical_impute_strategy": categorical_impute_strategy,
            "numeric_impute_strategy": numeric_impute_strategy,
            "target_impute_strategy": target_impute_strategy,
        }
        parameters.update(kwargs)

        # State populated by fit(); None means "nothing to do" for that step.
        self._all_null_cols = None
        self._forwards_cols = None
        self._backwards_cols = None
        self._interpolate_cols = None
        self._impute_target = None
        self._y_all_null_cols = None
        super().__init__(
            parameters=parameters,
            component_obj=None,
            random_seed=random_seed,
        )

    def fit(self, X, y=None):
        """Fits imputer to data.

        'None' values are converted to np.nan before imputation and are treated as the same.
        If a value is missing at the beginning or end of a column, that value will be imputed using
        backwards fill or forwards fill as necessary, respectively.

        Args:
            X (pd.DataFrame, np.ndarray): The input training data of shape [n_samples, n_features]
            y (pd.Series, optional): The target training data of length [n_samples]

        Returns:
            self
        """
        X = infer_feature_types(X)

        # Columns that are entirely missing are recorded and excluded from
        # every strategy's column list below.
        nan_ratio = X.isna().sum() / X.shape[0]
        self._all_null_cols = nan_ratio[nan_ratio == 1].index.tolist()

        def _filter_cols(impute_strat, X):
            """Function to return which columns of the dataset to impute given the impute strategy.

            Returns None (rather than an empty list) when no column uses the strategy.
            """
            cols = []
            if self.parameters["categorical_impute_strategy"] == impute_strat:
                if self.parameters["numeric_impute_strategy"] == impute_strat:
                    # Both column families share the strategy: take everything.
                    cols = list(X.columns)
                else:
                    cols = list(X.ww.select(exclude=["numeric"]).columns)
            elif self.parameters["numeric_impute_strategy"] == impute_strat:
                cols = list(X.ww.select(include=["numeric"]).columns)

            kept = [col for col in cols if col not in self._all_null_cols]
            return kept or None

        self._forwards_cols = _filter_cols("forwards_fill", X)
        self._backwards_cols = _filter_cols("backwards_fill", X)
        self._interpolate_cols = _filter_cols("interpolate", X)

        # Reset target-related state so that refitting on a target without
        # missing values does not inherit the decision of an earlier fit.
        self._impute_target = None
        self._y_all_null_cols = None

        if isinstance(y, pd.Series):
            y = infer_feature_types(y)
            if y.isnull().any():
                self._impute_target = self.parameters["target_impute_strategy"]
        elif isinstance(y, pd.DataFrame):
            y = infer_feature_types(y)
            y_nan_ratio = y.isna().sum() / y.shape[0]
            self._y_all_null_cols = y_nan_ratio[y_nan_ratio == 1].index.tolist()
            if y.isnull().values.any():
                self._impute_target = self.parameters["target_impute_strategy"]
        return self

    def _handle_nullable_types(self, X=None, y=None):
        """Transforms X and y to remove any incompatible nullable types for the time series imputer when the interpolate method is used.

        Args:
            X (pd.DataFrame, optional): Input data to a component of shape [n_samples, n_features].
                May contain nullable types.
            y (pd.Series or pd.DataFrame, optional): The target of length [n_samples] or the
                unstacked target for a multiseries problem of length [n_samples, n_features*n_series].
                May contain nullable types.

        Returns:
            X, y with any incompatible nullable types downcasted to compatible equivalents when interpolate is used. Is NoOp otherwise.
        """
        if self._impute_target == "interpolate":
            # For BooleanNullable, we have to avoid Categorical columns
            # since the category dtype also has incompatibilities with linear interpolate, which is expected
            # TODO: Avoid categorical columns for BooleanNullable in multiseries when
            #       multiseries timeseries supports categorical
            if isinstance(y, pd.Series) and isinstance(
                y.ww.logical_type,
                BooleanNullable,
            ):
                y = ww.init_series(y, Double)
            else:
                _, y = super()._handle_nullable_types(None, y)
        # X only needs conversion when at least one feature column is interpolated.
        if self._interpolate_cols is not None:
            X, _ = super()._handle_nullable_types(X, None)

        return X, y