Source code for autogluon.cloud.predictor.timeseries_cloud_predictor

from __future__ import annotations

import logging
import warnings
from pathlib import Path
from typing import Any, Dict, Optional, Union

import pandas as pd

from ..backend.constant import SAGEMAKER, TIMESERIES_SAGEMAKER
from .cloud_predictor import CloudPredictor

logger = logging.getLogger(__name__)


[docs] class TimeSeriesCloudPredictor(CloudPredictor): """Train and deploy AutoGluon time series forecasting models on AWS SageMaker. Wraps :class:`autogluon.timeseries.TimeSeriesPredictor` (`docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.html>`_) and runs ``fit``, ``predict``, and endpoint deployment as managed SageMaker jobs. """ predictor_file_name = "TimeSeriesCloudPredictor.pkl" backend_map = {SAGEMAKER: TIMESERIES_SAGEMAKER} @property def predictor_type(self): """ Type of the underneath AutoGluon Predictor """ return "timeseries" def _get_local_predictor_cls(self): from autogluon.timeseries import TimeSeriesPredictor return TimeSeriesPredictor
[docs] def fit( self, train_data: Optional[Union[str, Path, pd.DataFrame]] = None, *, predictor_init_args: Dict[str, Any], predictor_fit_args: Optional[Dict[str, Any]] = None, tuning_data: Optional[Union[str, Path, pd.DataFrame]] = None, static_features: Optional[Union[str, Path, pd.DataFrame]] = None, id_column: str = "item_id", timestamp_column: str = "timestamp", framework_version: str = "latest", job_name: Optional[str] = None, instance_type: str = "ml.m5.2xlarge", instance_count: int = 1, volume_size: int = 100, custom_image_uri: Optional[str] = None, wait: bool = True, backend_kwargs: Optional[Dict] = None, known_covariates: Optional[Union[str, Path, pd.DataFrame]] = None, ) -> TimeSeriesCloudPredictor: """ Fit the predictor with SageMaker. This function will first upload necessary config and train data to s3 bucket. Then launch a SageMaker training job with the AutoGluon training container. Parameters ---------- train_data: Union[str, pathlib.Path, pd.DataFrame] Training time series in long format, as a DataFrame or local/S3 path to a data file. See the `TimeSeriesPredictor.fit docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.fit.html>`_ for the expected format. predictor_init_args: dict Arguments forwarded to ``TimeSeriesPredictor()``. See the `TimeSeriesPredictor docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.html>`_ for available options (e.g. ``target``, ``prediction_length``, ``freq``, ``eval_metric``, ``quantile_levels``, ``known_covariates_names``). predictor_fit_args: Optional[dict], default = None Additional fit args forwarded to ``TimeSeriesPredictor.fit()``. See the `TimeSeriesPredictor.fit docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.fit.html>`_ for available options. Must NOT contain ``train_data`` or ``tuning_data`` — pass those as explicit arguments above. tuning_data: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None Optional tuning data in long format, as a DataFrame or local/S3 path to a data file. static_features: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None Static (time-independent) features describing each individual time series. id_column: str, default = "item_id" Name of the column with the unique identifier of each time series (item). timestamp_column: str, default = "timestamp" Name of the column with the observation timestamps. framework_version: str, default = `latest` Training container version of autogluon. If `latest`, will use the latest available container version. If provided a specific version, will use this version. If `custom_image_uri` is set, this argument will be ignored. job_name: str, default = None Name of the launched training job. If None, CloudPredictor will create one with prefix ag-cloudpredictor instance_type: str, default = 'ml.m5.2xlarge' Instance type the predictor will be trained on with SageMaker. instance_count: int, default = 1 Number of instance used to fit the predictor. volumes_size: int, default = 30 Size in GB of the EBS volume to use for storing input data during training (default: 30). Must be large enough to store training data if File Mode is used (which is the default). wait: bool, default = True Whether the call should wait until the job completes To be noticed, the function won't return immediately because there are some preparations needed prior fit. Use `get_fit_job_status` to get job status. backend_kwargs: dict, default = None Any extra arguments needed to pass to the underneath backend. For SageMaker backend, valid keys are: 1. autogluon_sagemaker_estimator_kwargs Any extra arguments needed to initialize AutoGluonSagemakerEstimator Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator for all options 2. fit_kwargs Any extra arguments needed to pass to fit. Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator.fit for all options Returns ------- `TimeSeriesCloudPredictor` object. Returns self. """ assert not self.backend.is_fit, ( "Predictor is already fit! To fit additional models, create a new `CloudPredictor`" ) if backend_kwargs is None: backend_kwargs = {} predictor_fit_args = {} if predictor_fit_args is None else dict(predictor_fit_args) data_channels = { "train_data": train_data, "tuning_data": tuning_data, "known_covariates": known_covariates, "static_features": static_features, } for key in ("train_data", "tuning_data", "known_covariates"): if key in predictor_fit_args: warnings.warn( f"Passing `{key}` via `predictor_fit_args` is deprecated and will be removed in autogluon.cloud 0.6.0. " f"Pass `{key}` as an explicit argument to `fit()` instead.", FutureWarning, stacklevel=2, ) if data_channels[key] is None: data_channels[key] = predictor_fit_args.pop(key) else: raise TypeError( f"`{key}` was passed both as an explicit argument and via `predictor_fit_args`. " f"Pass it only as an explicit argument." ) if data_channels["train_data"] is None: raise TypeError("fit() missing required argument: 'train_data'") backend_kwargs = self.backend.parse_backend_fit_kwargs(backend_kwargs) self.backend.fit( predictor_init_args=predictor_init_args, predictor_fit_args=predictor_fit_args, data_channels=data_channels, id_column=id_column, timestamp_column=timestamp_column, framework_version=framework_version, job_name=job_name, instance_type=instance_type, instance_count=instance_count, volume_size=volume_size, custom_image_uri=custom_image_uri, wait=wait, **backend_kwargs, ) return self
[docs] def predict_real_time( self, data: Union[str, pd.DataFrame], static_features: Optional[Union[str, pd.DataFrame]] = None, known_covariates: Optional[pd.DataFrame] = None, accept: str = "application/x-parquet", **kwargs, ) -> pd.DataFrame: """ Predict with the deployed SageMaker endpoint. A deployed SageMaker endpoint is required. This is intended to provide a low latency inference. If you want to inference on a large dataset, use `predict()` instead. ``data`` must use the same ``id_column`` / ``timestamp_column`` names that were passed to ``fit()``. Parameters ---------- data: Union(str, pandas.DataFrame) Historical time series to forecast from, in long format, as a DataFrame or local/S3 path to a data file. static_features: Optional[pd.DataFrame] Static (time-independent) features describing each individual time series. known_covariates : Optional[pd.DataFrame] Future values of the known covariates over the forecast horizon. Must be provided if ``known_covariates_names`` was specified at fit time. accept: str, default = application/x-parquet Type of accept output content. Valid options are application/x-parquet, text/csv, application/json kwargs: Additional args that you would pass to `predict` calls of an AutoGluon logic Returns ------- Pandas.DataFrame Predict results in DataFrame """ return self.backend.predict_real_time( test_data=data, static_features=static_features, known_covariates=known_covariates, accept=accept, inference_kwargs=kwargs, )
def predict_proba_real_time(self, **kwargs) -> pd.DataFrame: """ :meta private: """ raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.")
[docs] def predict( self, data: Union[str, pd.DataFrame], static_features: Optional[Union[str, pd.DataFrame]] = None, known_covariates: Optional[Union[str, pd.DataFrame]] = None, predictor_path: Optional[str] = None, framework_version: str = "latest", job_name: Optional[str] = None, instance_type: str = "ml.m5.2xlarge", instance_count: int = 1, custom_image_uri: Optional[str] = None, wait: bool = True, backend_kwargs: Optional[Dict] = None, ) -> Optional[pd.DataFrame]: """ Predict using SageMaker batch transform. When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. If you want to minimize latency, use `predict_real_time()` instead. To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, then create a transformer with it, and call transform in the end. ``data`` must use the same ``id_column`` / ``timestamp_column`` names that were passed to ``fit()``. Parameters ---------- data: Union(str, pandas.DataFrame) Historical time series to forecast from, in long format, as a DataFrame or local/S3 path to a data file. static_features: Optional[Union[str, pd.DataFrame]] Static (time-independent) features describing each individual time series. known_covariates: Optional[Union[str, pd.DataFrame]] Future values of the known covariates over the forecast horizon. Must be provided if ``known_covariates_names`` was specified at fit time. predictor_path: str Path to the predictor tarball you want to use to predict. Path can be both a local path or a S3 location. If None, will use the most recent trained predictor trained with `fit()`. framework_version: str, default = `latest` Inference container version of autogluon. If `latest`, will use the latest available container version. If provided a specific version, will use this version. If `custom_image_uri` is set, this argument will be ignored. job_name: str, default = None Name of the launched training job. If None, CloudPredictor will create one with prefix ag-cloudpredictor. instance_count: int, default = 1, Number of instances used to do batch transform. instance_type: str, default = 'ml.m5.2xlarge' Instance to be used for batch transform. wait: bool, default = True Whether to wait for batch transform to complete. To be noticed, the function won't return immediately because there are some preparations needed prior transform. backend_kwargs: dict, default = None Any extra arguments needed to pass to the underneath backend. For SageMaker backend, valid keys are: 1. download: bool, default = True Whether to download the batch transform results to the disk and load it after the batch transform finishes. Will be ignored if `wait` is `False`. 2. persist: bool, default = True Whether to persist the downloaded batch transform results on the disk. Will be ignored if `download` is `False` 3. save_path: str, default = None, Path to save the downloaded result. Will be ignored if `download` is `False`. If None, CloudPredictor will create one. If `persist` is `False`, file would first be downloaded to this path and then removed. 4. model_kwargs: dict, default = dict() Any extra arguments needed to initialize Sagemaker Model Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#model for all options 5. transformer_kwargs: dict Any extra arguments needed to pass to transformer. Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer for all options. 6. transform_kwargs: Any extra arguments needed to pass to transform. Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer.transform for all options. """ if backend_kwargs is None: backend_kwargs = {} backend_kwargs = self.backend.parse_backend_predict_kwargs(backend_kwargs) return self.backend.predict( test_data=data, static_features=static_features, known_covariates=known_covariates, predictor_path=predictor_path, framework_version=framework_version, job_name=job_name, instance_type=instance_type, instance_count=instance_count, custom_image_uri=custom_image_uri, wait=wait, **backend_kwargs, )
def predict_proba( self, **kwargs, ) -> Optional[pd.DataFrame]: """ :meta private: """ raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.")
[docs] def fit_predict( self, train_data: Union[str, Path, pd.DataFrame], *, predictor_init_args: Dict[str, Any], predictor_fit_args: Optional[Dict[str, Any]] = None, known_covariates: Optional[Union[str, Path, pd.DataFrame]] = None, static_features: Optional[Union[str, Path, pd.DataFrame]] = None, id_column: str = "item_id", timestamp_column: str = "timestamp", framework_version: str = "latest", job_name: Optional[str] = None, instance_type: str = "ml.m5.2xlarge", instance_count: int = 1, volume_size: int = 100, custom_image_uri: Optional[str] = None, wait: bool = True, predictions_path: Optional[str] = None, backend_kwargs: Optional[Dict] = None, ) -> Optional[pd.DataFrame]: """ Fit and predict in a single SageMaker training job. This is useful for foundation-model forecasting workflows (e.g. Chronos-2) where "fit" is essentially loading a pretrained model. Running fit and predict in the same job avoids the SageMaker startup overhead twice. Predictions are generated inside the training container against ``train_data`` (the standard time-series forecasting flow where the last ``prediction_length`` steps of each series are forecast) and written directly to S3. Parameters ---------- train_data: Union[str, pathlib.Path, pd.DataFrame] Historical time series to train on and forecast from, in long format, as a DataFrame or local/S3 path to a data file. predictor_init_args: dict Arguments forwarded to ``TimeSeriesPredictor()``. Must include ``prediction_length``. See the `TimeSeriesPredictor docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.html>`_ for available options. predictor_fit_args: Optional[dict], default = None Additional fit args forwarded to ``TimeSeriesPredictor.fit()``. See the `TimeSeriesPredictor.fit docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.fit.html>`_ for available options. Must NOT contain ``train_data``, ``tuning_data``, or ``known_covariates`` — pass those as explicit arguments above. known_covariates: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None Future values of the known covariates over the forecast horizon. Must be provided if ``known_covariates_names`` was specified in ``predictor_init_args``. static_features: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None Static (time-independent) features describing each individual time series. id_column: str, default = "item_id" Name of the column with the unique identifier of each time series (item). timestamp_column: str, default = "timestamp" Name of the column with the observation timestamps. framework_version, job_name, instance_type, instance_count, volume_size, custom_image_uri, wait, backend_kwargs: Same semantics as ``fit()``. predictions_path: Optional[str] S3 URL where predictions will be written by the training container (e.g. ``s3://my-bucket/runs/2024-05-01/predictions.csv``). The container's SageMaker execution role must have ``s3:PutObject`` permission for this location. Defaults to ``{cloud_output_path}/{job_name}/predictions.csv``. Predictions use AutoGluon's canonical column names ``item_id`` and ``timestamp``, regardless of the ``id_column`` / ``timestamp_column`` passed in. Returns ------- Optional[pd.DataFrame] Predictions as a DataFrame. Returns ``None`` when ``wait`` is False. """ if backend_kwargs is None: backend_kwargs = {} else: backend_kwargs = dict(backend_kwargs) extra_ag_args = {"predict_after_fit": True} if predictions_path is not None: extra_ag_args["predictions_path"] = predictions_path backend_kwargs["extra_ag_args"] = extra_ag_args self.fit( train_data=train_data, known_covariates=known_covariates, static_features=static_features, predictor_init_args=predictor_init_args, predictor_fit_args=predictor_fit_args, id_column=id_column, timestamp_column=timestamp_column, framework_version=framework_version, job_name=job_name, instance_type=instance_type, instance_count=instance_count, volume_size=volume_size, custom_image_uri=custom_image_uri, wait=wait, backend_kwargs=backend_kwargs, ) if not wait: logger.info( "fit_predict job launched asynchronously. Use `get_fit_job_status()` " "to poll, then `get_fit_predict_results()` to fetch predictions." ) return None return self.get_fit_predict_results()
[docs] def get_fit_predict_results(self) -> pd.DataFrame: """ Retrieve predictions produced by a completed ``fit_predict`` job. Returns ------- pd.DataFrame Predictions for the forecast horizon. """ return self.backend.get_fit_predict_results()