"""Autoregressive Integrated Moving Average Model. The three parameters (p, d, q) are the AR order, the degree of differencing, and the MA order. More information here: https://www.statsmodels.org/devel/generated/statsmodels.tsa.arima.model.ARIMA.html."""
from typing import Dict, Hashable, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from pandas.api.types import is_integer_dtype
from skopt.space import Integer
from sktime.forecasting.base import ForecastingHorizon
from evalml.model_family import ModelFamily
from evalml.pipelines.components.estimators import Estimator
from evalml.pipelines.components.utils import convert_bool_to_double, match_indices
from evalml.problem_types import ProblemTypes
from evalml.utils import (
    import_or_raise,
    infer_feature_types,
)
[docs]class ARIMARegressor(Estimator):
    """Autoregressive Integrated Moving Average Model. The three parameters (p, d, q) are the AR order, the degree of differencing, and the MA order. More information here: https://www.statsmodels.org/devel/generated/statsmodels.tsa.arima.model.ARIMA.html.
    Currently ARIMARegressor isn't supported via conda install. It's recommended that it be installed via PyPI.
    Args:
        time_index (str): Specifies the name of the column in X that provides the datetime objects. Defaults to None.
        trend (str): Controls the deterministic trend. Options are ['n', 'c', 't', 'ct'] where 'c' is a constant term,
            't' indicates a linear trend, and 'ct' is both. Can also be an iterable when defining a polynomial, such
            as [1, 1, 0, 1].
        start_p (int): Minimum Autoregressive order. Defaults to 2.
        d (int): Minimum Differencing degree. Defaults to 0.
        start_q (int): Minimum Moving Average order. Defaults to 2.
        max_p (int): Maximum Autoregressive order. Defaults to 5.
        max_d (int): Maximum Differencing degree. Defaults to 2.
        max_q (int): Maximum Moving Average order. Defaults to 5.
        seasonal (boolean): Whether to fit a seasonal model to ARIMA. Defaults to True.
        sp (int or str): Period for seasonal differencing, specifically the number of periods in each season. If "detect", this
            model will automatically detect this parameter (given the time series is a standard frequency) and will fall
            back to 1 (no seasonality) if it cannot be detected. Defaults to 1.
        n_jobs (int or None): Non-negative integer describing level of parallelism used for pipelines. Defaults to -1.
        random_seed (int): Seed for the random number generator. Defaults to 0.
    """
    name = "ARIMA Regressor"
    hyperparameter_ranges = {
        "start_p": Integer(1, 3),
        "d": Integer(0, 2),
        "start_q": Integer(1, 3),
        "max_p": Integer(3, 10),
        "max_d": Integer(2, 5),
        "max_q": Integer(3, 10),
        "seasonal": [True, False],
    }
    """{
        "start_p": Integer(1, 3),
        "d": Integer(0, 2),
        "start_q": Integer(1, 3),
        "max_p": Integer(3, 10),
        "max_d": Integer(2, 5),
        "max_q": Integer(3, 10),
        "seasonal": [True, False],
    }"""
    model_family = ModelFamily.ARIMA
    """ModelFamily.ARIMA"""
    supported_problem_types = [ProblemTypes.TIME_SERIES_REGRESSION]
    """[ProblemTypes.TIME_SERIES_REGRESSION]"""
    max_rows = 1000
    max_cols = 7
    def __init__(
        self,
        time_index: Optional[Hashable] = None,
        trend: Optional[str] = None,
        start_p: int = 2,
        d: int = 0,
        start_q: int = 2,
        max_p: int = 5,
        max_d: int = 2,
        max_q: int = 5,
        seasonal: bool = True,
        sp: int = 1,
        n_jobs: int = -1,
        random_seed: Union[int, float] = 0,
        maxiter: int = 10,
        use_covariates: bool = True,
        **kwargs,
    ):
        self.preds_95_upper = None
        self.preds_95_lower = None
        parameters = {
            "trend": trend,
            "start_p": start_p,
            "d": d,
            "start_q": start_q,
            "max_p": max_p,
            "max_d": max_d,
            "max_q": max_q,
            "seasonal": seasonal,
            "maxiter": maxiter,
            "n_jobs": n_jobs,
        }
        parameters.update(kwargs)
        arima_model_msg = (
            "sktime is not installed. Please install using `pip install sktime.`"
        )
        sktime_arima = import_or_raise(
            "sktime.forecasting.arima",
            error_msg=arima_model_msg,
        )
        arima_model = sktime_arima.AutoARIMA(**parameters)
        parameters["use_covariates"] = use_covariates
        parameters["time_index"] = time_index
        self.sp = sp
        self.use_covariates = use_covariates
        super().__init__(
            parameters=parameters,
            component_obj=arima_model,
            random_seed=random_seed,
        )
    def _remove_datetime(
        self,
        data: pd.DataFrame,
        features: bool = False,
    ) -> pd.DataFrame:
        if data is None:
            return None
        data_no_dt = data.ww.copy()
        if isinstance(
            data_no_dt.index,
            (pd.DatetimeIndex, pd.PeriodIndex, pd.IntervalIndex),
        ):
            data_no_dt = data_no_dt.ww.reset_index(drop=True)
        if features:
            data_no_dt = data_no_dt.ww.select(exclude=["Datetime"])
        return data_no_dt
    def _set_forecast(self, X: pd.DataFrame):
        # we can only calculate the difference if the indices are of the same type
        units_diff = 1
        if isinstance(X.index[0], type(self.last_X_index)) and isinstance(
            X.index,
            pd.DatetimeIndex,
        ):
            dates_diff = pd.date_range(
                start=self.last_X_index,
                end=X.index[0],
                freq=X.index.freq,
            )
            units_diff = len(dates_diff) - 1
        elif is_integer_dtype(type(X.index[0])) and is_integer_dtype(
            type(self.last_X_index),
        ):
            units_diff = X.index[0] - self.last_X_index
        fh_ = ForecastingHorizon(
            [units_diff + i for i in range(len(X))],
            is_relative=True,
        )
        return fh_
    def _get_sp(self, X: pd.DataFrame) -> int:
        if X is None:
            return 1
        freq_mappings = {
            "D": 7,
            "M": 12,
            "Q": 4,
        }
        time_index = self._parameters.get("time_index", None)
        sp = self.sp
        if sp == "detect":
            inferred_freqs = X.ww.infer_temporal_frequencies()
            freq = inferred_freqs.get(time_index, None)
            sp = 1
            if freq is not None:
                sp = freq_mappings.get(freq[:1], 1)
        return sp
[docs]    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None):
        """Fits ARIMA regressor to data.
        Args:
            X (pd.DataFrame): The input training data of shape [n_samples, n_features].
            y (pd.Series): The target training data of length [n_samples].
        Returns:
            self
        Raises:
            ValueError: If y was not passed in.
        """
        X, y = self._manage_woodwork(X, y)
        if X is not None:
            X = X.ww.fillna(X.mean())
        if y is None:
            raise ValueError("ARIMA Regressor requires y as input.")
        sp = self._get_sp(X)
        self._component_obj.sp = sp
        self.last_X_index = X.index[-1] if X is not None else y.index[-1]
        X = self._remove_datetime(X, features=True)
        if X is not None:
            X = convert_bool_to_double(X)
        y = self._remove_datetime(y)
        X, y = match_indices(X, y)
        if X is not None and not X.empty and self.use_covariates:
            self._component_obj.fit(y=y, X=X)
        else:
            self._component_obj.fit(y=y)
        return self 
    def _manage_types_and_forecast(self, X: pd.DataFrame) -> tuple:
        fh_ = self._set_forecast(X)
        X = X.ww.select(exclude=["Datetime"])
        X = convert_bool_to_double(X)
        return X, fh_
    @staticmethod
    def _parse_prediction_intervals(
        y_pred_intervals: pd.DataFrame,
        conf_int: float,
    ) -> Tuple[pd.Series, pd.Series]:
        coverage_name = y_pred_intervals.columns[0][0]
        preds_lower = y_pred_intervals.loc(axis=1)[(coverage_name, conf_int, "lower")]
        preds_upper = y_pred_intervals.loc(axis=1)[(coverage_name, conf_int, "upper")]
        preds_lower.name = None
        preds_upper.name = None
        return preds_lower, preds_upper
[docs]    def predict(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> pd.Series:
        """Make predictions using fitted ARIMA regressor.
        Args:
            X (pd.DataFrame): Data of shape [n_samples, n_features].
            y (pd.Series): Target data.
        Returns:
            pd.Series: Predicted values.
        Raises:
            ValueError: If X was passed to `fit` but not passed in `predict`.
        """
        X, y = self._manage_woodwork(X, y)
        X, fh_ = self._manage_types_and_forecast(X=X)
        if not X.empty and self.use_covariates:
            if fh_[0] != 1:
                # pmdarima (which sktime uses under the hood) only forecasts off the training data
                # but sktime circumvents this by predicting everything from the end of training data to the date / periods requested
                # and only returning the values for dates / periods given to sktime. Because of this,
                # pmdarima requires the number of covariate rows to equal the length of the total number of periods (X.shape[0] == fh_[-1]) if covariates are used.
                # We circument this by adding arbitrary rows to the start of X since sktime discards these values when predicting.
                num_rows_diff = fh_[-1] - X.shape[0]
                filler = pd.DataFrame(
                    columns=X.columns,
                    index=range(num_rows_diff),
                ).fillna(0)
                X_ = pd.concat([filler, X], ignore_index=True)
                X_.ww.init(schema=X.ww.schema)
            else:
                X_ = X
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                X=X_,
                coverage=[0.95],
            )
        else:
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                coverage=[0.95],
            )
        y_pred_intervals.index = X.index
        (
            self.preds_95_lower,
            self.preds_95_upper,
        ) = ARIMARegressor._parse_prediction_intervals(y_pred_intervals, 0.95)
        y_pred = pd.concat((self.preds_95_lower, self.preds_95_upper), axis=1).mean(
            axis=1,
        )
        return infer_feature_types(y_pred) 
[docs]    def get_prediction_intervals(
        self,
        X: pd.DataFrame,
        y: pd.Series = None,
        coverage: List[float] = None,
        predictions: pd.Series = None,
    ) -> Dict[str, pd.Series]:
        """Find the prediction intervals using the fitted ARIMARegressor.
        Args:
            X (pd.DataFrame): Data of shape [n_samples, n_features].
            y (pd.Series): Target data. Optional.
            coverage (list[float]): A list of floats between the values 0 and 1 that the upper and lower bounds of the
                prediction interval should be calculated for.
            predictions (pd.Series): Not used for ARIMA regressor.
        Returns:
            dict: Prediction intervals, keys are in the format {coverage}_lower or {coverage}_upper.
        """
        if coverage is None:
            coverage = [0.95]
        X, y = self._manage_woodwork(X, y)
        X, fh_ = self._manage_types_and_forecast(X=X)
        prediction_interval_result = {}
        if not X.empty and self.use_covariates:
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                X=X,
                coverage=coverage,
            )
        else:
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                coverage=coverage,
            )
        y_pred_intervals.index = X.index
        for conf_int in coverage:
            if (
                conf_int == 0.95
                and self.preds_95_lower is not None
                and self.preds_95_upper is not None
            ):
                prediction_interval_result[f"{conf_int}_lower"] = self.preds_95_lower
                prediction_interval_result[f"{conf_int}_upper"] = self.preds_95_upper
                continue
            preds_lower, preds_upper = ARIMARegressor._parse_prediction_intervals(
                y_pred_intervals,
                conf_int,
            )
            prediction_interval_result[f"{conf_int}_lower"] = preds_lower
            prediction_interval_result[f"{conf_int}_upper"] = preds_upper
        return prediction_interval_result 
    @property
    def feature_importance(self) -> np.ndarray:
        """Returns array of 0's with a length of 1 as feature_importance is not defined for ARIMA regressor."""
        return np.zeros(1)