from __future__ import annotations
import logging
import warnings
from pathlib import Path
from typing import Any, Dict, Optional, Union
import pandas as pd
from ..backend.constant import SAGEMAKER, TIMESERIES_SAGEMAKER
from .cloud_predictor import CloudPredictor
# Module-level logger named after this module; ``fit_predict`` uses it to report asynchronous launches.
logger = logging.getLogger(__name__)
[docs]
class TimeSeriesCloudPredictor(CloudPredictor):
    """Cloud front end for AutoGluon time series forecasting on AWS SageMaker.

    The class wraps :class:`autogluon.timeseries.TimeSeriesPredictor`
    (`docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.html>`_)
    so that ``fit``, ``predict``, and endpoint deployment are executed as managed SageMaker jobs.
    """

    # Pickle file name associated with this predictor type.
    # NOTE(review): presumably read by ``CloudPredictor`` when saving/loading; not visible in this file.
    predictor_file_name = "TimeSeriesCloudPredictor.pkl"

    # Maps the generic SageMaker backend key to the time-series-specific SageMaker backend.
    backend_map = {SAGEMAKER: TIMESERIES_SAGEMAKER}
@property
def predictor_type(self):
    """
    Identifier of the wrapped AutoGluon predictor family.

    Returns
    -------
    str
        Always the literal ``"timeseries"``.
    """
    kind = "timeseries"
    return kind
def _get_local_predictor_cls(self):
    """Return the local ``TimeSeriesPredictor`` class.

    The import is performed inside the function, so ``autogluon.timeseries`` is only
    loaded when this method is actually called.
    """
    from autogluon.timeseries import TimeSeriesPredictor as local_predictor_cls

    return local_predictor_cls
[docs]
def fit(
    self,
    train_data: Optional[Union[str, Path, pd.DataFrame]] = None,
    *,
    predictor_init_args: Dict[str, Any],
    predictor_fit_args: Optional[Dict[str, Any]] = None,
    tuning_data: Optional[Union[str, Path, pd.DataFrame]] = None,
    static_features: Optional[Union[str, Path, pd.DataFrame]] = None,
    id_column: str = "item_id",
    timestamp_column: str = "timestamp",
    framework_version: str = "latest",
    job_name: Optional[str] = None,
    instance_type: str = "ml.m5.2xlarge",
    instance_count: int = 1,
    volume_size: int = 100,
    custom_image_uri: Optional[str] = None,
    wait: bool = True,
    backend_kwargs: Optional[Dict] = None,
    known_covariates: Optional[Union[str, Path, pd.DataFrame]] = None,
) -> TimeSeriesCloudPredictor:
    """
    Fit the predictor with SageMaker.
    This function will first upload necessary config and train data to s3 bucket.
    Then launch a SageMaker training job with the AutoGluon training container.

    Parameters
    ----------
    train_data: Union[str, pathlib.Path, pd.DataFrame]
        Training time series in long format, as a DataFrame or local/S3 path to a data file.
        See the `TimeSeriesPredictor.fit docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.fit.html>`_
        for the expected format.
        Required: a ``TypeError`` is raised if it is supplied neither here nor (deprecated) via
        ``predictor_fit_args``.
    predictor_init_args: dict
        Arguments forwarded to ``TimeSeriesPredictor()``. See the
        `TimeSeriesPredictor docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.html>`_
        for available options (e.g. ``target``, ``prediction_length``, ``freq``, ``eval_metric``,
        ``quantile_levels``, ``known_covariates_names``).
    predictor_fit_args: Optional[dict], default = None
        Additional fit args forwarded to ``TimeSeriesPredictor.fit()``. See the
        `TimeSeriesPredictor.fit docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.fit.html>`_
        for available options. Should NOT contain ``train_data``, ``tuning_data``, or ``known_covariates`` —
        pass those as explicit arguments instead. For backward compatibility such keys are still accepted
        with a ``FutureWarning`` and moved to the matching data channel, unless the explicit argument
        was also given, in which case a ``TypeError`` is raised.
        The dict passed in is copied and never modified.
    tuning_data: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None
        Optional tuning data in long format, as a DataFrame or local/S3 path to a data file.
    static_features: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None
        Static (time-independent) features describing each individual time series.
    id_column: str, default = "item_id"
        Name of the column with the unique identifier of each time series (item).
    timestamp_column: str, default = "timestamp"
        Name of the column with the observation timestamps.
    framework_version: str, default = `latest`
        Training container version of autogluon.
        If `latest`, will use the latest available container version.
        If provided a specific version, will use this version.
        If `custom_image_uri` is set, this argument will be ignored.
    job_name: str, default = None
        Name of the launched training job.
        If None, CloudPredictor will create one with prefix ag-cloudpredictor
    instance_type: str, default = 'ml.m5.2xlarge'
        Instance type the predictor will be trained on with SageMaker.
    instance_count: int, default = 1
        Number of instance used to fit the predictor.
    volume_size: int, default = 100
        Size in GB of the EBS volume to use for storing input data during training.
        Must be large enough to store training data if File Mode is used (which is the default).
    custom_image_uri: Optional[str], default = None
        Custom training image to use. When set, `framework_version` is ignored.
    wait: bool, default = True
        Whether the call should wait until the job completes
        To be noticed, the function won't return immediately because there are some preparations needed prior fit.
        Use `get_fit_job_status` to get job status.
    backend_kwargs: dict, default = None
        Any extra arguments needed to pass to the underneath backend.
        For SageMaker backend, valid keys are:
            1. autogluon_sagemaker_estimator_kwargs
                Any extra arguments needed to initialize AutoGluonSagemakerEstimator
                Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator for all options
            2. fit_kwargs
                Any extra arguments needed to pass to fit.
                Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator.fit for all options
    known_covariates: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None
        Known covariates data, as a DataFrame or local/S3 path to a data file. Forwarded to the backend
        as the ``known_covariates`` data channel.

    Returns
    -------
    `TimeSeriesCloudPredictor` object. Returns self.

    Raises
    ------
    AssertionError
        If the backend reports that the predictor has already been fit.
    TypeError
        If ``train_data`` is missing, or if a data argument is given both explicitly and via
        ``predictor_fit_args``.
    """
    # Raised explicitly (rather than via an `assert` statement) so the guard is not stripped
    # under `python -O`; AssertionError is kept so existing callers see the same exception type.
    if self.backend.is_fit:
        raise AssertionError(
            "Predictor is already fit! To fit additional models, create a new `CloudPredictor`"
        )
    if backend_kwargs is None:
        backend_kwargs = {}
    # Work on a copy: the deprecated-key handling below pops entries and must not touch the caller's dict.
    predictor_fit_args = {} if predictor_fit_args is None else dict(predictor_fit_args)
    data_channels = {
        "train_data": train_data,
        "tuning_data": tuning_data,
        "known_covariates": known_covariates,
        "static_features": static_features,
    }
    # Deprecated path: data inputs smuggled in through `predictor_fit_args`.
    # `static_features` is deliberately not part of this loop; it is only accepted explicitly.
    for key in ("train_data", "tuning_data", "known_covariates"):
        if key not in predictor_fit_args:
            continue
        warnings.warn(
            f"Passing `{key}` via `predictor_fit_args` is deprecated and will be removed in autogluon.cloud 0.6.0. "
            f"Pass `{key}` as an explicit argument to `fit()` instead.",
            FutureWarning,
            stacklevel=2,
        )
        if data_channels[key] is not None:
            # Ambiguous: refuse to guess which of the two values the caller meant.
            raise TypeError(
                f"`{key}` was passed both as an explicit argument and via `predictor_fit_args`. "
                f"Pass it only as an explicit argument."
            )
        data_channels[key] = predictor_fit_args.pop(key)
    if data_channels["train_data"] is None:
        raise TypeError("fit() missing required argument: 'train_data'")
    backend_kwargs = self.backend.parse_backend_fit_kwargs(backend_kwargs)
    self.backend.fit(
        predictor_init_args=predictor_init_args,
        predictor_fit_args=predictor_fit_args,
        data_channels=data_channels,
        id_column=id_column,
        timestamp_column=timestamp_column,
        framework_version=framework_version,
        job_name=job_name,
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size=volume_size,
        custom_image_uri=custom_image_uri,
        wait=wait,
        **backend_kwargs,
    )
    return self
[docs]
def predict_real_time(
    self,
    data: Union[str, pd.DataFrame],
    static_features: Optional[Union[str, pd.DataFrame]] = None,
    known_covariates: Optional[pd.DataFrame] = None,
    accept: str = "application/x-parquet",
    **kwargs,
) -> pd.DataFrame:
    """
    Forecast through the deployed SageMaker endpoint. A deployed SageMaker endpoint is required.
    Meant for low-latency inference; for large datasets use `predict()` instead.

    ``data`` must use the same ``id_column`` / ``timestamp_column`` names that were passed to ``fit()``.

    Parameters
    ----------
    data: Union[str, pandas.DataFrame]
        Historical time series to forecast from, in long format, as a DataFrame or local/S3 path to
        a data file.
    static_features: Optional[Union[str, pandas.DataFrame]], default = None
        Static (time-independent) features describing each individual time series.
    known_covariates: Optional[pandas.DataFrame], default = None
        Future values of the known covariates over the forecast horizon. Must be provided if
        ``known_covariates_names`` was specified at fit time.
    accept: str, default = application/x-parquet
        Type of accept output content.
        Valid options are application/x-parquet, text/csv, application/json
    kwargs:
        Additional args that you would pass to `predict` calls of an AutoGluon logic.
        They are handed to the backend together as ``inference_kwargs``.

    Returns
    -------
    Pandas.DataFrame
        Predict results in DataFrame
    """
    # Bundle the request first, then dispatch it to the backend in one call.
    request = {
        "test_data": data,
        "static_features": static_features,
        "known_covariates": known_covariates,
        "accept": accept,
        "inference_kwargs": kwargs,
    }
    return self.backend.predict_real_time(**request)
def predict_proba_real_time(self, **kwargs) -> pd.DataFrame:
    """
    Unsupported for this predictor: always raises ``ValueError``.

    :meta private:
    """
    owner_name = self.__class__.__name__
    raise ValueError(f"{owner_name} does not support predict_proba operation.")
[docs]
def predict(
    self,
    data: Union[str, pd.DataFrame],
    static_features: Optional[Union[str, pd.DataFrame]] = None,
    known_covariates: Optional[Union[str, pd.DataFrame]] = None,
    predictor_path: Optional[str] = None,
    framework_version: str = "latest",
    job_name: Optional[str] = None,
    instance_type: str = "ml.m5.2xlarge",
    instance_count: int = 1,
    custom_image_uri: Optional[str] = None,
    wait: bool = True,
    backend_kwargs: Optional[Dict] = None,
) -> Optional[pd.DataFrame]:
    """
    Predict using SageMaker batch transform.
    When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate.
    If you want to minimize latency, use `predict_real_time()` instead.
    To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html
    This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor,
    then create a transformer with it, and call transform in the end.

    ``data`` must use the same ``id_column`` / ``timestamp_column`` names that were passed to ``fit()``.

    Parameters
    ----------
    data: Union[str, pandas.DataFrame]
        Historical time series to forecast from, in long format, as a DataFrame or local/S3 path to
        a data file.
    static_features: Optional[Union[str, pd.DataFrame]]
        Static (time-independent) features describing each individual time series.
    known_covariates: Optional[Union[str, pd.DataFrame]]
        Future values of the known covariates over the forecast horizon. Must be provided if
        ``known_covariates_names`` was specified at fit time.
    predictor_path: str
        Path to the predictor tarball you want to use to predict.
        Path can be both a local path or a S3 location.
        If None, will use the most recent trained predictor trained with `fit()`.
    framework_version: str, default = `latest`
        Inference container version of autogluon.
        If `latest`, will use the latest available container version.
        If provided a specific version, will use this version.
        If `custom_image_uri` is set, this argument will be ignored.
    job_name: str, default = None
        Name of the launched batch transform job.
        If None, CloudPredictor will create one with prefix ag-cloudpredictor.
    instance_type: str, default = 'ml.m5.2xlarge'
        Instance to be used for batch transform.
    instance_count: int, default = 1,
        Number of instances used to do batch transform.
    custom_image_uri: Optional[str], default = None
        Custom inference image to use. When set, `framework_version` is ignored.
    wait: bool, default = True
        Whether to wait for batch transform to complete.
        To be noticed, the function won't return immediately because there are some preparations needed prior transform.
    backend_kwargs: dict, default = None
        Any extra arguments needed to pass to the underneath backend.
        For SageMaker backend, valid keys are:
            1. download: bool, default = True
                Whether to download the batch transform results to the disk and load it after the batch transform finishes.
                Will be ignored if `wait` is `False`.
            2. persist: bool, default = True
                Whether to persist the downloaded batch transform results on the disk.
                Will be ignored if `download` is `False`
            3. save_path: str, default = None,
                Path to save the downloaded result.
                Will be ignored if `download` is `False`.
                If None, CloudPredictor will create one.
                If `persist` is `False`, file would first be downloaded to this path and then removed.
            4. model_kwargs: dict, default = dict()
                Any extra arguments needed to initialize Sagemaker Model
                Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#model for all options
            5. transformer_kwargs: dict
                Any extra arguments needed to pass to transformer.
                Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer for all options.
            6. transform_kwargs:
                Any extra arguments needed to pass to transform.
                Please refer to
                https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer.transform for all options.

    Returns
    -------
    Optional[pd.DataFrame]
        Whatever the backend's ``predict`` returns; per the annotation, a DataFrame of predictions or ``None``.
    """
    # Let the backend validate / normalize the extra options before they are splatted into the call.
    raw_kwargs = {} if backend_kwargs is None else backend_kwargs
    parsed_kwargs = self.backend.parse_backend_predict_kwargs(raw_kwargs)
    return self.backend.predict(
        test_data=data,
        static_features=static_features,
        known_covariates=known_covariates,
        predictor_path=predictor_path,
        framework_version=framework_version,
        job_name=job_name,
        instance_type=instance_type,
        instance_count=instance_count,
        custom_image_uri=custom_image_uri,
        wait=wait,
        **parsed_kwargs,
    )
def predict_proba(
    self,
    **kwargs,
) -> Optional[pd.DataFrame]:
    """
    Unsupported for this predictor: always raises ``ValueError``.

    :meta private:
    """
    owner_name = self.__class__.__name__
    raise ValueError(f"{owner_name} does not support predict_proba operation.")
[docs]
def fit_predict(
    self,
    train_data: Union[str, Path, pd.DataFrame],
    *,
    predictor_init_args: Dict[str, Any],
    predictor_fit_args: Optional[Dict[str, Any]] = None,
    known_covariates: Optional[Union[str, Path, pd.DataFrame]] = None,
    static_features: Optional[Union[str, Path, pd.DataFrame]] = None,
    id_column: str = "item_id",
    timestamp_column: str = "timestamp",
    framework_version: str = "latest",
    job_name: Optional[str] = None,
    instance_type: str = "ml.m5.2xlarge",
    instance_count: int = 1,
    volume_size: int = 100,
    custom_image_uri: Optional[str] = None,
    wait: bool = True,
    predictions_path: Optional[str] = None,
    backend_kwargs: Optional[Dict] = None,
) -> Optional[pd.DataFrame]:
    """
    Fit and predict in a single SageMaker training job.

    This is useful for foundation-model forecasting workflows (e.g. Chronos-2) where "fit" is essentially loading
    a pretrained model. Running fit and predict in the same job avoids the SageMaker startup overhead twice.
    Predictions are generated inside the training container against ``train_data`` (the standard time-series
    forecasting flow where the last ``prediction_length`` steps of each series are forecast) and written
    directly to S3.

    Parameters
    ----------
    train_data: Union[str, pathlib.Path, pd.DataFrame]
        Historical time series to train on and forecast from, in long format, as a DataFrame or
        local/S3 path to a data file.
    predictor_init_args: dict
        Arguments forwarded to ``TimeSeriesPredictor()``. Must include ``prediction_length``. See the
        `TimeSeriesPredictor docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.html>`_
        for available options.
    predictor_fit_args: Optional[dict], default = None
        Additional fit args forwarded to ``TimeSeriesPredictor.fit()``. See the
        `TimeSeriesPredictor.fit docs <https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.fit.html>`_
        for available options. Should NOT contain ``train_data`` or ``known_covariates`` — pass those as
        explicit arguments above. This method has no explicit ``tuning_data`` argument; the dict is handed
        to ``fit()`` unchanged, so ``fit()``'s handling of such keys applies.
    known_covariates: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None
        Future values of the known covariates over the forecast horizon. Must be provided if
        ``known_covariates_names`` was specified in ``predictor_init_args``.
    static_features: Optional[Union[str, pathlib.Path, pd.DataFrame]], default = None
        Static (time-independent) features describing each individual time series.
    id_column: str, default = "item_id"
        Name of the column with the unique identifier of each time series (item).
    timestamp_column: str, default = "timestamp"
        Name of the column with the observation timestamps.
    framework_version, job_name, instance_type, instance_count, volume_size, custom_image_uri, wait:
        Same semantics as ``fit()``.
    predictions_path: Optional[str]
        S3 URL where predictions will be written by the training container (e.g.
        ``s3://my-bucket/runs/2024-05-01/predictions.csv``). The container's SageMaker execution role must
        have ``s3:PutObject`` permission for this location. Defaults to
        ``{cloud_output_path}/{job_name}/predictions.csv``. Predictions use AutoGluon's canonical column
        names ``item_id`` and ``timestamp``, regardless of the ``id_column`` / ``timestamp_column`` passed in.
    backend_kwargs: dict, default = None
        Same semantics as ``fit()``. The dict passed in is never modified. If it carries an
        ``extra_ag_args`` entry, that entry is preserved and merged with the ``predict_after_fit`` /
        ``predictions_path`` keys set by this method; on a key clash the values set here win.

    Returns
    -------
    Optional[pd.DataFrame]
        Predictions as a DataFrame. Returns ``None`` when ``wait`` is False.
    """
    # Shallow copy so adding `extra_ag_args` below never leaks into the caller's dict.
    backend_kwargs = {} if backend_kwargs is None else dict(backend_kwargs)

    # Merge rather than overwrite: caller-provided extras used to be silently dropped.
    # A fresh dict is built so the caller's nested `extra_ag_args` is left untouched as well.
    extra_ag_args = dict(backend_kwargs.get("extra_ag_args") or {})
    extra_ag_args["predict_after_fit"] = True
    if predictions_path is not None:
        extra_ag_args["predictions_path"] = predictions_path
    backend_kwargs["extra_ag_args"] = extra_ag_args

    self.fit(
        train_data=train_data,
        known_covariates=known_covariates,
        static_features=static_features,
        predictor_init_args=predictor_init_args,
        predictor_fit_args=predictor_fit_args,
        id_column=id_column,
        timestamp_column=timestamp_column,
        framework_version=framework_version,
        job_name=job_name,
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size=volume_size,
        custom_image_uri=custom_image_uri,
        wait=wait,
        backend_kwargs=backend_kwargs,
    )
    if not wait:
        # The job is still running, so there is nothing to fetch yet.
        logger.info(
            "fit_predict job launched asynchronously. Use `get_fit_job_status()` "
            "to poll, then `get_fit_predict_results()` to fetch predictions."
        )
        return None
    return self.get_fit_predict_results()
[docs]
def get_fit_predict_results(self) -> pd.DataFrame:
    """
    Fetch the predictions of a completed ``fit_predict`` job.

    The call is delegated unchanged to the backend's ``get_fit_predict_results``.

    Returns
    -------
    pd.DataFrame
        Predictions for the forecast horizon.
    """
    predictions = self.backend.get_fit_predict_results()
    return predictions