from __future__ import annotations
import logging
from typing import Any, Dict, Optional, Union
import pandas as pd
from autogluon.common.utils.s3_utils import is_s3_url
from ..backend.constant import SAGEMAKER, TIMESERIES_SAGEMAKER
from .cloud_predictor import CloudPredictor
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# [docs]  (appears to be a documentation-link marker left over from a rendered source view; not code)
class TimeSeriesCloudPredictor(CloudPredictor):
    """
    Cloud predictor for AutoGluon time-series forecasting.

    The matching local predictor class is ``autogluon.timeseries.TimeSeriesPredictor``
    (see ``_get_local_predictor_cls``). Every public method here prepares its arguments and then
    delegates the actual work to ``self.backend``.
    """

    # File name associated with this predictor type
    # (presumably used by the base class when saving / loading -- confirm against `CloudPredictor`).
    predictor_file_name = "TimeSeriesCloudPredictor.pkl"
    # Maps the generic SageMaker backend name to its time-series specific counterpart.
    backend_map = {SAGEMAKER: TIMESERIES_SAGEMAKER}

    @property
    def predictor_type(self):
        """
        Type of the underneath AutoGluon Predictor
        """
        return "timeseries"

    def _get_local_predictor_cls(self):
        """
        Return the local AutoGluon predictor class that corresponds to this cloud predictor.

        The import is done lazily inside the method, so ``autogluon.timeseries`` is only required
        when this method is actually called.
        """
        from autogluon.timeseries import TimeSeriesPredictor

        return TimeSeriesPredictor

    def fit(
        self,
        *,
        predictor_init_args: Dict[str, Any],
        predictor_fit_args: Dict[str, Any],
        id_column: str = "item_id",
        timestamp_column: str = "timestamp",
        static_features: Optional[Union[str, pd.DataFrame]] = None,
        framework_version: str = "latest",
        job_name: Optional[str] = None,
        instance_type: str = "ml.m5.2xlarge",
        instance_count: int = 1,
        volume_size: int = 100,
        custom_image_uri: Optional[str] = None,
        wait: bool = True,
        backend_kwargs: Optional[Dict] = None,
    ) -> TimeSeriesCloudPredictor:
        """
        Fit the predictor with SageMaker.
        This function will first upload necessary config and train data to s3 bucket.
        Then launch a SageMaker training job with the AutoGluon training container.

        Parameters
        ----------
        predictor_init_args: dict
            Init args for the predictor
        predictor_fit_args: dict
            Fit args for the predictor.
            Must contain a ``train_data`` entry. ``train_data``, ``tuning_data`` and ``known_covariates`` are
            taken out of a copy of this dict and handed to the backend as data channels; the caller's dict is
            not modified.
        id_column: str, default = "item_id"
            Name of the item ID column
        timestamp_column: str, default = "timestamp"
            Name of the timestamp column
        static_features: Optional[Union[str, pd.DataFrame]], default = None
            An optional data frame describing the metadata attributes of individual items in the item index.
            For more detail, please refer to `TimeSeriesDataFrame` documentation:
            https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesDataFrame.html
        framework_version: str, default = `latest`
            Training container version of autogluon.
            If `latest`, will use the latest available container version.
            If provided a specific version, will use this version.
            If `custom_image_uri` is set, this argument will be ignored.
        job_name: str, default = None
            Name of the launched training job.
            If None, CloudPredictor will create one with prefix ag-cloudpredictor
        instance_type: str, default = 'ml.m5.2xlarge'
            Instance type the predictor will be trained on with SageMaker.
        instance_count: int, default = 1
            Number of instance used to fit the predictor.
        volume_size: int, default = 100
            Size in GB of the EBS volume to use for storing input data during training.
            Must be large enough to store training data if File Mode is used (which is the default).
        custom_image_uri: Optional[str], default = None
            URI of a custom container image to train with.
            If set, `framework_version` is ignored.
        wait: bool, default = True
            Whether the call should wait until the job completes
            To be noticed, the function won't return immediately because there are some preparations needed prior fit.
            Use `get_fit_job_status` to get job status.
        backend_kwargs: dict, default = None
            Any extra arguments needed to pass to the underneath backend.
            For SageMaker backend, valid keys are:
                1. autogluon_sagemaker_estimator_kwargs
                    Any extra arguments needed to initialize AutoGluonSagemakerEstimator
                    Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator for all options
                2. fit_kwargs
                    Any extra arguments needed to pass to fit.
                    Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator.fit for all options

        Returns
        -------
        `TimeSeriesCloudPredictor` object. Returns self.

        Raises
        ------
        AssertionError
            If the backend reports that the predictor is already fit.
        KeyError
            If ``predictor_fit_args`` has no ``train_data`` entry.
        """
        # Raised explicitly instead of through an `assert` statement so the guard is still enforced when
        # Python runs with `-O` (which strips asserts). The exception type and message are unchanged.
        if self.backend.is_fit:
            raise AssertionError(
                "Predictor is already fit! To fit additional models, create a new `CloudPredictor`"
            )
        if backend_kwargs is None:
            backend_kwargs = {}

        # Work on a shallow copy so popping the data entries below does not mutate the caller's dict.
        predictor_fit_args = dict(predictor_fit_args)
        data_channels = {
            # `train_data` is mandatory: pop() without a default raises KeyError when it is missing.
            "train_data": predictor_fit_args.pop("train_data"),
            "tuning_data": predictor_fit_args.pop("tuning_data", None),
            "known_covariates": predictor_fit_args.pop("known_covariates", None),
            "static_features": static_features,
        }

        backend_kwargs = self.backend.parse_backend_fit_kwargs(backend_kwargs)
        self.backend.fit(
            predictor_init_args=predictor_init_args,
            predictor_fit_args=predictor_fit_args,
            data_channels=data_channels,
            id_column=id_column,
            timestamp_column=timestamp_column,
            framework_version=framework_version,
            job_name=job_name,
            instance_type=instance_type,
            instance_count=instance_count,
            volume_size=volume_size,
            custom_image_uri=custom_image_uri,
            wait=wait,
            **backend_kwargs,
        )
        return self

    def predict_real_time(
        self,
        data: Union[str, pd.DataFrame],
        static_features: Optional[Union[str, pd.DataFrame]] = None,
        known_covariates: Optional[pd.DataFrame] = None,
        accept: str = "application/x-parquet",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Predict with the deployed SageMaker endpoint. A deployed SageMaker endpoint is required.
        This is intended to provide a low latency inference.
        If you want to inference on a large dataset, use `predict()` instead.
        ``data`` must use the same ``id_column`` / ``timestamp_column`` names that were passed to ``fit()``.

        Parameters
        ----------
        data: Union(str, pandas.DataFrame)
            The data to forecast from.
            Can be a pandas.DataFrame or a local path to a csv file.
        static_features: Optional[Union[str, pd.DataFrame]], default = None
            An optional data frame describing the metadata attributes of individual items in the item index.
            For more detail, please refer to `TimeSeriesDataFrame` documentation:
            https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesDataFrame.html
        known_covariates : Optional[pd.DataFrame]
            If ``known_covariates_names`` were specified when creating the predictor, it is necessary to provide the
            values of the known covariates for each time series during the forecast horizon.
            For more details, please refer to the `TimeSeriesPredictor.predict` documentation:
            https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.predict.html
        accept: str, default = application/x-parquet
            Type of accept output content.
            Valid options are application/x-parquet, text/csv, application/json
        kwargs:
            Additional args that you would pass to `predict` calls of an AutoGluon logic.
            They are forwarded to the backend as ``inference_kwargs``.

        Returns
        -------
        Pandas.DataFrame
            Predict results in DataFrame
        """
        return self.backend.predict_real_time(
            test_data=data,
            static_features=static_features,
            known_covariates=known_covariates,
            accept=accept,
            inference_kwargs=kwargs,
        )

    def predict_proba_real_time(self, **kwargs) -> pd.DataFrame:
        """
        Not supported for time series.

        Raises
        ------
        ValueError
            Always; the arguments are ignored.
        """
        raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.")

    def predict(
        self,
        data: Union[str, pd.DataFrame],
        static_features: Optional[Union[str, pd.DataFrame]] = None,
        known_covariates: Optional[Union[str, pd.DataFrame]] = None,
        predictor_path: Optional[str] = None,
        framework_version: str = "latest",
        job_name: Optional[str] = None,
        instance_type: str = "ml.m5.2xlarge",
        instance_count: int = 1,
        custom_image_uri: Optional[str] = None,
        wait: bool = True,
        backend_kwargs: Optional[Dict] = None,
    ) -> Optional[pd.DataFrame]:
        """
        Predict using SageMaker batch transform.
        When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate.
        If you want to minimize latency, use `predict_real_time()` instead.
        To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html
        This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor,
        then create a transformer with it, and call transform in the end.
        ``data`` must use the same ``id_column`` / ``timestamp_column`` names that were passed to ``fit()``.

        Parameters
        ----------
        data: Union(str, pandas.DataFrame)
            The data to forecast from.
            Can be a pandas.DataFrame or a local path to a csv file.
        static_features: Optional[Union[str, pd.DataFrame]]
            An optional data frame describing the metadata attributes of individual items in the item index.
            For more detail, please refer to `TimeSeriesDataFrame` documentation:
            https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesDataFrame.html
        known_covariates: Optional[Union[str, pd.DataFrame]]
            Future values of the known covariates over the forecast horizon.
        predictor_path: str
            Path to the predictor tarball you want to use to predict.
            Path can be both a local path or a S3 location.
            If None, will use the most recent trained predictor trained with `fit()`.
        framework_version: str, default = `latest`
            Inference container version of autogluon.
            If `latest`, will use the latest available container version.
            If provided a specific version, will use this version.
            If `custom_image_uri` is set, this argument will be ignored.
        job_name: str, default = None
            Name of the launched batch transform job.
            If None, CloudPredictor will create one with prefix ag-cloudpredictor.
        instance_type: str, default = 'ml.m5.2xlarge'
            Instance to be used for batch transform.
        instance_count: int, default = 1,
            Number of instances used to do batch transform.
        custom_image_uri: Optional[str], default = None
            URI of a custom container image to run inference with.
            If set, `framework_version` is ignored.
        wait: bool, default = True
            Whether to wait for batch transform to complete.
            To be noticed, the function won't return immediately because there are some preparations needed prior transform.
        backend_kwargs: dict, default = None
            Any extra arguments needed to pass to the underneath backend.
            For SageMaker backend, valid keys are:
                1. download: bool, default = True
                    Whether to download the batch transform results to the disk and load it after the batch transform finishes.
                    Will be ignored if `wait` is `False`.
                2. persist: bool, default = True
                    Whether to persist the downloaded batch transform results on the disk.
                    Will be ignored if `download` is `False`
                3. save_path: str, default = None,
                    Path to save the downloaded result.
                    Will be ignored if `download` is `False`.
                    If None, CloudPredictor will create one.
                    If `persist` is `False`, file would first be downloaded to this path and then removed.
                4. model_kwargs: dict, default = dict()
                    Any extra arguments needed to initialize Sagemaker Model
                    Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#model for all options
                5. transformer_kwargs: dict
                    Any extra arguments needed to pass to transformer.
                    Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer for all options.
                6. transform_kwargs:
                    Any extra arguments needed to pass to transform.
                    Please refer to
                    https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer.transform for all options.

        Returns
        -------
        Optional[pd.DataFrame]
            Whatever the backend's ``predict`` returns.
            Presumably the predictions when the results are downloaded and loaded, and ``None`` otherwise
            (e.g. when `wait` is `False`) -- confirm against the backend implementation.
        """
        if backend_kwargs is None:
            backend_kwargs = {}
        backend_kwargs = self.backend.parse_backend_predict_kwargs(backend_kwargs)
        return self.backend.predict(
            test_data=data,
            static_features=static_features,
            known_covariates=known_covariates,
            predictor_path=predictor_path,
            framework_version=framework_version,
            job_name=job_name,
            instance_type=instance_type,
            instance_count=instance_count,
            custom_image_uri=custom_image_uri,
            wait=wait,
            **backend_kwargs,
        )

    def predict_proba(
        self,
        **kwargs,
    ) -> Optional[pd.DataFrame]:
        """
        Not supported for time series.

        Raises
        ------
        ValueError
            Always; the arguments are ignored.
        """
        raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.")

    def fit_predict(
        self,
        train_data: Union[str, pd.DataFrame],
        *,
        predictor_init_args: Dict[str, Any],
        predictor_fit_args: Optional[Dict[str, Any]] = None,
        known_covariates: Optional[Union[str, pd.DataFrame]] = None,
        id_column: str = "item_id",
        timestamp_column: str = "timestamp",
        static_features: Optional[Union[str, pd.DataFrame]] = None,
        framework_version: str = "latest",
        job_name: Optional[str] = None,
        instance_type: str = "ml.m5.2xlarge",
        instance_count: int = 1,
        volume_size: int = 100,
        custom_image_uri: Optional[str] = None,
        wait: bool = True,
        predictions_path: Optional[str] = None,
        backend_kwargs: Optional[Dict] = None,
    ) -> Optional[pd.DataFrame]:
        """
        Fit and predict in a single SageMaker training job.
        This is useful for foundation-model forecasting workflows (e.g. Chronos-2) where "fit" is essentially loading
        a pretrained model. Running fit and predict in the same job avoids the SageMaker startup overhead twice.
        Predictions are generated inside the training container against ``train_data`` (the standard time-series
        forecasting flow where the last ``prediction_length`` steps of each series are forecast) and written
        directly to S3.

        Parameters
        ----------
        train_data: Union[str, pd.DataFrame]
            The historical time-series data to train on and forecast from. Either a pandas DataFrame or a local / S3
            path to a data file (CSV or Parquet).
        predictor_init_args: dict
            Init args for the predictor (must include ``prediction_length``).
        predictor_fit_args: Optional[dict], default = None
            Additional fit args for the predictor. Must not contain a ``train_data`` or ``known_covariates``
            key — pass those as the explicit arguments of this method. The caller's dict is copied, not modified.
        known_covariates: Optional[Union[str, pd.DataFrame]], default = None
            Values of the known covariates for each time series during the forecast horizon. Either a pandas
            DataFrame or a local / S3 path to a data file (CSV or Parquet). Forwarded to
            ``TimeSeriesPredictor.predict`` in the container.
            For details, see:
            https://auto.gluon.ai/stable/api/autogluon.timeseries.TimeSeriesPredictor.predict.html
        id_column: str, default = "item_id"
            Name of the item ID column.
        timestamp_column: str, default = "timestamp"
            Name of the timestamp column.
        static_features: Optional[Union[str, pd.DataFrame]], default = None
            Optional metadata attributes per item.
        framework_version, job_name, instance_type, instance_count, volume_size, custom_image_uri, wait,
        backend_kwargs:
            Same semantics as ``fit()``.
            Note that any ``extra_ag_args`` entry in ``backend_kwargs`` is replaced by the one built in this
            method; the caller's dict itself is copied, not modified.
        predictions_path: Optional[str]
            S3 URL where predictions will be written by the training container (e.g.
            ``s3://my-bucket/runs/2024-05-01/predictions.csv``). Must end in ``.csv`` or ``.parquet``.
            The container's SageMaker execution role must
            have ``s3:PutObject`` permission for this location. Defaults to
            ``{cloud_output_path}/{job_name}/predictions.csv``. Predictions use AutoGluon's canonical column
            names ``item_id`` and ``timestamp``, regardless of the ``id_column`` / ``timestamp_column`` passed in.

        Returns
        -------
        Optional[pd.DataFrame]
            Predictions as a DataFrame. Returns ``None`` when ``wait`` is False.

        Raises
        ------
        ValueError
            If ``predictions_path`` is given but is not an S3 URL ending in ``.csv`` / ``.parquet``, or if
            ``predictor_fit_args`` contains ``train_data`` or ``known_covariates``.
        """
        # Validate everything up front so that no job is launched with a bad configuration.
        if predictions_path is not None:
            if not is_s3_url(predictions_path) or not predictions_path.endswith((".csv", ".parquet")):
                raise ValueError(
                    "`predictions_path` must be a full S3 URL ending in '.csv' or '.parquet' "
                    f"(e.g. 's3://bucket/key/predictions.parquet'), got {predictions_path!r}."
                )

        # Copy so the entries added below never leak into the caller's dict.
        predictor_fit_args = {} if predictor_fit_args is None else dict(predictor_fit_args)
        if "train_data" in predictor_fit_args:
            raise ValueError(
                "`train_data` must be passed as an explicit argument to `fit_predict`, not via `predictor_fit_args`."
            )
        if "known_covariates" in predictor_fit_args:
            raise ValueError(
                "`known_covariates` must be passed as an explicit argument to `fit_predict`, "
                "not via `predictor_fit_args`."
            )

        # `fit()` expects the data inside `predictor_fit_args` and moves it into data channels itself.
        predictor_fit_args["train_data"] = train_data
        if known_covariates is not None:
            predictor_fit_args["known_covariates"] = known_covariates

        backend_kwargs = {} if backend_kwargs is None else dict(backend_kwargs)
        extra_ag_args = {"predict_after_fit": True}
        if predictions_path is not None:
            extra_ag_args["predictions_path"] = predictions_path
        # Overwrites any `extra_ag_args` the caller may have supplied.
        backend_kwargs["extra_ag_args"] = extra_ag_args

        self.fit(
            predictor_init_args=predictor_init_args,
            predictor_fit_args=predictor_fit_args,
            id_column=id_column,
            timestamp_column=timestamp_column,
            static_features=static_features,
            framework_version=framework_version,
            job_name=job_name,
            instance_type=instance_type,
            instance_count=instance_count,
            volume_size=volume_size,
            custom_image_uri=custom_image_uri,
            wait=wait,
            backend_kwargs=backend_kwargs,
        )

        if not wait:
            logger.info(
                "fit_predict job launched asynchronously. Use `get_fit_job_status()` "
                "to poll, then `get_fit_predict_results()` to fetch predictions."
            )
            return None
        return self.get_fit_predict_results()

    def get_fit_predict_results(self) -> pd.DataFrame:
        """
        Retrieve predictions produced by a completed ``fit_predict`` job.

        Delegates to the backend's ``get_fit_predict_results``.

        Returns
        -------
        pd.DataFrame
            Predictions for the forecast horizon.
        """
        return self.backend.get_fit_predict_results()